GCP Cloud PubSub to MongoDB on Docker with Python
Introduction
Google Cloud Platform offers Cloud PubSub, a publish-subscribe system capable of decoupling publisher and subscriber services. I have decided to create a simple tutorial that describes how to decouple a publisher (which will publish random messages) from a subscriber (which will read the messages and store them inside MongoDB).
Here is the architectural diagram of the solution.
I will cover the creation of this data pipeline solution in the following steps:
- #1: Google Cloud Preparation
- #2: MongoDB up and running
- #3: Streaming Publisher
- #4: PubSub Subscriber
- #5: Write to MongoDB
- #6: Running the solution
This tutorial can be done for free by creating a new Cloud PubSub topic: the service fits within the credit GCP gives you at the start, and since I will move only a small amount of data, simply testing the service will not cost anything.
Before starting, the full Python repository of this tutorial is available here.
Step 1: Google Cloud Preparation
First, I have to prepare the Cloud PubSub queue that will be populated later.
You should have an existing project on the Google Cloud Platform and a topic created in it. My project ID is `streaming-ingestion-XXXXXX` and the topic name is `sample-topic`.
By default, when you create a topic, a basic PULL subscription will be created as well. In this tutorial, I am going to use it, but if you wish to create a different one, you can (as long as you are using the PULL delivery type).
At this point, the last thing that you will need to do on the Google Cloud Platform is to create two different service accounts:
- One for publishing messages into a Cloud PubSub topic: roles/pubsub.publisher
- Another one for subscribing to a Cloud PubSub topic: roles/pubsub.subscriber
After creating the two service accounts, you should download a Service Account JSON key for each.
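If you prefer the command line, here is one possible way to do this with the gcloud CLI. The account name is my choice (matching the key file used later in this tutorial), and you should substitute your own project ID; repeat the same three commands with roles/pubsub.subscriber for the subscriber account.

```sh
# Create the publisher service account (the name is a choice for this tutorial).
gcloud iam service-accounts create streaming-publisher

# Grant it the Pub/Sub publisher role on the project.
gcloud projects add-iam-policy-binding streaming-ingestion-XXXXXX \
    --member="serviceAccount:streaming-publisher@streaming-ingestion-XXXXXX.iam.gserviceaccount.com" \
    --role="roles/pubsub.publisher"

# Download its JSON key.
gcloud iam service-accounts keys create streaming-publisher-service-account.json \
    --iam-account="streaming-publisher@streaming-ingestion-XXXXXX.iam.gserviceaccount.com"
```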
Step 2: MongoDB up and running
To prepare the MongoDB database, I decided to go for a simple Docker image that I can later add to the docker-compose file.
Note: You can use whatever MongoDB community image you wish. Since I am running this on an Apple M2 processor, I opted for the `arm64v8/mongo:4.0` image with the `linux/arm64/v8` platform specification.
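For reference, a minimal sketch of how this image can be started on its own (the credentials here are placeholder defaults that I reuse in the docker-compose file of Step 6; adjust the image and platform to your machine):

```sh
# Run MongoDB 4.0 for arm64 hosts; placeholder root credentials.
docker run --platform linux/arm64/v8 -p 27017:27017 \
    -e MONGO_INITDB_ROOT_USERNAME=root \
    -e MONGO_INITDB_ROOT_PASSWORD=example \
    arm64v8/mongo:4.0
```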
Step 3: Streaming Publisher
It is now time to jump into the code.
In this section, I cover the microservice that is publishing random messages on Cloud PubSub, which I have called `streaming-publisher`.
The only Python library that you will need to install for this microservice is `google-cloud-pubsub`, which I put in my `requirements.txt` file.
This script runs a simple `while True` loop in its main that, at every iteration, publishes a new message to the queue and sleeps for one second.
To publish messages (look at `publish_message(message)`), I am doing the following actions:
- Getting the client instance for Cloud PubSub: here I use the publisher service account JSON key.
- Building `topic_name`, which is the path of the Cloud PubSub topic.
- Publishing the message using the `publish()` method of the Cloud PubSub client instance.
Note that strings have to be encoded to bytestrings in order to be published. In my example, I encode them as the Google Cloud documentation suggests, using `'string'.encode('utf-8')`.
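Putting it together, here is a minimal sketch of what app.py can look like. The project ID, topic name, and key file follow the names used in this tutorial; the random payload is just an illustration.

```python
import random
import time

from google.cloud import pubsub_v1

# Names from this tutorial; substitute your own project ID and key file.
PROJECT_ID = "streaming-ingestion-XXXXXX"
TOPIC_ID = "sample-topic"
KEY_FILE = "streaming-publisher-service-account.json"

# Client instance authenticated with the publisher service account key.
publisher = pubsub_v1.PublisherClient.from_service_account_file(KEY_FILE)
topic_name = publisher.topic_path(PROJECT_ID, TOPIC_ID)


def publish_message(message: str) -> None:
    # Cloud PubSub only accepts bytestrings, so encode before publishing.
    future = publisher.publish(topic_name, message.encode("utf-8"))
    future.result()  # block until the message has been accepted


if __name__ == "__main__":
    while True:
        publish_message(f"random message {random.randint(0, 1000)}")
        time.sleep(1)
```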
At this point, with the script (app.py), the service account (streaming-publisher-service-account.json), and the requirements file (requirements.txt), I created a Dockerfile to build a Docker image.
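A minimal Dockerfile for this image could look like the following sketch (the Python base image version is my choice):

```dockerfile
FROM python:3.9-slim

WORKDIR /app

# Install the only dependency, google-cloud-pubsub.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the script and the publisher service account key.
COPY app.py streaming-publisher-service-account.json ./

CMD ["python", "app.py"]
```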
Step 4: PubSub Subscriber
Now, I focus on retrieving the messages that are on the `sample-topic` topic in Cloud PubSub.
As for the previous step, the only library that I will need to install is `google-cloud-pubsub`.
In this script, what I have done is:
- Defined all the GCP configurations (project_id, topic_id, subscription_id, and subscription_name).
- Created a `callback()` method that will be triggered for each new message (for now, a simple print of the message content and an acknowledgement of the message).
- Subscribed to the topic using the `SubscriberClient` with the correct service account JSON key (in this case, the subscriber service account) to receive messages.
- Added a try/except block at the end to keep the Python execution alive waiting for callbacks.
If you run this code now, it will connect to Cloud PubSub and print and acknowledge all the messages that are on the queue.
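For reference, a minimal sketch of this subscriber script. The subscription ID is hypothetical; use the default subscription created together with your topic.

```python
from google.cloud import pubsub_v1

# Names from this tutorial; the subscription ID is a hypothetical default.
PROJECT_ID = "streaming-ingestion-XXXXXX"
SUBSCRIPTION_ID = "sample-topic-sub"
KEY_FILE = "batch-subscriber-service-account.json"

# Client instance authenticated with the subscriber service account key.
subscriber = pubsub_v1.SubscriberClient.from_service_account_file(KEY_FILE)
subscription_name = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)


def callback(message):
    # For now, simply print the payload and acknowledge the message.
    print(f"Received: {message.data.decode('utf-8')}")
    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_name, callback=callback)

# Keep the main thread alive so that callbacks keep firing.
with subscriber:
    try:
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()
        streaming_pull_future.result()
```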
Pull vs Push subscriber
In this tutorial, I have used a PULL subscriber, in which the client application asks Cloud PubSub to return new messages when they arrive. But this is not the same as real "streaming" ingestion.
With true streaming, Cloud PubSub PUSHes new messages to your service. To do so, you need a dedicated push subscription, together with a service that stays up and running listening for invocations from Cloud PubSub (this can be done through an HTTP endpoint).
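For illustration only, such a push endpoint could be as simple as the following Flask sketch; Flask and the /push route are my choices, not part of this tutorial's code.

```python
import base64

from flask import Flask, request

app = Flask(__name__)


@app.route("/push", methods=["POST"])
def receive_push():
    # Cloud PubSub wraps each pushed message in a JSON envelope;
    # the payload itself is base64-encoded.
    envelope = request.get_json()
    data = base64.b64decode(envelope["message"]["data"]).decode("utf-8")
    print(f"Received: {data}")
    # Any 2xx response acknowledges the message.
    return ("", 204)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)
```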
Step 5: Write to MongoDB
Now, the last piece of code that is missing is the insertion of messages into MongoDB.
I will need to add a new requirement for this, which is `pymongo`.
To do that, I can modify the previous script to add the MongoDB connection and the writing instructions.
As you can see, the creation of a MongoDB client only requires the connection string with the username and password (for now, the default ones). Then I specify the database that I want to work with (`sample-database`).
To insert a new item, I get the collection in which I want to insert the message (called `messages`), and I use the `insert_one()` method to complete the insertion.
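A minimal sketch of these additions to the subscriber script; the connection string and credentials are the placeholder defaults assumed in this tutorial.

```python
from pymongo import MongoClient

# Connect using the default credentials defined in the docker-compose file;
# the host "mongodb" resolves to the MongoDB container (see the note in Step 6).
mongo_client = MongoClient("mongodb://root:example@mongodb:27017/")
database = mongo_client["sample-database"]
collection = database["messages"]


def callback(message):
    # Store the message payload in the "messages" collection, then acknowledge.
    collection.insert_one({"data": message.data.decode("utf-8")})
    message.ack()
```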
At this point, as in step 3, I collect the script (app.py), the service account (batch-subscriber-service-account.json), and the requirements file (requirements.txt), and I create the Dockerfile to build a Docker image.
Step 6: Running the solution
The last step is to merge all the components together.
To do so, I created a docker-compose.yml.
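The file looks roughly like this; the service folder names, credentials, ports, and volume path are my choices, so adapt them to your repository layout.

```yaml
services:
  streaming-publisher:
    # Step 3 component: only the base folder for the docker build is specified.
    build: ./streaming-publisher
  batch-subscriber:
    # Steps 4-5 component: started only once the mongodb service is running.
    build: ./batch-subscriber
    depends_on:
      - mongodb
  mongodb:
    image: arm64v8/mongo:4.0
    platform: linux/arm64/v8
    environment:
      # Placeholder default credentials, matching the connection string in Step 5.
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    ports:
      - "27017:27017"
    volumes:
      - ./mongo-data:/data/db
```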
From the code above you can see:
- `streaming-publisher`: the component created during Step 3. In the yml file, I specify only the base folder where the docker build will be executed.
- `batch-subscriber`: as for the streaming-publisher. Here, I also specify the `depends_on` list, which indicates that this service should be started only when the `mongodb` service is running.
- `mongodb`: the specification of the MongoDB database that I will run. Here you can see how I specify the basic username and password (inside the `environment` section), and the `volumes` definition.
Note: in Step 5 I can point directly to MongoDB with the URL `mongodb` because Docker will redirect those requests to the service called `mongodb`.
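Finally, assuming the docker-compose.yml above sits at the root of the repository, everything can be started with a single command:

```sh
# Build the two images and start all three services.
docker compose up --build
```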
If you have any tips, comments or questions, do not hesitate to ask me in the comments below.