Being informed and relevant aren’t enough to catch a moving target that’s disappearing every 24 hours.
If you’ve worked on any cloud-to-cloud migration project that includes the migration of streaming data, you know exactly what I mean when I say being informed and relevant aren’t enough.
You might also know that Amazon Kinesis Data Streams has a free retention period of only 24 hours, beyond which the cost factor kicks in.
And you most certainly know how it feels to play catch-up all the time if cost is a primary constraint in your project.
In his article “3 Risk-Mitigation Lessons That We Learned The Hard Way This Year”, our colleague Shashank set five objectives to mitigate our overall project risks.
Please Note: This article was written by Roshan Patil, Rishi Singh Hora and Krishnakumar Konar.
We took the following table from Shashank’s article, which we highly recommend that you read to understand the business context of the technical challenges we discuss in this article.
Objective #2 is to decouple the data streaming services, which is critical because we want to guarantee the sequence of the streaming data.
Why does the sequence of data matter?
In her article “How To Reduce Risks And Prepare For The Unknown” our colleague, Sweeti, shared our five-step journey of moving our customer’s infrastructure and data from AWS to GCP.
She dived into the technical challenges that we faced and explained the many choices that we made along the way to pull off this herculean effort.
One of those challenges was to move all the streaming data from Amazon Kinesis Data Streams to Google Cloud Pub/Sub. The two services are poles apart, but we won’t cover the differences here because Google has already taken a stab at them.
To better understand the need to maintain the sequence of data, imagine the following scenario.
You and your colleague use a chat service to collaborate on the latest product updates that need to go in the release notes. Kinesis Data Streams stores all of your chat history in shards and assigns each data record a sequence number, which is how it maintains the order of the records.
In case of a failure, it’s easy to restore your chat history (conversation) based on the sequence numbers, so you have the context of the entire conversation in its correct order.
And that’s why data sequencing matters. Moving back to our project now.
We needed to move the streaming data to Pub/Sub because some Data Science jobs needed topic updates. But what does this really mean?
Pub/Sub is a messaging service with a publish-subscribe model. What this means is that Pub/Sub makes data available on the relevant topics as soon as it receives the data, and it stamps each message with a publishTime attribute as it sends the data forward.
The subscribers (applications) then consume that data immediately for their specific purpose.
The challenge with Pub/Sub is that it does not guarantee to deliver the data in the order in which it receives the data, so the subscribers (applications) must account for scenarios in which the data is out of sequence.
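One way a subscriber can account for out-of-sequence delivery is to hold back early arrivals until the gap before them is filled. The sketch below is only an illustration of that idea. It assumes each message carries a contiguous integer `seq` assigned by the producer; that field and the `ReorderBuffer` class are hypothetical and are not part of Pub/Sub or of our stream processor.

```python
class ReorderBuffer:
    """Releases payloads in sequence order, holding back any that arrive early."""

    def __init__(self, first_seq=0):
        self.next_seq = first_seq  # next sequence number that may be released
        self._pending = {}         # seq -> payload, waiting for earlier messages

    def push(self, seq, payload):
        """Accept one message and return the payloads that are now in order."""
        if seq < self.next_seq or seq in self._pending:
            return []              # duplicate or stale delivery: ignore it
        self._pending[seq] = payload
        ready = []
        # Drain every consecutive message starting at the expected number.
        while self.next_seq in self._pending:
            ready.append(self._pending.pop(self.next_seq))
            self.next_seq += 1
        return ready
```

A real subscriber would also need a timeout for gaps that never fill; this sketch leaves that out.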
Now that you have the context of the problem, let’s dive into the core of this article.
How to grapple with the Kinesis Data Streams challenge?
While the infra team is building the CI/CD pipeline to deploy over 75 services and 200 jobs across 15 GKE clusters, the data team is trying to replicate and sync over 220 tables from DynamoDB to Spanner and Bigtable.
To know why we chose Spanner (SQL database) and Bigtable (NoSQL database) for data migration, read Sweeti’s Hacker Noon article.
At first glance, it seems straightforward to send all the Kinesis streams from AWS to Pub/Sub in GCP. However, we have to move over 70 TB of historical data.
And three tables have over 1 TB of data, which would take nearly three days per table for complete batch migration. However, Kinesis Data Streams has a free retention period of only 24 hours.
This means that we would lose some portion of the streaming data if we waited for three days to complete the batch migration for even one of those three tables.
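The arithmetic behind that risk is simple enough to sketch. The helper below assumes a consumer that only starts reading the stream once the batch load has finished; the function name is ours, for illustration only.

```python
def hours_of_stream_at_risk(batch_hours, retention_hours=24):
    """Hours of streamed records that expire before the batch load finishes.

    Assumes the consumer starts reading only after the batch load completes,
    so anything older than the retention window at that moment is gone.
    """
    return max(0, batch_hours - retention_hours)
```

For a three-day (72-hour) batch load and 24-hour retention, `hours_of_stream_at_risk(72)` returns `48`, that is, the first two days of stream records would have expired.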
Time and money were a luxury that we couldn’t afford, so we had to figure out a way to keep moving forward without waiting for the batch migration to complete or having our customer pay Amazon to increase the data retention period.
How to prevent streaming data loss during historical data transfer?
We discussed multiple options, starting with the seemingly straightforward AWS Lambda-to-Pub/Sub solution. However, as you already know, we cannot guarantee the sequence of the streaming data.
After some trials and discussions, we decided to employ a two-step approach to data transfer. We decided to first move all the historical data from DynamoDB to Google Cloud Storage (GCS).
From there, we decided to move that data to either Spanner or Bigtable based on whether the table has a secondary index (move data to Spanner) or not (move data to Bigtable).
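The routing rule can be written as a one-line decision. This is a minimal sketch, assuming each table is described by a dictionary with a `secondary_indexes` list; that descriptor shape is hypothetical and is not the format our tooling used.

```python
def target_database(table):
    """Pick the destination for one DynamoDB table.

    `table` is a hypothetical descriptor such as
    {"name": "users", "secondary_indexes": ["by_email"]}.
    """
    # Tables with at least one secondary index go to Spanner; the rest go to Bigtable.
    return "spanner" if table.get("secondary_indexes") else "bigtable"
```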
With this two-step approach, we can maintain a backup of the data and address disaster recovery if things go awry. It also means that we don’t need to take batch dumps repeatedly.
For ease of retrieval, we named the files according to the timestamp of the record set.
Roshan built a stream processing application to ensure that we don’t lose our customers’ streaming data and the sequence. But before we talk about the stream processor, let’s look at the historical data migration leg of this story.
How to move historical data from DynamoDB to GCS?
We first increased the read throughput of each DynamoDB table. We did this because the application needs predefined read capacity units (RCUs) per second to perform read operations from DynamoDB.
And the data pipeline, which we use for batch transfer, also needs some RCUs per second to start its backup process.
For example, if the application needs 500 RCUs per second to run smoothly and AWS Data Pipeline needs another 500 RCUs per second, the read throughput ratio is 0.5 for a total of 1000 RCUs.
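The same calculation as a small helper, assuming the read throughput ratio is the share of the table’s total provisioned read capacity that the export job may consume. The function name is ours, for illustration only.

```python
def read_throughput_ratio(app_rcus, pipeline_rcus):
    """Return (total RCUs to provision, share of them reserved for the export)."""
    total = app_rcus + pipeline_rcus
    if total <= 0:
        raise ValueError("total read capacity must be positive")
    return total, pipeline_rcus / total
```

With the numbers above, `read_throughput_ratio(500, 500)` returns `(1000, 0.5)`.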
Next, we ran a script to launch the additional resources that we require to migrate multiple tables concurrently.
We then ran another script to launch an AWS Data Pipeline job to receive the data dumps from the DynamoDB tables and to transfer them to the relevant S3 buckets.
The following image illustrates this flow.
For each DynamoDB table, the Data Pipeline job creates a separate folder with a timestamp on the S3 bucket. We enabled an SQS queue event for each table on each S3 bucket.
So, when a file is available in an S3 bucket, a corresponding SQS queue receives a message in JSON format.
A custom-built, multithreaded application then reads the messages from SQS and downloads the data dump to an Amazon EC2 instance. It then compresses and uploads the files to a GCS bucket with the same name as the corresponding S3 bucket.
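To give a feel for the first step of that application, here is a minimal sketch of reading one SQS message body and extracting the bucket and object key. It assumes the standard S3 event notification layout (`Records[].s3.bucket.name` and `Records[].s3.object.key`); the function name is hypothetical, and the download and upload steps are left out.

```python
import json
from urllib.parse import unquote_plus


def objects_from_s3_event(message_body):
    """Return (bucket, key) pairs for every newly created object in one message."""
    event = json.loads(message_body)
    found = []
    # Messages without "Records" (for example, a test event) yield nothing.
    for record in event.get("Records", []):
        if not record.get("eventName", "").startswith("ObjectCreated"):
            continue
        s3 = record["s3"]
        # Object keys arrive URL-encoded in the notification, so decode them.
        found.append((s3["bucket"]["name"], unquote_plus(s3["object"]["key"])))
    return found
```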
We used a compression tool (lbzip2) that reduced our egress cost at the AWS end because it enabled us to compress files to around 70 to 80 percent.
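The sketch below shows the compression step in miniature. It assumes the tool named above is lbzip2, a parallel bzip2 implementation, and it uses Python’s standard `bz2` module as a single-threaded stand-in that produces the same format. The function name is ours, and the savings on real dumps depend entirely on the data.

```python
import bz2


def compress_dump(raw):
    """Compress one dump and report the fraction of bytes saved."""
    compressed = bz2.compress(raw, compresslevel=9)
    # Fraction of the original size that no longer has to leave AWS.
    saved = 1 - len(compressed) / len(raw) if raw else 0.0
    return compressed, saved
```

Because egress is billed by the byte, whatever fraction the compression saves comes straight off the transfer bill.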
The Data Pipeline job terminates after completion of the DynamoDB table backup to the relevant S3 bucket, and we then revert the read throughput value for each table.
To reduce cost, we ran a custom application on AWS Spot Instances that processes the SQS messages, compresses the backup files, and then transfers them to GCS.
After processing all the messages in the SQS queue for a table and with no unacknowledged messages, the SQS script uploads a file to a GCS folder specific to the table.
This upload action indicates task completion, which triggers the second phase of data transformation through Google Cloud Dataflow and imports data to Spanner or Bigtable.
How to move historical data from GCS to Spanner and Bigtable?
The folder structure in a GCS bucket is as follows:
Bucket → batch → table_name → timestamp_folder
The timestamp_folder holds all the compressed JSON files.
Google Cloud Functions monitors the upload of this file. It then triggers the Airflow DAG in Cloud Composer, which in turn triggers multiple Cloud Dataflow jobs to load data to Spanner and Bigtable. We also set up Slack notifications through Cloud Composer for success/failure updates.
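The decision that such a function has to make can be sketched as a pure helper. Everything specific here is an assumption: `MARKER_NAME` is a placeholder because this article does not give the file’s name, the marker is assumed to sit inside the timestamp folder, and the returned dictionary is a hypothetical payload for the DAG run. The call that actually triggers the Airflow DAG is omitted.

```python
MARKER_NAME = "_DONE"  # hypothetical placeholder; the real file name is not given here


def dag_conf_for_upload(bucket, object_name, marker_name=MARKER_NAME):
    """Return a DAG-run payload if the uploaded object is the completion marker.

    Expects object names shaped like batch/<table_name>/<timestamp_folder>/<marker>
    and returns None for every other upload (for example, the data files).
    """
    parts = object_name.split("/")
    if len(parts) != 4 or parts[0] != "batch" or parts[3] != marker_name:
        return None
    _, table_name, timestamp_folder, _ = parts
    return {
        "bucket": bucket,
        "table_name": table_name,
        "prefix": f"batch/{table_name}/{timestamp_folder}/",
    }
```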
We moved 110 tables without secondary indexes to Bigtable and 110 tables with secondary indexes to Spanner. Sweeti and Roshan shared our rationale for this and the associated details in their articles “How To Reduce Risks And Prepare For The Unknown” and “Using DAL For Migration From DynamoDB to Spanner and Bigtable”.
We want to share a lesson we learned during the historical data transfer. Rightsizing the Dataflow jobs is quite an interesting puzzle because it has performance implications.
That is, we can potentially overload Spanner with an incorrect number of Dataflow jobs and batch size. Besides, Spanner responds to overloading with Transaction Abort exceptions, which end up increasing latency and causing multiple retry attempts.
To ensure optimal performance, we adjusted the batch size and the vCPUs associated with the Dataflow jobs. We also configured the Spanner node count per instance, which we calculated to be one-fourth of the total number of vCPUs.
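As a worked example of that one-fourth rule, here is a minimal sketch. Rounding up and the floor of one node are our assumptions for the illustration; the text above only states the ratio.

```python
import math


def spanner_nodes_for(total_dataflow_vcpus):
    """Spanner node count as one-fourth of the Dataflow vCPUs.

    Rounding up and the minimum of one node are assumptions of this sketch.
    """
    return max(1, math.ceil(total_dataflow_vcpus / 4))
```

For example, `spanner_nodes_for(64)` returns `16`.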
We will now explain how we overcame the Kinesis Data Streams challenge and maintained the data sequence without losing any streaming data.
How to move streaming data from AWS to GCP?
We have already stated the issue that we faced with three tables that had over 1 TB of historical data.
As cost is a primary constraint, we cannot switch to a paid data retention plan on Kinesis Data Streams. So, we have to figure out how to prevent the loss of some streaming data during the batch transfer of these three tables.
While we are not disregarding the importance of managing the streaming data from all the tables and our need to maintain the integrity of all of our customer’s data, these three tables posed the greatest challenge to us.
And this challenge, in some way, led us to build the stream processor, which is a web server that exposes a REST API and receives data through it.
Each of the 220+ tables had an associated stream processor and an uptime checker to inform us, through Slack notifications, if a processor goes down.
Why do we need one processor per table?
Kinesis stores the streaming records in shards and our customer’s user traffic is usually quite high, so we anticipated a huge volume of records across the shards at any given point in time.
To avoid the per-second transaction costs and to manage such a volume of data, we set the batch size to 10K and the batch interval to 5 seconds. Let’s now look at how we moved the streaming data.
AWS Lambda consumes the records from all the shards in Amazon Kinesis Data Streams. It makes an API call to a stream processor to send the data forward when either the batch size or the batch interval is reached, whichever comes first.
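The “whichever comes first” rule can be sketched as a small batcher. This is an illustration of the logic only, not our Lambda code: the `Batcher` class, its `send` callback, and the injectable `clock` are hypothetical. A Kinesis event source mapping in AWS Lambda exposes comparable settings (`BatchSize` and `MaximumBatchingWindowInSeconds`), which may be the simpler place to configure this.

```python
import time


class Batcher:
    """Flushes records when the batch is full or the interval has elapsed."""

    def __init__(self, send, max_records=10_000, max_seconds=5.0, clock=time.monotonic):
        self._send = send              # callback that forwards one batch
        self._max_records = max_records
        self._max_seconds = max_seconds
        self._clock = clock
        self._records = []
        self._started = None           # time the current batch received its first record

    def add(self, record):
        if not self._records:
            self._started = self._clock()
        self._records.append(record)
        self._flush_if_due()

    def tick(self):
        """Call periodically so that a quiet stream still flushes on the interval."""
        self._flush_if_due()

    def _flush_if_due(self):
        if not self._records:
            return
        full = len(self._records) >= self._max_records
        expired = self._clock() - self._started >= self._max_seconds
        if full or expired:
            batch, self._records = self._records, []
            self._send(batch)
```

Passing the clock in as a parameter keeps the interval logic testable without real waiting.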
We used seven instances of NGINX. Each instance proxies the API requests to the individual stream processors, which use the same configuration file that dynamodb-adapter (erstwhile DB Driver) uses to move data to Spanner or Bigtable.
Roshan spoke at length about dynamodb-adapter in his article.
Redis maintains the state (timestamp) of the latest record in Spanner and Bigtable.
As we set a batch interval of 5 seconds, we have to ensure that we migrate all data from AWS to GCP before the actual cutoff.
InfluxDB captures and stores this lag in data transfer, and we used Grafana to visualize the data lag for each table.
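The lag itself is just the gap between the current time and the timestamp of the latest record that has landed in GCP. The sketch below uses a plain dictionary as a stand-in for Redis and leaves out the write to InfluxDB; both function names are hypothetical.

```python
from datetime import datetime, timezone


def record_latest(state, table, record_time):
    """Keep only the newest record timestamp per table (`state` stands in for Redis)."""
    current = state.get(table)
    if current is None or record_time > current:
        state[table] = record_time
    return state[table]


def lag_seconds(state, table, now=None):
    """Seconds between `now` and the latest record written for `table`."""
    now = now or datetime.now(timezone.utc)
    latest = state.get(table)
    if latest is None:
        return None  # nothing has been written for this table yet
    return max(0.0, (now - latest).total_seconds())
```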
Project constraints lead to creative solution discovery
We want to reflect on our project’s trajectory.
Being a distributed monolith presents certain challenges, and a major one is lack of visibility. Special thanks to our customer who suggested that we use Lightstep, an observability tool, which helped us tremendously in finding and fixing problems.
However, a tool alone cannot eliminate all your problems or the risks of data loss and service downtime, even more so when you consider our customer’s scale of operations.
We are talking about a customer who is one of India’s largest social networking service providers, with over 60 million Monthly Active Users (MAU) and over 10 million Daily Active Users (DAU), and these numbers have grown astonishingly since then.
At the time, we were dealing with a total peak-time read of 2 million queries per second (QPS) and peak-time write of 560K QPS.
Furthermore, zero downtime, zero data loss, and zero or minimal changes in the application’s code base were non-negotiables.
Given these project stipulations, an incredible time crunch, and a distributed monolith, it still amazes us all (customer, Google, and CloudCover) that we are sharing with you a story of success.
Our collective efforts paid off, but our project’s objective of auditing every step of the way is what got us here.
Without the visual clarity through all our auditing efforts, we wouldn’t have had the depth and breadth of learning that we enjoy today.