I spent time with a client this week to solve an interesting problem: adjusting the number of concurrent requests to a downstream service dynamically, based on its response time and error rate. This is a common challenge when integrating with third parties, so we decided to share our approach in the hope that others might benefit from it.
My client is in the financial services sector and they integrate with over 50 service providers. They have API access to the providers’ data and they need to fetch it from every provider on a daily basis.
However, many of these providers place restrictions and rate limits on their APIs, for example:
- You can only access the API during off-peak hours.
- You cannot access the API during the weekly maintenance window.
- You can only make X concurrent requests to the API.
My client has implemented a scheduler service to manage the timing constraints. The scheduler knows when to kick off the ingestion process for each provider. But there’s also a secondary process in which their users (financial advisers) can schedule ad-hoc ingestions themselves.
There was also some additional context that we had to keep in mind:
- The APIs follow an established industry standard. Unfortunately, they can only return one record at a time.
- The API response times are highly variable — anything from 5ms to 70s.
- The amount of data the client needs to fetch varies from provider to provider. Some have just a few thousand records, others have hundreds of thousands.
- We should not fetch the same record from a provider more than once per day, because of the providers’ usage limits.
- This ingestion process is one of the most business-critical parts of their system. The migration to this new system needs to be managed carefully to minimise the potential impact to their customers.
To solve the aforementioned challenge, we need a mechanism to:
- Fetch a large number of records from a provider’s API slowly and steadily. The process needs to be reliable but can take several hours if need be.
- Only fetch a maximum of X records concurrently.
- Gradually reduce the concurrency when the provider API’s response time or error rate starts to escalate.
Rather than implementing such a mechanism from scratch, we chose to leverage the built-in batching support with Kinesis and Lambda. In our proposed setup:
- Every provider has a Kinesis stream.
- The scheduler service enqueues the records that we need to fetch into the stream.
- A ventilator Lambda function would receive records in batches and fan them out to a worker function (via direct function invocation).
- There are many different worker functions, one for every provider. This is because most providers have provider-specific business logic.
- To avoid processing the same record twice in the same day, the ventilator tracks records that have been processed today in a DynamoDB table.
- To support retrying failed records, the ventilator also tracks the number of attempts at processing each record in the same DynamoDB table. When it encounters partial failures, the ventilator increments the attempts count for the failed records and then throws an exception. Kinesis then automatically retries the same batch of records for us. On subsequent attempts, previously successful records are ignored.
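The de-duplication and retry tracking described above might be sketched as follows. This is a minimal illustration rather than the client's actual code: a plain dict stands in for the DynamoDB table, `worker` is any callable standing in for the provider-specific worker function, and the names `process_batch`, `PartialFailure` and the `max_attempts` cap are assumptions made for the example. For simplicity it calls the worker sequentially, whereas the real ventilator fans records out concurrently.

```python
from datetime import date


class PartialFailure(Exception):
    """Raised so that the caller (Kinesis, in the real system) retries the batch."""


def process_batch(record_ids, worker, table, today=None, max_attempts=3):
    """Fan a batch out to `worker`, skipping records already processed today.

    `table` is a dict standing in for the DynamoDB table. Keys are
    (day, record_id); values are {"done": bool, "attempts": int}.
    `max_attempts` is a hypothetical cap, not something the post specifies.
    """
    day = (today or date.today()).isoformat()
    failed = []
    for record_id in record_ids:
        item = table.setdefault((day, record_id), {"done": False, "attempts": 0})
        if item["done"] or item["attempts"] >= max_attempts:
            continue  # already fetched today, or retried too many times
        try:
            worker(record_id)
            item["done"] = True
        except Exception:
            item["attempts"] += 1
            failed.append(record_id)
    if failed:
        # Failing the invocation makes the event source redeliver the same batch.
        # Records marked as done are skipped on the next attempt.
        raise PartialFailure(f"{len(failed)} record(s) failed: {failed}")
```

Because successes are recorded before the exception is raised, a retried batch only calls the worker for the records that failed last time.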
At a high level, the system looks like this:
This design does not deliver maximum throughput, which is by design! As discussed earlier, this is due to the daily usage limits and scalability concerns of the provider APIs.
Every provider has its own CloudFormation stack with the Kinesis stream, the ventilator and worker Lambda functions, as well as the DynamoDB table. This setup makes it easy to migrate to the new system gradually, one provider at a time.
The one stream per provider configuration lets us control concurrency using the batch size of the Kinesis event source mapping. The ventilator function can, therefore, self-adjust its batch size by updating the Kinesis event source mapping.
As the provider API’s response time goes up, the ventilator can respond by reducing its batch size. This gives the API a chance to catch its breath and recover. When the response time returns to acceptable levels, then the ventilator can gradually increase its batch size back to previous levels.
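As a rough sketch of this adjustment logic, assuming the ventilator measures an average response time and an error rate for each batch: the thresholds, the step size and the function names below are illustrative assumptions, not values from the real system. The `lambda_client` parameter is expected to behave like a boto3 Lambda client, whose `update_event_source_mapping` call accepts `UUID` and `BatchSize`.

```python
MAX_BATCH_SIZE = 10  # the demo's original batch size


def next_batch_size(current, avg_response_ms, error_rate,
                    max_response_ms=1000.0, max_error_rate=0.1, step=1):
    """Return the batch size to use from the next invocation onwards.

    The thresholds and step are hypothetical; tune them per provider.
    """
    struggling = avg_response_ms > max_response_ms or error_rate > max_error_rate
    if struggling:
        return max(1, current - step)  # back off, but never below 1
    return min(MAX_BATCH_SIZE, current + step)  # recover gradually


def apply_batch_size(lambda_client, mapping_uuid, batch_size):
    """Update the Kinesis event source mapping with the new batch size."""
    lambda_client.update_event_source_mapping(
        UUID=mapping_uuid, BatchSize=batch_size
    )
```

Keeping the decision in a pure function makes it easy to unit test separately from the AWS call.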
We went even a step further.
If the provider API still struggles at a batch size of 1, then something is clearly not right. You can’t change the batch size to 0, but you can disable the Kinesis event source mapping altogether. When you re-enable it later, Kinesis will push events to the ventilator from where it stopped before, as if nothing’s happened.
However, if the Kinesis event source is disabled, then something else needs to wake the ventilator up later. For this, we have a CloudWatch schedule to trigger the ventilator function every 10 minutes.
If the Kinesis event source is already enabled, then these CloudWatch-triggered invocations simply do nothing. When Kinesis is disabled, however, the ventilator re-enables it at a batch size of 1. This allows one request to the provider API on the next invocation by Kinesis. If the response time is still above the threshold, then we disable Kinesis again and the cycle repeats until the provider API is able to respond to requests in a timely fashion. At that point the Kinesis event source stays enabled and the ventilator function gradually restores its previous batch size.
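The two halves of this cycle could look roughly like the sketch below. The function names and return values are made up for illustration. `lambda_client` is assumed to behave like a boto3 Lambda client, where `get_event_source_mapping` returns a `State` field and `update_event_source_mapping` accepts `Enabled` and `BatchSize`.

```python
def on_kinesis_batch(lambda_client, mapping_uuid, batch_size, healthy):
    """Run after a Kinesis-triggered batch; opens the circuit if needed."""
    if batch_size == 1 and not healthy:
        # Cannot go below a batch size of 1, so stop receiving events altogether.
        lambda_client.update_event_source_mapping(UUID=mapping_uuid, Enabled=False)
        return "disabled"
    return "unchanged"


def on_schedule(lambda_client, mapping_uuid):
    """Run on the 10-minute schedule; lets a single request through again."""
    mapping = lambda_client.get_event_source_mapping(UUID=mapping_uuid)
    if mapping["State"] == "Disabled":
        lambda_client.update_event_source_mapping(
            UUID=mapping_uuid, Enabled=True, BatchSize=1
        )
        return "re-enabled"
    return "noop"  # already enabled (or mid-transition): nothing to do
```

If the single trial request is still slow or failing, `on_kinesis_batch` disables the mapping again and the next scheduled run repeats the probe.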
This is the classic circuit breaker pattern applied to stream processing.
Show me how it works!
If you want to see how this can work in practice, I put together a demo app for illustration purposes. You can find the source code here. Follow the instructions in the README to deploy the demo app and pump data in using the provided feed-stream script.
With this demo app, I can adjust the ratio of slow and erroneous responses from the worker function via its environment variables.
When I do, you will soon see the ventilator take actions to reduce its batch size. It will adjust the batch size, incrementing and decrementing accordingly.
You can also create a metric filter to extract the batch size from the logs, and turn it into a metric in CloudWatch. This lets you observe how the batch size changes over time.
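One possible way to set this up, assuming the ventilator writes a JSON log line containing a `batchSize` field (the demo app's actual log format may differ): the filter name, metric name and namespace below are placeholders. `logs_client` is assumed to behave like a boto3 CloudWatch Logs client, whose `put_metric_filter` call takes these parameters.

```python
import json


def log_batch_size(batch_size):
    """Emit a structured log line that a JSON metric filter can match."""
    line = json.dumps({"message": "batch size updated", "batchSize": batch_size})
    print(line)
    return line


def create_batch_size_metric_filter(logs_client, log_group_name):
    """Publish the logged batchSize value as a CloudWatch metric."""
    logs_client.put_metric_filter(
        logGroupName=log_group_name,
        filterName="ventilator-batch-size",   # placeholder name
        filterPattern="{ $.batchSize = * }",  # match JSON events that have batchSize
        metricTransformations=[{
            "metricName": "BatchSize",        # placeholder name
            "metricNamespace": "Ventilator",  # placeholder namespace
            "metricValue": "$.batchSize",     # use the logged value, not a count
        }],
    )
```

Graphing the resulting metric should show the batch size stepping down while the worker struggles and stepping back up once it recovers.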
If the elevated response time and error rate continue, then eventually the ventilator function would disable its Kinesis event source.
The cron job would then re-enable the stream every 10 minutes.
If the worker function has recovered by then, the ventilator function gradually increases its batch size until it eventually returns to its original batch size of 10. If you monitor how the batch size changes along the way, you will probably see something along the lines of:
In this post, we discussed a design for a self-healing Kinesis function that can adapt its throughput based on performance. In doing so, we’re able to carefully tune the number of concurrent requests our system makes against third-party APIs and stay within their operational limits. We also discussed how such a design can implement the circuit breaker pattern and take the foot off the pedal entirely when the third-party API is struggling.
I hope you have enjoyed this post. Don’t forget to let us know if you have any suggestions or comments.