Publish content of Cloud Storage files to Pub/Sub in GCP
Introduction
The aim of this tutorial is to define an always-running Agent that reads the files written to GCP Cloud Storage, parses them into a JSON format, and finally publishes them as messages to Cloud Pub/Sub.
We decided to package our agent as a Docker image so that it can be deployed both in the cloud and locally.
We built this agent because many files are uploaded to Cloud Storage as a nightly batch, and we want to simulate a streaming environment in which files arrive one at a time and can be pushed directly onto a queue.
We will not go through the format of the files, the preprocessing we are doing, or the basics of Docker, Cloud Storage, and Pub/Sub. For any more details about that, feel free to contact me directly and I'll help as best I can.
The steps we are going to make for completing this project are:
- Uploading files on GCP Cloud Storage
- Creating the Cloud Pub/Sub resources
- Agent part 1: Read from GCP Cloud Storage (Agent.py, LocalStorageHelper.py, StorageHelper.py)
- Agent part 2: Publish message to Pub/Sub (PubSubHelper.py)
- Conclusions
The prerequisite for the tutorial is that you have access to a GCP project.
Step 1 — Uploading files on GCP Cloud Storage
Google Cloud Storage is a managed object storage service in which you can store any amount of data and retrieve it as often as you'd like.
For this step, you must already have a Cloud Storage bucket. If you do not have one available you can follow this guide: GCP Documentation
After selecting a bucket, you should upload a batch of files, ideally all with the same format, so that a single processing step can be applied before publishing them to Cloud Pub/Sub.
To do so, open the bucket in the console at:
https://console.cloud.google.com/storage/browser/<bucket_name>
From this, you will be able to upload files directly by drag-and-drop or with the “Upload File” button.
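If you prefer to upload files programmatically instead of through the console, a minimal sketch with the google-cloud-storage Python library could look like the following; the bucket name, key file path, and file names are placeholders you should replace with your own:

```python
from google.cloud import storage

# build the client from a Service Account key file (placeholder path)
client = storage.Client.from_service_account_json("service_account.json")
bucket = client.bucket("my-bucket")  # placeholder bucket name

# upload a local file into the bucket under the chosen object name
bucket.blob("data/sample_file.json").upload_from_filename("sample_file.json")
```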
When you have finished uploading the files to Cloud Storage, we can proceed with the creation of the Cloud Pub/Sub topic.
Step 2 — Creating Pub/Sub topic
Cloud Pub/Sub is a messaging service managed by Google and offered as part of its Google Cloud Platform suite. Pub/Sub allows services to communicate asynchronously by providing a message queue to which one service can publish messages and from which other services can retrieve them whenever they want.
Definitions of the concepts we will use can be found in the GCP Docs. In any case, here is a brief explanation extracted from the documentation:
- Topic: a named resource to which messages are sent by publishers.
- Subscription: a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application.
- Message: the element exchanged through the topic.
- Publisher: an application that creates and sends messages to one or more topics.
- Subscriber: an application with a subscription to one or more topics, from which it receives messages.
- Acknowledgement (or “ack”): A signal sent by a subscriber to Pub/Sub after it has received a message successfully. Acked messages are removed from the subscription’s message queue.
- Push and pull: The two message delivery methods. A subscriber receives messages either by Pub/Sub pushing them to the subscriber’s chosen endpoint or by the subscriber pulling them from the service.
For this tutorial, we only need to configure a single thing on the Pub/Sub dashboard of the Google Cloud Platform console: the topic. To do so, open the Pub/Sub dashboard in your GCP console and click "CREATE TOPIC" at the top; a simple form will appear in which you specify the topic name, as shown below.
Note that the "Add a default subscription" option is enabled by default, but you will not need it for this tutorial, so I unchecked it.
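If you prefer to create the topic from code rather than from the console, a minimal sketch with the google-cloud-pubsub Python library (version 2.x) could look like this; the project and topic names are placeholders:

```python
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# placeholder project and topic names; replace with your own
topic_path = publisher.topic_path("my-gcp-project", "my-topic")

# create the topic and print its fully-qualified name
topic = publisher.create_topic(request={"name": topic_path})
print("Created topic:", topic.name)
```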
Step 3 — Agent part 1: Read from GCP Cloud Storage
We will now focus on the part of our agent that will have the duty of reading the various files and their content from GCP Cloud Storage.
First, you will need to download the key of a Service Account in GCP that has the "Storage Object Viewer" role. If you need any help with that, follow the documentation (here is the link).
It is now time to start with the coding part!
First, here is the Agent code, which does the following:
- Read from a file in local storage the name of the last file downloaded and parsed from Google Cloud Storage
- Get the name of the next file in Google Cloud Storage that should be downloaded and parsed
- Download the file and parse it
- Publish its content on Pub/Sub
- Save the name of the file just parsed back to the local file
For now, we will focus on steps 1–3 and 5, so ignore the publishing call (line 51 of Agent.py) for the moment.
The steps that our Agent class goes through in its run() method are commented well enough to make the code easy to follow.
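Since the full listing is not reproduced here, below is a minimal sketch of what Agent.py could look like; the class layout, constructor parameters, and helper signatures are assumptions kept consistent with the helpers described in the rest of this tutorial:

```python
from Helper.LocalStorageHelper import LocalStorageHelper
from Helper.StorageHelper import StorageHelper
from Helper.PubSubHelper import PubSubHelper


class Agent:
    def __init__(self, project_id, bucket_name, topic_name):
        self.local_storage_helper = LocalStorageHelper()
        self.storage_helper = StorageHelper(bucket_name)
        self.pubsub_helper = PubSubHelper(project_id, topic_name)

    def run(self):
        # 1. Read the name of the last file already downloaded and parsed
        last_file = self.local_storage_helper.get_last_file()

        # 2. Get the name of the next file to download and parse;
        #    the offset query also returns the offset file itself, so skip it
        candidates = self.storage_helper.get_list_of_files_from_gcs(last_file)
        candidates = [name for name in candidates if name != last_file]
        if not candidates:
            return  # nothing new to process
        next_file = candidates[0]

        # 3. Download the file and parse it (parsing details are omitted here)
        content = self.storage_helper.get_file_from_gcs(next_file)
        message = content.decode("utf-8")

        # 4. Publish its content on Pub/Sub
        self.pubsub_helper.publish_data_message(message)

        # 5. Save the name of the file just parsed in the local file
        self.local_storage_helper.set_last_file(next_file)
```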
The helpers we developed are stored in a folder simply called "Helper".
The LocalStorageHelper.py defined there has two simple methods:
- get_last_file(): returns the content of the local file. If the file does not exist yet, it creates it and returns None.
- set_last_file(last_file): writes the content of the last_file parameter to the local file.
Note that we decided to go with a local file because this is a simple example of what you can do, but in reality there are drawbacks to this approach (for instance, the state is lost whenever the container is recreated).
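A minimal sketch of LocalStorageHelper.py, assuming the state is kept in a plain text file whose name (last_file.txt) is a placeholder, could look like this:

```python
import os


class LocalStorageHelper:
    """Keeps track of the last processed file in a local text file."""

    def __init__(self, state_file="last_file.txt"):
        self.state_file = state_file

    def get_last_file(self):
        # create the file if it does not exist yet and return None
        if not os.path.exists(self.state_file):
            open(self.state_file, "w").close()
            return None
        with open(self.state_file) as f:
            content = f.read().strip()
        return content or None

    def set_last_file(self, last_file):
        # overwrite the state file with the name of the last processed file
        with open(self.state_file, "w") as f:
            f.write(last_file)
```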
StorageHelper.py is our abstraction layer for GCP Cloud Storage, and it provides the following methods (a sketch of this helper follows the list):
- In the constructor, the GCP client is built from the Service Account JSON file.
- get_file_from_gcs(filename): simply downloads the file from Cloud Storage and returns its content as a bytes string.
- get_list_of_files_from_gcs(offset_name): this is the most interesting one, built on the list_blobs method of the Google Cloud Storage Python library. Once you define your own offset (it is specific to the files you are considering), GCP sorts all the files in the bucket and returns only the names of those that are lexicographically greater than or equal to the offset. We also specify the max_results parameter, which limits the number of results the method returns.
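Here is a minimal sketch of StorageHelper.py based on that description; the key file path and the default max_results value are assumptions, and it relies on a recent version of the google-cloud-storage library (where start_offset and download_as_bytes are available):

```python
from google.cloud import storage


class StorageHelper:
    """Thin abstraction over the Google Cloud Storage client."""

    def __init__(self, bucket_name, key_path="service_account.json"):
        # build the client from the Service Account key downloaded earlier
        self.client = storage.Client.from_service_account_json(key_path)
        self.bucket_name = bucket_name
        self.bucket = self.client.bucket(bucket_name)

    def get_file_from_gcs(self, filename):
        # download the blob and return its content as a bytes string
        return self.bucket.blob(filename).download_as_bytes()

    def get_list_of_files_from_gcs(self, offset_name, max_results=10):
        # list_blobs returns blobs sorted by name; start_offset skips
        # everything lexicographically smaller than the offset
        blobs = self.client.list_blobs(
            self.bucket_name,
            start_offset=offset_name or None,
            max_results=max_results,
        )
        return [blob.name for blob in blobs]
```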
Step 4 — Agent part 2: Publish to Pub/Sub
The last piece we are missing is PubSubHelper.py, which is used on line 51 of Agent.py. With it, we want to provide a simple interface for our agent to publish messages to a Pub/Sub topic. To do so, we developed (a sketch of this helper follows the list):
- A simple constructor that builds the Pub/Sub client from the Service Account JSON file (you must add the "Pub/Sub Publisher" role to it for the tutorial to work correctly) and derives the topic path from the client and the topic name (the one you defined in step 2).
- publish_data_message(message): in this method we wrap the lower-level publishing method, enforcing the topic_path we want.
- publish_message(message, topic): here we do the actual work of publishing a message. Failures are also reported with some prints inside the callback method.
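A minimal sketch of PubSubHelper.py along these lines could be the following; the constructor parameters and the key file path are assumptions, and it uses the google-cloud-pubsub library:

```python
from google.cloud import pubsub_v1
from google.oauth2 import service_account


class PubSubHelper:
    """Thin abstraction over the Pub/Sub publisher client."""

    def __init__(self, project_id, topic_name, key_path="service_account.json"):
        # the Service Account key needs the "Pub/Sub Publisher" role
        credentials = service_account.Credentials.from_service_account_file(key_path)
        self.publisher = pubsub_v1.PublisherClient(credentials=credentials)
        self.topic_path = self.publisher.topic_path(project_id, topic_name)

    def publish_data_message(self, message):
        # enforce the topic configured in the constructor
        self.publish_message(message, self.topic_path)

    def publish_message(self, message, topic_path):
        # publish() returns a future; the callback reports success or failure
        future = self.publisher.publish(topic_path, data=message.encode("utf-8"))
        future.add_done_callback(self._callback)

    @staticmethod
    def _callback(future):
        try:
            print("Published message with id:", future.result())
        except Exception as error:
            print("Failed to publish message:", error)
```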
Conclusions
What we have seen here is a simple tutorial that downloads the content of files from Google Cloud Storage, one at a time, keeps track of the last one processed, and publishes the content of the files as messages to Pub/Sub.
Starting from this, we have continued by:
- Creating a main.py that loops continuously over the agent's run() method (a minimal sketch follows this list)
- Dockerizing our agent
- Building the image through Cloud Build
- Storing it in Container Registry
- Creating a Compute Engine VM that executes our agent and continuously publishes messages
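As a reference for the first of those steps, a minimal main.py loop could look like this; the constructor arguments and the sleep interval are placeholders:

```python
import time

from Agent import Agent

if __name__ == "__main__":
    agent = Agent(
        project_id="my-gcp-project",   # placeholder values
        bucket_name="my-bucket",
        topic_name="my-topic",
    )
    while True:
        agent.run()
        time.sleep(5)  # small pause between polls
```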
If you are interested in the steps I just cited, please let me know and I will write a dedicated article about them.
If you have any tips, comments or questions, do not hesitate to ask me in the comments below.