How to Load Data from MongoDB into Amazon Redshift
MongoDB continues to be a major player in the unstructured data space, and data engineers often confront the challenge of helping organizations make sense of the data collected there. While it’s possible to query MongoDB directly for analysis, it’s not ideal for a number of reasons. A major one is the fact that MongoDB does not support SQL, which most analysts know well. It’s also likely that the data in your MongoDB instance alone isn’t nearly as valuable as it is alongside data from other systems across your organization.
In Comes Redshift
Though this post isn’t meant to convince you that Redshift is the right solution for a data warehouse, it’s worth noting that by loading data from your MongoDB instance into Redshift you have the opportunity to address both shortcomings noted above. Redshift allows you to query that data using SQL and supports the high volume of data that is often stored in a MongoDB collection. It’s also a great home for a general data warehouse, allowing you to join your MongoDB data with data from other sources that you’re sending into Redshift.
A Plan for Getting MongoDB Data to Redshift
Though there are a number of ways to get data from a MongoDB instance into Redshift, I prefer to take a 2-step approach.
- Extract specific fields from your MongoDB documents and store them in a flat file (CSV is great) that can be uploaded to an Amazon S3 bucket.
- Use the Redshift COPY command to load the data into a Redshift table.
We can consider the scope of this work to be the Extract and Load steps of an ELT (Extract-Load-Transform) process.
Extracting Data from MongoDB
Note: You can see the sample code for this post in the Data Liftoff GitHub repo.
Though there are various ETL/ELT tools for extracting data from MongoDB and other sources, I prefer to use Python scripts to pull data out of MongoDB, write it to a CSV file, and upload it to an S3 bucket.
The first step is to install the PyMongo package. PyMongo makes it easy to connect to and query your MongoDB database from Python.
pip install pymongo
Next, connect to your MongoDB database and point to the collection you want to extract documents from. NOTE: It’s best not to hard-code your username, password and authSource, but for simplicity I’ve used hard-coded strings as placeholders below.
from pymongo import MongoClient

# create a MongoClient instance and connect to your MongoDB host
mongo_client = MongoClient("my_host",
                           username='my_username',
                           password='my_password',
                           authSource='my_auth_db')
# now connect to the database where your collection resides
mongo_db = mongo_client['my_database']
# finally, choose the collection we want to query documents from
mongo_collection = mongo_db['my_collection']
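As noted above, the hard-coded strings are just placeholders. Here’s a minimal sketch of loading them from a local config file instead, using configparser from the standard library (mongo.conf and its section and key names are hypothetical):
import configparser
from pymongo import MongoClient

# a minimal sketch: read MongoDB connection details from a local config file
# (mongo.conf and its [mongo] section are hypothetical)
parser = configparser.ConfigParser()
parser.read("mongo.conf")

mongo_client = MongoClient(parser.get("mongo", "host"),
                           username=parser.get("mongo", "username"),
                           password=parser.get("mongo", "password"),
                           authSource=parser.get("mongo", "auth_db"))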
Now it’s time to query the documents we want to move into Redshift. We do this by calling .find() on our mongo_collection object. In the example below, we’ll grab all documents with a “date” field value between two dates we’ve defined in Python. This is a common use case, as we often want to grab all documents in a collection within a date range.
import datetime
from datetime import timedelta

# note: PyMongo can encode datetime.datetime values but not datetime.date,
# so build full datetimes for the start and end of the range
start_date = datetime.datetime.combine(datetime.date.today() - timedelta(days=1), datetime.time.min)
end_date = start_date + timedelta(days=1)
mongo_query = {"$and": [{"date": {"$gte": start_date}}, {"date": {"$lt": end_date}}]}
result_docs = mongo_collection.find(mongo_query, batch_size=3000)
The result of the code above is a Cursor named result_docs, which will allow us to iterate through the resulting documents in just a moment. Note, however, the batch_size parameter, which I set to 3000. You can read more about batch sizes in MongoDB cursors, but all you need to know for now is that PyMongo makes a round trip to the MongoDB host for each batch. For example, if the result_docs Cursor has 6,000 results, it will take two trips to the MongoDB host to pull all the documents down to the machine where your Python script is running. You should weigh the tradeoff of storing more documents in local memory against making lots of round trips; it all depends on where your resource constraints and bottlenecks lie.
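If you’re not sure what batch size fits your situation, one quick-and-dirty approach is to time a full pull of the same result set with a few candidate values. A rough sketch (the sizes below are arbitrary, and timings will vary with document size, network latency and server load):
import time

# informally compare batch sizes by timing a full pull of the result set
for size in (500, 3000, 10000):
    start = time.perf_counter()
    docs = list(mongo_collection.find(mongo_query, batch_size=size))
    elapsed = time.perf_counter() - start
    print(f"batch_size={size}: pulled {len(docs)} documents in {elapsed:.2f}s")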
Now that you have your Cursor, it’s time to iterate through and grab the fields you care about from each document. In this simplified example, imagine that each document represents an event that occurred on your website. Perhaps it was a user logging in, viewing a page, or submitting a feedback form. Though the documents might have dozens of fields, we’ll just grab a few that we wish to load into a Redshift table later on.
# create a blank list to store our results
all_events = []

# iterate through the cursor
for doc in result_docs:
    # include default values in case a field is missing from the document
    event_id = str(doc.get("event_id", -1))
    event_timestamp = doc.get("date", None)
    event_name = doc.get("event_name", None)

    # add all the event properties into a list
    current_event = []
    current_event.append(event_id)
    current_event.append(event_timestamp)
    current_event.append(event_name)

    # add the event to our final list of events
    all_events.append(current_event)
One thing to note is that I’m including a default value in the doc.get() function call. Why? The nature of unstructured document data means that it’s possible for a field to be missing from a document entirely, rather than included with a NULL/None value. In other words, we can’t assume that each of the documents we’re iterating through has an “event_name” field. In those cases, we tell doc.get() to return a default value rather than risk the KeyError we’d get from accessing the key directly.
After we iterate through all of our documents, we have just the fields we care about in the all_events
Python list. Next, we’ll turn that list into a CSV file locally. To do so, we’ll make use of the csv module which is included in the standard Python distribution.
import csv

# write the events to a pipe-delimited CSV file; the with block closes
# the file automatically, so no explicit close() call is needed
with open('export_file.csv', 'w', newline='') as fp:
    csvw = csv.writer(fp, delimiter='|')
    csvw.writerows(all_events)
Now it’s time to upload our CSV file to an S3 bucket. We’ll use the Boto3 library, which is the AWS SDK for Python. You can use pip or another package manager to get it installed, but I recommend following the Quickstart Guide to get up and running with Boto3. You’ll also need to set up an S3 bucket and credentials (I suggest an IAM user) with access to write to the bucket. Below I use placeholder variables for the keys and bucket name. Once again, I suggest not hard-coding those but rather loading them in from a local config file or other secure location.
import boto3
access_key = "my_access_key"
secret_key = "my_secret_key"
export_file = "export_file.csv"
s3_bucket_name = "s3_bucket_name"
s3_file = "event_export.csv"
s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
s3.upload_file(export_file, s3_bucket_name, s3_file)
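As with the MongoDB connection, the hard-coded keys above are only placeholders. One common alternative is to lean on Boto3’s standard credential lookup, which checks environment variables and the ~/.aws/credentials file written by aws configure, so the keys never appear in your script at all:
import boto3

# with credentials supplied via environment variables or ~/.aws/credentials,
# boto3 finds them on its own and no keys need to be passed in code
s3 = boto3.client('s3')
s3.upload_file("export_file.csv", "s3_bucket_name", "event_export.csv")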
Do note that while I always recommend proper exception handling, the last few code blocks deserve particular attention to such handling before you put this into production. A lot can go wrong when connecting to external systems, parsing unstructured data and moving large files around. Don’t assume that just because it works one time, it will work the next!
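As one small illustration (not a complete error-handling strategy), here’s how you might wrap the S3 upload so that a failure gets logged before the script exits. The broad except clause is just for the sketch; in production you’d narrow it to the specific boto3/botocore exceptions you expect and probably add retries or alerting.
import logging

logging.basicConfig(level=logging.INFO)

# a minimal sketch: log the outcome of the upload instead of failing silently
try:
    s3.upload_file(export_file, s3_bucket_name, s3_file)
    logging.info("uploaded %s to s3://%s/%s", export_file, s3_bucket_name, s3_file)
except Exception:
    logging.exception("failed to upload %s", export_file)
    raise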
Loading into Redshift
Finally, it’s time to load the data we extracted into our Redshift cluster. For this, I’ll use the Redshift COPY command to load the contents of the CSV file into a table. I wrote a post dedicated to this topic which is well worth a read, especially as it pertains to setting up proper security and credentials.
Assuming you’ve read the other post, there are still two things we need to do here. First, we need to create a table in Redshift to store the incoming data. For our simple example, this will do.
CREATE TABLE raw_schema.event_data (
    event_id int,
    event_timestamp timestamp,
    event_name varchar(255) encode ZSTD
)
DISTSTYLE KEY
DISTKEY (event_id)
SORTKEY (event_timestamp);
Next, we’ll run the COPY command to grab the file we just uploaded to S3 and load it into our table. Again, more details on how this works in my other post.
COPY raw_schema.event_data
from 's3://bucket-name/event_export.csv'
iam_role '<my-arn>';
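If the COPY fails or loads fewer rows than you expect, Redshift records the details in its stl_load_errors system table. A quick query like the sketch below shows the most recent failures, including the offending file, line number and reason:
-- check the most recent COPY failures recorded by Redshift
SELECT starttime, filename, line_number, colname, err_reason
FROM stl_load_errors
ORDER BY starttime DESC
LIMIT 10;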
Advanced Considerations
I hope that this post is enough to get you started, but when it comes to loading a high volume of document data from MongoDB into Redshift, things can get complicated quickly. For example, at a high enough volume you’ll want to consider writing (and loading) multiple CSV files in parallel, as sketched below. You’ll also need to manage fields being added to and removed from documents as developers make changes in production. That said, it’s best to start simple and build up your data engineering pipelines to handle more advanced cases over time.
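As a taste of the first of those considerations, here’s a rough sketch of splitting the extracted events across several smaller CSV files. The chunk size and file-naming scheme are arbitrary; once the files are uploaded under a common S3 prefix, a single COPY pointed at that prefix will load them in parallel across the cluster’s slices.
import csv

# a rough sketch: split all_events into several smaller CSV files so that
# Redshift can COPY them in parallel from a common S3 prefix
chunk_size = 100000  # arbitrary; tune to your data volume
for i in range(0, len(all_events), chunk_size):
    chunk = all_events[i:i + chunk_size]
    file_name = f"event_export_{i // chunk_size}.csv"
    with open(file_name, 'w', newline='') as fp:
        csvw = csv.writer(fp, delimiter='|')
        csvw.writerows(chunk)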
Don’t forget to sign up for the Data Liftoff mailing list to get more content and to stay up to date on the latest in data science and data engineering.