Consuming over 1 billion Kafka messages per day at Ifood
This is the second part of a series of blog posts showing how we’re evolving Ifood’s architecture in the User Profile team. I recommend reading the first post here. It’s not mandatory, but it will make it easier to understand where we came from and where we want to go. If you don’t want to, here’s a quick recap.
Quick recap
We have a microservice that stores Ifood’s customer metadata (internally we call it account-metadata), and during peak hours it handles over 2 million requests per minute. This system is called by the web and mobile apps and by many internal teams to fetch customers’ data. The data is stored in a single DynamoDB table (1.3 billion items using 757.3 GB).
What kind of data do we store there? Features of each Ifood user. Some examples of features:
- a counter of the total number of orders
- the top 3 favorite dishes
- the favorite restaurants
- which segmentation the user belongs to
This was the architecture:
Main problem
The data team’s pipeline, which exports the data from the data lake, processes it using Databricks and Airflow, and finally sends it to account-metadata, consists of lots of steps and could fail very easily. Something often broke along the way, and we didn’t always have the best monitoring of every single piece, which means it was not reliable. This is a big problem for us as the User Profile team, given that the quality of our data is at the core of our concerns and interests.
That was always a huge pain, so we started to look around and think about how we could change that part of the infrastructure; in other words, how to change the ingestion of hundreds of millions of records per day.
Our savior: Feature Store
While we were looking for alternatives and trying to figure out how to replace the entire ingestion process, one of the ML teams inside Ifood was building an awesome new tool: a Feature Store (FS). In summary, a Feature Store is a way to provide and share data very easily to empower ML applications, model training, and real-time predictions. On one side, the FS reads data from somewhere (data lake, data warehouse, Kafka topics, etc.), aggregates it, does some kind of processing or calculation, and then exports the results on the other side (an API, a database, Kafka topics, etc.).
When we heard of that, it became clear that it was pretty much what we needed: a centralized, unique, and very organized way to consume data from the data lake. Even though we would be using the FS for something not really related to ML applications, it was still a well-suited use case for us. It would make things much easier: they would export the data somehow, and we’d just need to save it in our database. We would replace the super complex and very fragile pipeline with a robust and solid mechanism. After talking with the Feature Store team, we decided that they would export the features to us via a Kafka topic.
However, the FS was not able to export the features in batches, which means that for each Ifood customer (around 60 million) and for each feature, a separate message would be exported. At that time, we had around 20 features but were already planning to increase that to ~30 or 40. That means 60 million * 20 = 1.2 billion messages per day, and that number would probably grow to over 1.5 billion a few months later.
So, we needed to be able to consume around 1.5 billion Kafka messages per day.
Consuming data from Feature Store
As I said, the FS would export the data into a Kafka topic. We defined the schema to be like this:
```
{
  account_id: string
  feature_name: string
  feature_value: string
  namespace: string
  timestamp: int
}
```
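In Go (which is where we ended up, as you’ll see below), a minimal struct mirroring that schema could look like this. The JSON tags assume the messages are JSON-encoded, which is an assumption on my part:

```go
// FeatureMessage mirrors the schema exported by the Feature Store.
// The JSON tags assume JSON-encoded messages; adjust if the topic
// uses Avro or Protobuf instead.
type FeatureMessage struct {
	AccountID    string `json:"account_id"`
	FeatureName  string `json:"feature_name"`
	FeatureValue string `json:"feature_value"`
	Namespace    string `json:"namespace"`
	Timestamp    int64  `json:"timestamp"`
}
```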
With that, we could create a consumer that would listen to the Kafka topic and save the features in the DynamoDB table.
In the Dynamo table we’re using the account_id as the partition key and namespace as the sort key. As the name says, the namespace splits the data that the account-metadata system provides into different contexts.
This is how our table looks:
| account_id | namespace | columns and their values… |
| --- | --- | --- |
| 283a385e-8822-4e6e-a694-eafe62ea3efb | orders | total_orders: 3, total_orders_lunch: 2 |
| c26d796a-38f9-481f-87eb-283e9254530f | rewards | segmentation: A |
This is how the architecture would look:
The consumer reads from the Kafka topic and saves the data into DynamoDB.
We did the first implementation of our consumer in Java. It performed quite well, but far from what we needed: 4k features consumed per second per pod/consumer. We tried some tweaks and different configurations, but it was still far away from the 1.5 billion goal.
After that, we tried a different implementation in Go using goka, a high-level Go library to interact with Kafka. It was way better: 8.5k features consumed per second per pod/consumer. However, that was still quite far from what we needed.
Finally, still using Go but now with sarama, we were able to implement a worker that consumes 1 million events per minute (20k features consumed per second per pod/consumer). Each pod/consumer creates three goroutines to process the messages received from Kafka. Yay, we did it! This was the third attempt, so we had learned a few things about how to properly configure a Kafka client, such as setting the right batch size to read, and so on.
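To give an idea of the shape of such a worker, here is a simplified sketch of a sarama consumer group that fans messages out to three goroutines. The group ID, topic name, and the fan-out pattern are illustrative assumptions, not our actual production code:

```go
package worker

import (
	"context"
	"log"

	"github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler and fans messages out to a
// fixed number of worker goroutines.
type handler struct {
	workers int
}

func (h handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	jobs := make(chan *sarama.ConsumerMessage)
	done := make(chan struct{})

	// Three goroutines per pod/consumer, as described above.
	for i := 0; i < h.workers; i++ {
		go func() {
			for msg := range jobs {
				if err := processMessage(msg); err == nil {
					sess.MarkMessage(msg, "") // mark only what was actually processed
				}
			}
			done <- struct{}{}
		}()
	}

	for msg := range claim.Messages() {
		jobs <- msg
	}
	close(jobs)
	for i := 0; i < h.workers; i++ {
		<-done
	}
	return nil
}

// processMessage would decode the feature and persist it (stubbed here).
func processMessage(msg *sarama.ConsumerMessage) error { _ = msg; return nil }

func run(ctx context.Context, brokers []string) error {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_6_0_0
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	// Group ID and topic name are made up for this example.
	group, err := sarama.NewConsumerGroup(brokers, "account-metadata-consumer", cfg)
	if err != nil {
		return err
	}
	defer group.Close()

	for ctx.Err() == nil {
		// Consume returns on every rebalance, so it runs in a loop.
		if err := group.Consume(ctx, []string{"feature-store-features"}, handler{workers: 3}); err != nil {
			log.Println("consume error:", err)
		}
	}
	return ctx.Err()
}
```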
In the Grafana graph below, you can see the number of messages being processed over time (the different colors represent different namespaces).
Processing over 1 million events per minute resulted in a lot of write operations in DynamoDB. At first, the database suffered quite a bit, so we had to scale it up to 50k write capacity units.
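Each of those writes is essentially an upsert keyed by (account_id, namespace). A minimal sketch of a single-feature write with the AWS SDK for Go v2, where the table name and the one-attribute-per-feature layout are assumptions based on the table shown earlier:

```go
package worker

import (
	"context"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// saveFeature upserts one feature into the item identified by
// (account_id, namespace), adding or overwriting a single attribute.
func saveFeature(ctx context.Context, db *dynamodb.Client, accountID, namespace, featureName, featureValue string) error {
	_, err := db.UpdateItem(ctx, &dynamodb.UpdateItemInput{
		TableName: aws.String("account-metadata"), // hypothetical table name
		Key: map[string]types.AttributeValue{
			"account_id": &types.AttributeValueMemberS{Value: accountID},
			"namespace":  &types.AttributeValueMemberS{Value: namespace},
		},
		UpdateExpression:         aws.String("SET #f = :v"),
		ExpressionAttributeNames: map[string]string{"#f": featureName},
		ExpressionAttributeValues: map[string]types.AttributeValue{
			":v": &types.AttributeValueMemberS{Value: featureValue},
		},
	})
	return err
}
```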
Testing it
Once the consumer was done, we had to make sure that we were not losing messages, not running into race condition issues, and that the data was being saved properly. To test that, we added a hardcoded prefix to the namespace name in the consumer to save, let’s say, the data from the orders namespace into a testing-orders namespace. With that, we wrote a couple of scripts to scan the database and compare the values between the namespaces (orders vs testing-orders) to ensure they were the same.
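The comparison can be as simple as something like the sketch below, which checks one account at a time (a full-scan variant would follow the same idea); the table name and GetItem-based approach are illustrative assumptions:

```go
package verify

import (
	"context"
	"reflect"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)

// sameFeatures compares the item produced by the old pipeline ("orders") with
// the one written by the new consumer ("testing-orders") for a single account.
func sameFeatures(ctx context.Context, db *dynamodb.Client, accountID string) (bool, error) {
	get := func(ns string) (map[string]types.AttributeValue, error) {
		out, err := db.GetItem(ctx, &dynamodb.GetItemInput{
			TableName: aws.String("account-metadata"), // hypothetical table name
			Key: map[string]types.AttributeValue{
				"account_id": &types.AttributeValueMemberS{Value: accountID},
				"namespace":  &types.AttributeValueMemberS{Value: ns},
			},
		})
		if err != nil {
			return nil, err
		}
		delete(out.Item, "namespace") // the sort key itself always differs
		return out.Item, nil
	}

	original, err := get("orders")
	if err != nil {
		return false, err
	}
	candidate, err := get("testing-orders")
	if err != nil {
		return false, err
	}
	return reflect.DeepEqual(original, candidate), nil
}
```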
Making it better and faster
Well, we had our first version. It was working properly, consuming events really fast and saving them correctly in DynamoDB. However, the database costs were really high, and this first approach was quite inefficient: we were doing one write to Dynamo per message, even though messages from the same account_id often arrived close together in the Kafka topic, given that we were using the account_id as the partition key. Something like this:
| account_id | feature_name | feature_value | namespace | timestamp |
| --- | --- | --- | --- | --- |
| user1 | total_orders | 3 | orders | ts1 |
| user1 | total_orders_lunch | 1 | orders | ts3 |
| user1 | segmentation | foo | rewards | ts2 |
| user2 | segmentation | foo | rewards | ts2 |
As you can see, the first and second records belong to the same user and the same namespace. With account_id as the partition key, events from the same account_id are consumed by the same consumer. That means we could build a kind of in-memory buffer to aggregate the events of the same account_id and namespace and write them to Dynamo only once. And that’s what we did.
We also changed the Kafka consumer to fetch data from the topic in batches, increasing the number of bytes fetched per request and, as a result, the number of messages we get at once. Instead of getting and processing one message at a time, we fetch 1,000 messages, create a map where the key is the account_id and the value is a list of features, and make one write to Dynamo per account_id in the map (a sketch of this grouping is shown below). With that, we decreased the number of operations in Dynamo by 4 times.
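Here is a minimal sketch of that grouping step, reusing the FeatureMessage struct from earlier. The detail that the newest timestamp wins when the same feature shows up twice in a batch is my assumption:

```go
// groupByAccount buckets a fetched batch of messages by (account_id, namespace),
// keeping only the newest value of each feature, so that every bucket turns into
// a single DynamoDB write instead of one write per message.
func groupByAccount(batch []FeatureMessage) map[[2]string]map[string]FeatureMessage {
	groups := make(map[[2]string]map[string]FeatureMessage)
	for _, m := range batch {
		key := [2]string{m.AccountID, m.Namespace}
		if groups[key] == nil {
			groups[key] = make(map[string]FeatureMessage)
		}
		if prev, ok := groups[key][m.FeatureName]; !ok || m.Timestamp > prev.Timestamp {
			groups[key][m.FeatureName] = m
		}
	}
	return groups
}
```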
One important detail: while processing one message at a time, it was easy to mark the message as “processed” and commit it. However, when processing a thousand of them, it’s not that easy to mark them as read and commit if some messages in the middle of the batch fail. We had to be more careful in that part of the code.
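One simple way to handle that (a sketch of the idea, not necessarily how we did it) is to mark offsets only after the whole batch has been persisted, so a failure leaves the batch uncommitted and the messages get redelivered. groupByAccount is the helper above and writeGroup is a hypothetical DynamoDB writer:

```go
// processBatch persists every (account_id, namespace) group and only marks the
// Kafka offsets after the whole batch succeeded. If any write keeps failing,
// nothing from this batch is committed and the messages are redelivered.
func processBatch(sess sarama.ConsumerGroupSession, msgs []*sarama.ConsumerMessage, batch []FeatureMessage) error {
	for key, features := range groupByAccount(batch) {
		if err := writeGroup(key[0], key[1], features); err != nil { // hypothetical DynamoDB writer
			return err
		}
	}
	for _, msg := range msgs {
		sess.MarkMessage(msg, "")
	}
	return nil
}
```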
Another crucial configuration is how you scale new pods, because if you don’t do it well, the consumers may spend a lot of time rebalancing, and while they’re rebalancing they won’t consume events. We configured the Kubernetes auto-scaling to scale new pods based on the CPU usage of each pod: once a pod reaches 30% CPU, a new one is scaled up. We used a low threshold to quickly scale up lots of pods and avoid the consumers spending a lot of time rebalancing.
Monitoring
To monitor the workflow, we invested a lot of time in having very good observability of the process. We use Prometheus/Grafana to create custom metrics for the consumers and collect metrics from the pods; Datadog to collect metrics from the Kafka topic and the cluster in general, with dashboards for consumer group lag; and New Relic to collect errors from the consumers and get a bit more data about them.
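For the custom consumer metrics, a minimal Prometheus setup in Go could look like the sketch below. The metric and label names are made up for illustration:

```go
package worker

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// featuresProcessed counts processed messages per namespace, which is the kind
// of series behind the per-namespace lines in the Grafana chart mentioned above.
var featuresProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "account_metadata_features_processed_total",
	Help: "Feature messages consumed from the Feature Store topic.",
}, []string{"namespace"})

// serveMetrics exposes /metrics for Prometheus to scrape.
func serveMetrics(addr string) error {
	http.Handle("/metrics", promhttp.Handler())
	return http.ListenAndServe(addr, nil)
}

// Somewhere in the consumer, after a successful write:
//   featuresProcessed.WithLabelValues(namespace).Inc()
```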
Main takeaways
- You most likely won’t get the best result on the first attempt. Take notes of what’s good and what isn’t, and do good research into what could be better.
- Processing this amount of data can be intimidating, but it’s not the end of the world. It does require focus and attention to detail, but you can do it.
- While processing this amount of data it’s also hard to make sure you’re not losing data, not having race condition problems, and so on. Invest a lot of time in that.
- It does require some tuning to make the Kafka consumers very fast. My main advice is to read the documentation carefully and play around with the parameters. The main ones are “fetch.min.bytes”, auto committing, and the max interval settings (see the sketch of the sarama equivalents below).
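In sarama terms, those knobs map to something like the configuration below. The values are illustrative, not our production settings:

```go
package worker

import (
	"time"

	"github.com/Shopify/sarama"
)

// newTunedConfig shows the kind of consumer settings worth experimenting with.
func newTunedConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_6_0_0

	// Ask the broker for bigger batches instead of many tiny fetches
	// (the broker-side counterpart of fetch.min.bytes).
	cfg.Consumer.Fetch.Min = 1 << 20      // 1 MiB minimum per fetch
	cfg.Consumer.Fetch.Default = 10 << 20 // 10 MiB fetched per request by default

	// Auto-commit of *marked* offsets; MarkMessage still controls what is safe to commit.
	cfg.Consumer.Offsets.AutoCommit.Enable = true
	cfg.Consumer.Offsets.AutoCommit.Interval = 5 * time.Second

	// Give slow batches some room before the consumer is considered stuck.
	cfg.Consumer.MaxProcessingTime = 500 * time.Millisecond

	return cfg
}
```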