In this tutorial we will focus on MongoDB change streams.
Change streams give you a hook into the database, so that you get notified whenever data in it is modified (inserts, updates, replacements and deletes). A change stream acts like a trigger, and it can be very handy if you don't want to use a message queue.
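Each notification reaches Python as an ordinary dictionary, called a change event. Its “operationType” field says what happened and its “ns” field names the database and collection. As a rough sketch (the helper summarize_change is hypothetical and handles only a few event types), the common fields can be read like this:

```python
def summarize_change(change):
    """Turn one change event (a plain dict) into a short, readable line."""
    op = change["operationType"]
    # "ns" names the database and collection; some events (e.g. invalidate) have none
    ns = change.get("ns", {})
    where = ".".join(part for part in (ns.get("db"), ns.get("coll")) if part)
    if not where:
        return op
    if op == "insert":
        # Insert events carry the complete new document
        return "insert into " + where + ": " + str(change["fullDocument"])
    if op == "update":
        # Update events carry only the fields that changed
        fields = change["updateDescription"]["updatedFields"]
        return "update in " + where + ": " + str(fields)
    if op == "delete":
        # Delete events identify the removed document only by its _id
        return "delete from " + where + ": " + str(change["documentKey"]["_id"])
    # Other event types (replace, drop, rename, ...) are just labelled
    return op + " in " + where
```

Insert events include the whole new document, while update events only list the changed fields under “updateDescription”, which is why the monitoring code in this tutorial reads “updatedFields”.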
There are already some excellent tutorials on this topic, but the intention of this tutorial is to provide a practical code example in Python (which I had a hard time finding) that will help you get started fast.
I recommend going through these references first:
https://developer.mongodb.com/quickstart/python-change-streams
https://severalnines.com/database-blog/real-time-data-streaming-mongodb-change-streams
https://api.mongodb.com/python/current/api/pymongo/change_stream.html
So, what are the prerequisites for using change streams?
1) Your MongoDB instance must be running in the background. During the Windows installation, MongoDB is set up to start automatically, so it starts when you start your machine; you can verify this in Task Manager. On Linux, you can also configure MongoDB to start on boot.
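2) Next, you need to create a replica set, because change streams only work on replica sets (and sharded clusters), not on a standalone server.
A replica set is a group of MongoDB instances that keep identical copies of your data, running on remote machines or on your own local machine. Replica sets are normally used for redundancy and high availability; change streams depend on them because they are built on the replica set's operation log (oplog).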
You can read more about replica sets on the official MongoDB website. To create and initiate a local replica set, refer to the Appendix section of this post:
https://developer.mongodb.com/quickstart/python-change-streams
Here, we will create a local replica set, so you do not need to worry about a remote machine.
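As a hedged alternative to running rs.initiate() in the mongo shell, the initiation can also be done from Python. This sketch assumes mongod is already running with the replica set name “rs” (for example via mongod --replSet rs, or the configuration file change in step 3) and PyMongo 3.11 or newer for directConnection. The helper names local_replica_set_config and initiate_local_replica_set are hypothetical.

```python
def local_replica_set_config(set_name="rs", host="localhost:27017"):
    """Build a one-member replica set configuration for a local mongod."""
    # _id must match the replSetName mongod was started with
    return {"_id": set_name, "members": [{"_id": 0, "host": host}]}


def initiate_local_replica_set(set_name="rs", host="localhost:27017"):
    """Run replSetInitiate once against a local mongod started with --replSet."""
    # Imported here so the config helper above can be used without PyMongo
    import pymongo
    from pymongo.errors import OperationFailure

    # directConnection=True (PyMongo 3.11+) talks to this one server even
    # though it is not yet part of an initiated replica set
    client = pymongo.MongoClient("mongodb://" + host + "/", directConnection=True)
    try:
        client.admin.command("replSetInitiate", local_replica_set_config(set_name, host))
        print("replica set initiated")
    except OperationFailure as exc:
        # Error code 23 (AlreadyInitialized): nothing left to do
        if exc.code == 23:
            print("replica set already initiated")
        else:
            raise
    finally:
        client.close()
```

Calling initiate_local_replica_set() once is enough: the server stores the replica set configuration, so later restarts only need mongod to be started with the same replica set name.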
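3) So that MongoDB always starts as a replica set member, without passing the replica set name on the command line every time, let's set it in the database configuration file.
On Windows, the configuration file is located at: < C:\Program Files\MongoDB\Server\4.2\bin\mongod.cfg >
Open this file, replace the commented-out “#replication:” line with the following, and save it:
```yaml
replication:
  replSetName: "rs"
```
The replica set name here (“rs”) must match the name used when the replica set is initiated.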
4) Now you are all set: restart the machine, and you will not need to worry about any further database configuration.
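Before moving on, it can help to confirm that the restarted server really is a replica set member, since watch() fails on a standalone server. The sketch below reads the server's isMaster reply: initiated members report a “setName”, and a server started with a replica set name but not yet initiated reports “isreplicaset”. The helper names are hypothetical, and PyMongo 3.11 or newer is assumed for directConnection.

```python
def replica_set_status(reply):
    """Classify a server from its isMaster/hello reply."""
    # Initiated replica set members report their set name
    if "setName" in reply:
        return "member"
    # mongod started with a replica set name, but the set was never initiated
    if reply.get("isreplicaset"):
        return "not initiated"
    # Anything else is a standalone server: change streams are unavailable
    return "standalone"


def check_local_replica_set(uri="mongodb://localhost:27017/"):
    """Print whether the server at uri can serve change streams."""
    # Imported here so replica_set_status() can be used without PyMongo
    import pymongo

    # directConnection=True (PyMongo 3.11+) asks this one server directly
    client = pymongo.MongoClient(uri, directConnection=True, serverSelectionTimeoutMS=5000)
    try:
        # "isMaster" is accepted by MongoDB 4.2; newer servers also accept "hello"
        reply = client.admin.command("isMaster")
    finally:
        client.close()
    status = replica_set_status(reply)
    print("server status: " + status)
    return status
```

If check_local_replica_set() prints “not initiated”, go back to the initiation step; if it prints “standalone”, the replica set name was not picked up from the configuration file.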
Change stream example code:
We are going to use the “pymongo” module for this, so if you have not installed it yet, run:
>> pip install pymongo
1) First, we need a database and a collection to get started. Remember that in MongoDB, a database is not created until you store at least one document in a collection inside it. Create a file named “load_demo_database.py”, copy the following code into it, and run it:
>> python load_demo_database.py
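```python
import pymongo

# Connect to the local MongoDB server on the default port
myclient = pymongo.MongoClient("mongodb://localhost:27017/")
mydb = myclient["test_database"]
mycol = mydb["test_collection"]

# Inserting the first document creates the database and the collection
mydict = {"first_name": "Parth", "last_name": "Pandya", "present_days": 0}
x = mycol.insert_one(mydict)
print("database created successfully, inserted _id:", x.inserted_id)
```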
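One thing to watch out for: running “load_demo_database.py” twice inserts a second “Parth” document, and update_one in the updater script only ever modifies one matching document. If you expect to re-run the loader, a hedged alternative is an upsert. The helper names demo_user_upsert and seed_demo_user are hypothetical; you would call seed_demo_user(mycol) in place of insert_one.

```python
def demo_user_upsert():
    """Return the (filter, update) pair that seeds the demo user at most once."""
    # Match the demo user by name
    query = {"first_name": "Parth", "last_name": "Pandya"}
    # $setOnInsert only applies when the upsert creates a new document,
    # so re-running the script never resets present_days
    update = {"$setOnInsert": {"present_days": 0}}
    return query, update


def seed_demo_user(collection):
    """Insert the demo user only if missing; return True if it was inserted."""
    query, update = demo_user_upsert()
    result = collection.update_one(query, update, upsert=True)
    # upserted_id is None when an existing document matched the filter
    return result.upserted_id is not None
```

With an upsert, the new document gets the equality fields from the filter plus the $setOnInsert fields, so the first run stores the same document as the original insert_one call.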
2) Now that we have created our database, let's start a change stream. Create a file named “change_stream_monitor_thread.py”, copy the following code into it, and run it:
>> python change_stream_monitor_thread.py
```python
import threading
import time

import pymongo


def scan_user_db_changes(change_stream):
    """Print the new present_days value for every update on the stream."""
    # Iterating a change stream blocks until the next change arrives
    for change in change_stream:
        if change["operationType"] == "update":
            updated_fields = change["updateDescription"]["updatedFields"]
            if "present_days" in updated_fields:
                print("updated value of present days: "
                      + str(updated_fields["present_days"]) + "\n")


myclient = pymongo.MongoClient("mongodb://localhost:27017/")
mydb = myclient["test_database"]
mycol = mydb["test_collection"]

# Open the change stream (this fails on a standalone, non-replica-set server)
user_change_stream = mycol.watch()

# Consume the stream in a daemon thread
threading.Thread(target=scan_user_db_changes,
                 args=(user_change_stream,),
                 daemon=True).start()

print("waiting for updates ...")

# Keep the main thread alive; sleeping avoids spinning a CPU core at 100%
while True:
    time.sleep(1)
```
Here, the change stream is consumed in a “daemon” thread. Python exits as soon as only daemon threads are left, so we must not let the main thread finish; that is why we keep a “while True” loop at the end. This way the monitoring thread keeps running. Keep this console window open.
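The monitor above receives every change on the collection and filters it in Python. As a hedged alternative, watch() also accepts an aggregation pipeline, so the server can drop unrelated events before they reach your script. A sketch with hypothetical helper names:

```python
def present_days_pipeline():
    """Aggregation pipeline that keeps only updates touching present_days."""
    return [
        {"$match": {
            "operationType": "update",
            # Dotted path into the change event; $exists keeps events that set this field
            "updateDescription.updatedFields.present_days": {"$exists": True},
        }}
    ]


def present_days_values(changes):
    """Yield the new present_days value from each (already filtered) change event."""
    for change in changes:
        yield change["updateDescription"]["updatedFields"]["present_days"]
```

You could then open the stream with mycol.watch(pipeline=present_days_pipeline()) and print each value from present_days_values(...) inside scan_user_db_changes, without checking the fields yourself.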
3) Now we need to update a value in the database so that we get notifications. Create one more file, name it “database_updater.py”, and run it:
>> python database_updater.py
This script sets “present_days” to 1, 2, 3 and 4 in turn, waiting 5 seconds after each update:
```python
import time

import pymongo

myclient = pymongo.MongoClient("mongodb://localhost:27017/")
mydb = myclient["test_database"]
mycol = mydb["test_collection"]

# Set present_days to 1, 2, 3, 4, pausing 5 seconds between updates
for count in range(1, 5):
    print("updating present days " + str(count))
    mycol.update_one({"first_name": "Parth"},
                     {"$set": {"present_days": count}})
    time.sleep(5)
```
Keep both windows open side by side, and you will see the update notifications appear in the change stream window.
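With this setup, an update made while the monitor window is closed is never printed, because a new watch() call only reports changes from the moment it is opened. A change stream can instead be resumed from a saved resume token, as long as the oplog still contains that point. The sketch below stores the token in a JSON file after every event; the file name resume_token.json and the helper names are hypothetical, and it assumes MongoDB 4.2 or newer, where the token holds only a hex string.

```python
import json
import os


def save_resume_token(path, token):
    """Write a change stream resume token to a JSON file."""
    # Assumes MongoDB 4.2+, where the token looks like {"_data": "<hex string>"}
    with open(path, "w") as f:
        json.dump(token, f)


def load_resume_token(path):
    """Return the previously saved resume token, or None on the first run."""
    if not os.path.exists(path):
        return None
    with open(path) as f:
        return json.load(f)


def watch_with_resume(collection, path="resume_token.json"):
    """Print present_days updates, continuing after the last event seen before a restart."""
    # resume_after=None simply opens a new stream starting from "now"
    with collection.watch(resume_after=load_resume_token(path)) as stream:
        for change in stream:
            if change["operationType"] == "update":
                fields = change["updateDescription"]["updatedFields"]
                if "present_days" in fields:
                    print("updated value of present days: " + str(fields["present_days"]))
            # Record progress after each event so a restart picks up from here
            save_resume_token(path, stream.resume_token)
```

In “change_stream_monitor_thread.py”, the thread could then be started with target=watch_with_resume and args=(mycol,) instead of passing an already opened stream.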
I hope you liked this tutorial; do let me know if you have any feedback.