Saturday, 19 September 2020

MongoDB change stream tutorial in Python

 

In this tutorial we will focus on MongoDB change streams.

Change streams give you a hook into the database, so that you get notified whenever data is modified (by an insert, update, replace or delete). They act like a trigger and can be very handy if you don’t want to use a message queue.
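To make the idea of a notification concrete, here is a minimal sketch of what an update notification looks like once it reaches Python. The sample event is hand-written for illustration: its values are made up, and real events carry additional fields (for example a resume token under `_id`). `summarize_change` is a hypothetical helper, not part of pymongo.

```python
def summarize_change(change):
    """Return a one-line description of a change event dictionary."""
    op = change["operationType"]
    ns = change["ns"]
    target = ns["db"] + "." + ns["coll"]
    if op == "update":
        # Update events list the modified fields under "updateDescription".
        fields = change["updateDescription"]["updatedFields"]
        return op + " on " + target + ": " + str(fields)
    return op + " on " + target


# Hand-written sample event (illustrative values only).
sample_event = {
    "operationType": "update",
    "ns": {"db": "test_database", "coll": "test_collection"},
    "documentKey": {"_id": "hypothetical-id"},
    "updateDescription": {
        "updatedFields": {"present_days": 3},
        "removedFields": [],
    },
}

print(summarize_change(sample_event))
```

The same dictionary keys are what a change stream loop reads when it reacts to an update.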

 

There are already some excellent tutorials on this topic, but the intention of this tutorial is to provide a practical code example in Python (which I had a hard time finding) that will help you get started fast.

I would recommend you go through these references first:

https://developer.mongodb.com/quickstart/python-change-streams

https://severalnines.com/database-blog/real-time-data-streaming-mongodb-change-streams

https://api.mongodb.com/python/current/api/pymongo/change_stream.html

 

So, what are the prerequisites for using change streams?

1)      You must have your MongoDB instance running in the background. The Windows installation sets MongoDB up to start automatically, so it will start when you start your machine. On Linux you can also configure MongoDB to start on boot. On Windows you can verify that it is running by looking in Task Manager.



2)     Then you need to create a replica set. Note that change streams only work on replica sets (and sharded clusters), not on a standalone instance.

 

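A replica set is a group of MongoDB instances that all hold the same data. The members can run on remote machines or on your own local machine. Replica sets are mainly used for redundancy and high availability, and change streams rely on the replication log (the oplog) that a replica set maintains.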

 

You can read more about replica sets on the official MongoDB website. Refer to the Appendix section of this post to create and initiate a local replica set:

https://developer.mongodb.com/quickstart/python-change-streams

Here we will create a local replica set, so you do not need to worry about remote machines.

 

3)      To avoid setting up the replica set again every time we start our database, let’s edit the database configuration file.

The database config file is located at: C:\Program Files\MongoDB\Server\4.2\bin\mongod.cfg

Open this file and find the commented-out line:

#replication:

Replace it with the following and save:

replication:
  replSetName: "rs"

 

4)      Now you are all set. You can restart the machine and no longer need to worry about any database configuration.
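One way to confirm that the server really is running as a replica set after these steps is to ask it for its status from Python. The sketch below assumes the `replSetGetStatus` admin command, whose reply includes the set name under `set` and the state of the queried member under `myState` (1 means primary). `replica_set_summary` is a hypothetical helper name; pass it a `pymongo.MongoClient`.

```python
def replica_set_summary(client):
    """Ask the server for its replica set status and summarize it.

    `client` is expected to be a pymongo.MongoClient. If the server is
    not an initiated replica set member, the command raises
    pymongo.errors.OperationFailure instead of returning a status.
    """
    status = client.admin.command("replSetGetStatus")
    return {
        "name": status["set"],
        "is_primary": status["myState"] == 1,  # 1 is the PRIMARY state
        "member_count": len(status["members"]),
    }


# Usage, assuming MongoDB is running locally as configured above:
#   import pymongo
#   client = pymongo.MongoClient("mongodb://localhost:27017/")
#   print(replica_set_summary(client))
```

With the configuration from step 3, the reported name should be "rs".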

 

 

Change Stream example code:

We are going to use the “pymongo” module for this, so if you have not installed it yet, run:

    pip install pymongo

1)      First we need a database and a collection to get started. Remember that in MongoDB a database is not actually created until you store some data in it. Create a file named “load_demo_database.py”, copy the following code into it, and run it:

    python load_demo_database.py

 

import pymongo

# Connect to the local MongoDB instance on the default port.
myclient = pymongo.MongoClient("mongodb://localhost:27017/")
mydb = myclient["test_database"]
mycol = mydb["test_collection"]

# Inserting the first document creates the database and the collection.
mydict = {"first_name": "Parth", "last_name": "Pandya", "present_days": 0}
x = mycol.insert_one(mydict)

print("create database successful")

 

2)      We have created our database. Now let’s start a change stream.

Create a file named “change_stream_monitor_thread.py”, copy the following code into it, and run it:

    python change_stream_monitor_thread.py

 

import pymongo
import threading
import time


def scan_user_db_changes(change_stream):
    # Iterating the stream blocks until the next change arrives.
    for change in change_stream:
        if change["operationType"] == "update":
            updated_fields = change["updateDescription"]["updatedFields"]
            if "present_days" in updated_fields:
                print("updated value of present days: " + str(updated_fields["present_days"]) + "\n")


myclient = pymongo.MongoClient("mongodb://localhost:27017/")
mydb = myclient["test_database"]
mycol = mydb["test_collection"]

# watch() opens a change stream on the collection.
user_change_stream = mycol.watch()
threading.Thread(target=scan_user_db_changes,
                 args=(user_change_stream,),
                 daemon=True).start()

print("waiting for updates ...")
# Keep the main thread alive; sleeping avoids burning CPU in the loop.
while True:
    time.sleep(1)

 

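Here you can see that we first create a “daemon” thread. Daemon threads do not keep the program alive: as soon as the main thread (and every other non-daemon thread) ends, the program exits and the daemon thread is stopped with it. We cannot let our main thread end, which is why we have kept a “while True” loop at the end.

The monitor thread will keep running as long as this script does, so keep this console window open.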
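As a sketch of another way to keep the main thread alive, it can wait on the worker with `join()` in short slices instead of looping on its own. To keep this example runnable without a database, a plain list stands in for the change stream; `fake_stream` and `handle_changes` are hypothetical names.

```python
import threading


def handle_changes(change_stream, seen):
    # Iterating a real change stream blocks until the next event arrives.
    for change in change_stream:
        if change["operationType"] == "update":
            seen.append(change["updateDescription"]["updatedFields"])


# Stand-in for collection.watch(): a finite list of hand-written events.
fake_stream = [
    {"operationType": "insert"},
    {"operationType": "update",
     "updateDescription": {"updatedFields": {"present_days": 1}}},
]

seen = []
worker = threading.Thread(target=handle_changes,
                          args=(fake_stream, seen),
                          daemon=True)
worker.start()

# Wait in one-second slices: the main thread sleeps inside join() instead
# of spinning, and still wakes up regularly so Ctrl+C can be handled.
while worker.is_alive():
    worker.join(timeout=1.0)

print(seen)
```

With a real change stream the worker never finishes on its own, so this loop runs until the program is interrupted.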

3)     Now we need to update some value in the database so that we get notifications. Create one more file, name it “database_updater.py”, copy the following code into it, and run it:

    python database_updater.py

This script sets “present_days” to a new value every 5 seconds:

import pymongo
import time

myclient = pymongo.MongoClient("mongodb://localhost:27017/")
mydb = myclient["test_database"]
mycol = mydb["test_collection"]

# range(1, 5) yields 1, 2, 3, 4, so four updates are sent.
for count in range(1, 5):
    print("updating present days " + str(count))
    mycol.update_one({"first_name": "Parth"},
                     {"$set": {"present_days": count}})
    time.sleep(5)

 

Keep both windows open side by side, and you will see the update notifications appear in the “change stream” window.
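The monitor script receives every change on the collection and filters in Python. As a sketch of an alternative, `watch()` also accepts an aggregation pipeline, so the server can do the filtering. The helper names `present_days_pipeline` and `watch_present_days` are hypothetical, and the `$match` stage assumes that only updates touching “present_days” are of interest.

```python
def present_days_pipeline():
    """Build a pipeline matching only updates that set "present_days"."""
    return [
        {
            "$match": {
                "operationType": "update",
                "updateDescription.updatedFields.present_days": {"$exists": True},
            }
        }
    ]


def watch_present_days(collection):
    # `collection` is expected to be a pymongo Collection; watch() returns
    # a change stream that yields only events matching the pipeline.
    return collection.watch(present_days_pipeline())


# Usage, assuming `mycol` is the collection from the scripts above:
#   for change in watch_present_days(mycol):
#       print(change["updateDescription"]["updatedFields"]["present_days"])
```

Filtering on the server means the Python loop no longer needs its own checks on the operation type and field name.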



I hope you liked this tutorial. Do let me know if you have any feedback.

 
