SQS -> Flink -> HTTP throughput
At Forter, we’re in the process of migrating a VM-based processing service, which reads from SQS, to Flink + K8S. The service is written in Python, so initially we’re using Flink just to ‘feed’ the Python K8S service; gradually we’ll decompose the Python and convert it to Flink code.
We ran into pathological behavior in the Flink -> K8S path: throughput was low compared to naive VMs consuming directly from SQS.
We tried playing with the parallelism parameters, but that didn’t help; as it turns out, it only made things worse!
The Actual Problem
Our processors handle one event at a time. If you send them several HTTP requests, the requests wait in line. This means that if we can process X messages concurrently and we send 10X, then 9X HTTP messages are just “waiting to be executed”.
This isn’t a problem from an HTTP point of view (the requests are lightweight), but when you take an event from SQS you set a “visibility timeout”: how long SQS should hide the event before making it visible again.
Events that are just “waiting in memory” to be executed are still on the clock: if they wait too long, SQS makes them visible again, we pull them a second time, and we reprocess the same events!
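To make the failure mode concrete, here’s a back-of-the-envelope check (the numbers are illustrative, not our exact production values):

```python
# Illustrative numbers only -- not our exact production values.
in_flight_messages = 20_000      # messages pulled from SQS and buffered ahead of the processors
processor_concurrency = 500      # HTTP requests the processor pods can handle at once
avg_processing_seconds = 2.0     # time to process a single event
visibility_timeout_seconds = 60  # how long SQS hides a message after we receive it

# The last buffered message has to wait for everything ahead of it to drain first.
worst_case_wait = (in_flight_messages / processor_concurrency) * avg_processing_seconds

print(f"worst-case wait: {worst_case_wait:.0f}s")      # 80s
print(worst_case_wait > visibility_timeout_seconds)    # True -> SQS re-delivers the message
```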
Solution
- Decreasing the total in-flight messages to roughly the max concurrency of the processors
- Increasing the visibility timeout when retrieving a message (both knobs are sketched below)
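In plain SQS terms, the two knobs look roughly like this with boto3 (a sketch only; our actual tuning happens in the Flink source configuration, and the queue URL and numbers here are placeholders):

```python
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"  # placeholder

# Knob 1: pull fewer messages per request (and run fewer pullers),
#         so total in-flight stays close to the processors' concurrency.
# Knob 2: ask SQS to hide each received message for longer than its
#         worst-case "wait in memory + processing" time.
response = sqs.receive_message(
    QueueUrl=QUEUE_URL,
    MaxNumberOfMessages=10,   # per-request batch size (SQS caps this at 10)
    WaitTimeSeconds=20,       # long polling
    VisibilityTimeout=300,    # overrides the queue default for these messages
)
for message in response.get("Messages", []):
    ...  # hand the message to the processor over HTTP
```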
Technical calculations
How do we determine what parallelism and visibility timeout to use?
Concurrency
We need (number of async senders) * (messages sent concurrently per sender) to be roughly equal to the number of concurrent processors. Decreasing this from ~20,000 -> 500, we saw a significant reduction in retried events.
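As a sketch of what “(number of async senders) * (messages sent concurrently per sender)” means in practice, here is a bounded-concurrency sender in plain asyncio (the numbers and the HTTP call are stand-ins; this is not our Flink job):

```python
import asyncio

NUM_SENDERS = 10                 # hypothetical number of parallel sender tasks
PER_SENDER_CONCURRENCY = 50      # hypothetical per-sender cap
# total in-flight = NUM_SENDERS * PER_SENDER_CONCURRENCY = 500 ~= processor concurrency

async def call_processor(event: dict) -> None:
    await asyncio.sleep(0.01)    # stand-in for the HTTP request to a processor pod

async def sender(events: list[dict]) -> None:
    # Each sender keeps at most PER_SENDER_CONCURRENCY requests in flight.
    semaphore = asyncio.Semaphore(PER_SENDER_CONCURRENCY)

    async def bounded(event: dict) -> None:
        async with semaphore:
            await call_processor(event)

    await asyncio.gather(*(bounded(e) for e in events))

asyncio.run(sender([{"id": i} for i in range(1_000)]))
```

In Flink terms, the two factors typically map to operator parallelism and the per-subtask async capacity (for an async-I/O-style operator).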
Visibility Timeout
The visibility timeout should be “the time for a message to go through Flink until it’s removed from the queue”, with some buffer.
Since we process one message at a time, if we match the Flink concurrency to the processor concurrency, we should be fine with just “average processing time, with a buffer for slower events”, right?
Well… for us, processor concurrency is autoscaled according to queue size to reduce cost, whereas Flink concurrency cannot (yet!) be changed dynamically in this way.
If all pods are up and running, that timeout is fine, but with fewer pods running we process less: with 1/4 of the pods we have 1/4 of the throughput, so we need 4x the timeout!
So, the timeout should be: (time to process a single event) * (sender concurrency / processor concurrency), with some buffer. This means that if you expect your processor concurrency to drop X times lower, your timeout needs to be X times higher.
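A small helper that applies this formula, using the lowest processor concurrency you expect after scale-down (the buffer factor and numbers are illustrative assumptions, not recommendations):

```python
def visibility_timeout_seconds(
    avg_processing_seconds: float,
    sender_concurrency: int,         # total in-flight messages on the Flink side
    min_processor_concurrency: int,  # worst case after the autoscaler scales pods down
    buffer_factor: float = 1.5,      # headroom for slower events; illustrative value
) -> float:
    return avg_processing_seconds * (sender_concurrency / min_processor_concurrency) * buffer_factor

# All pods up: 500 in flight vs 500 processors -> 2s * 1 * 1.5 = 3s
print(visibility_timeout_seconds(2.0, 500, 500))
# Scaled down to 1/4 of the pods: 2s * 4 * 1.5 = 12s
print(visibility_timeout_seconds(2.0, 500, 125))
```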