How to Consume 12 Million Kafka Messages in an Hour

Welcome to our exploration of Navi, a library built to improve event-driven data management within our organization. I’m Matt General, a staff software engineer with five years of experience on our marketplace, and I’m excited to share how we tackled a significant challenge in tracking seller performance metrics. The core issue we faced was the absence of crucial date information in our database. We had a wealth of data generated by events tied to title status updates, but it wasn’t organized in a way that made it easily accessible. To address this, we needed a way to listen to all existing events and perform a historical data load.

While I considered using C#, I wanted a more engaging approach, so I reached for Go, a language that aligns with our engineering goals. This was a one-off task, but I envisioned a solution that not only met the immediate need but could also integrate seamlessly into our existing framework. My goal was to build the missing components, transforming raw event data into actionable performance metrics. Join me as we delve into the details of Navi, its architecture, and the benefits it brings to our event-driven data processing capabilities.

Introducing Navi

Navi is a Golang data hub library designed to align with our Kafka architecture, incorporating key concepts such as commands, data hub events, data hub admin events, and domain events. Its use of familiar terminology makes it accessible and easy to understand. Built on top of IBM Sarama, Navi scales automatically with the number of partitions and requires no special tuning for performance. It also lets consumers start from either the earliest or the latest offsets, adding flexibility in how events are consumed.

Navi is fast, too: in production it processed 12 million messages per hour on a single pod consuming from 100 partitions with 100 goroutines. That level of throughput speaks to both the library’s efficiency and its capacity for high-volume scenarios. The current implementation focuses on consuming events, but it lays the foundation for more complex processing down the road. Overall, Navi aims to provide a robust and efficient solution for managing Kafka events within a data hub framework.
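
Starting from the earliest or latest offset is a thin layer over Sarama’s consumer configuration. Here is a minimal sketch of the kind of setup Navi wraps; the option names are Sarama’s real API, while the helper function itself is just illustrative:

```go
package main

import "github.com/IBM/sarama"

// newKafkaConfig returns a Sarama config whose initial offset is
// either the oldest retained message (for a historical load) or the
// newest (for live consumption).
func newKafkaConfig(fromEarliest bool) *sarama.Config {
	cfg := sarama.NewConfig()
	if fromEarliest {
		cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	} else {
		cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
	}
	return cfg
}
```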

What I Did

The historical load consumed every event from Kafka and updated the database based on the update events it observed. For each update, the system queried the database by title ID to check for existing timestamps related to status changes. If a timestamp was missing, it updated and saved the record inside a transaction, with automatic rollback on any error. The initial run covered 12 million events; the total has since grown to 13 million updates, which gives a sense of how many titles are involved. The system processed roughly 3,000 messages per second, which put considerable strain on the database (the word I used at the time was “spicy”), but it handled the load without major issues. The setup shows that high-throughput event processing and data integrity can coexist.
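
In code, the per-event update looks roughly like the following. This is a sketch under assumptions: the table and column names (title_status, status_changed_at) are illustrative, not our actual schema:

```go
import (
	"database/sql"
	"time"
)

// applyUpdate fills in a missing status-change timestamp for a title.
// Table and column names here are illustrative.
func applyUpdate(db *sql.DB, titleID string, observedAt time.Time) error {
	tx, err := db.Begin()
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	var existing sql.NullTime
	err = tx.QueryRow(
		`SELECT status_changed_at FROM title_status WHERE title_id = $1`,
		titleID,
	).Scan(&existing)
	if err != nil {
		return err
	}

	// Only write the timestamp if it has never been recorded.
	if !existing.Valid {
		if _, err := tx.Exec(
			`UPDATE title_status SET status_changed_at = $1 WHERE title_id = $2`,
			observedAt, titleID,
		); err != nil {
			return err
		}
	}
	return tx.Commit()
}
```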

Creating Navi

Setting up a Navi consumer involves standard boilerplate: configuring your environment, creating a processor map for events, setting up the Kafka configuration, initializing a logger, and establishing database connections. These steps will look familiar from other systems. To create the consumer itself, you provide the Kafka configuration, a processor map, and a logger, and the consumer is ready to handle events.
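
Wired together, the setup looks something like this. Navi is internal, so treat navi.NewConsumer, its import path, and its method names as shorthand for the actual constructor rather than a published API:

```go
package main

import (
	"context"
	"log"
	"os"

	"github.com/IBM/sarama"

	navi "example.internal/navi" // illustrative import path
)

func main() {
	logger := log.New(os.Stdout, "navi: ", log.LstdFlags)

	// Historical load: start from the earliest retained offsets.
	kafkaCfg := sarama.NewConfig()
	kafkaCfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	processors := buildProcessorMap() // covered in the next section

	// Hypothetical constructor; the real one takes the same three
	// ingredients described above.
	consumer, err := navi.NewConsumer(kafkaCfg, processors, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer consumer.Close()

	if err := consumer.Run(context.Background()); err != nil {
		logger.Fatal(err)
	}
}
```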

Processor Maps

The processor map in Navi is essentially a map of processor maps, with one per topic type: data hub, data hub admin, command, and domain. Each topic type has a distinct JSON event structure that requires tailored handling, which is why there are four separate maps. Each map is keyed on the combination of topic and event name, so the system can route every event to a handler that understands its specific type and structure. This keeps the various event types organized and makes message processing efficient.
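
Conceptually, the structure looks like this. The concrete type names and key format are assumptions based on the description above, shown with a single stand-in processor interface:

```go
// Each entry is keyed on the (topic, event name) pair.
type ProcessorKey struct {
	Topic string // e.g. "datahub.title.v2.production" (illustrative)
	Event string // e.g. "updated"
}

// Stand-in for the per-topic-type processor interfaces.
type Processor interface {
	ProcessMessage(raw []byte) error
}

// Four maps, one per topic type, because each type has its own
// JSON envelope.
type ProcessorMaps struct {
	DataHub      map[ProcessorKey]Processor
	DataHubAdmin map[ProcessorKey]Processor
	Command      map[ProcessorKey]Processor
	Domain       map[ProcessorKey]Processor
}
```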

When monitoring title events, such as “data hub title V2 production,” the system listens for specific events, like “updated.” Each event type has a corresponding handler that defines its structure in a Go struct, outlining the necessary fields and their types. The process involves parsing JSON to unpack event data effectively. While much of the surrounding code consists of boilerplate, the focus is on creating these handlers. This setup simplifies the message-handling process, enabling developers to manage the details of the event while relying on established patterns for parsing and processing. Ultimately, this structure allows for efficient event management within the Navi framework.
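
For the “updated” event, the handler’s struct might look like the following; the field names are illustrative, not the production schema:

```go
import "time"

// TitleUpdatedEvent mirrors the JSON payload of the "updated" event.
// Field names here are hypothetical.
type TitleUpdatedEvent struct {
	TitleID   string    `json:"title_id"`
	Status    string    `json:"status"`
	UpdatedAt time.Time `json:"updated_at"`
}
```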

The core focus of the code in Navi is the “title updated message processor,” which is defined as a struct. This struct includes a crucial method called processMessage, which takes two parameters: a base data hub message and a byte slice representing the raw message. The method returns an error when needed, allowing for error handling during processing.

By implementing this method, the struct acts as a handler within the Navi framework. When the processMessage method is invoked, it receives the base message, which contains extracted metadata, and the raw message for unpacking. This allows developers to manipulate the data as needed. The process involves unmarshalling the byte slice to convert it into a structured format, enabling access to all defined fields within the event.

This setup simplifies event processing, allowing for efficient handling of various message types. While much of the surrounding code may be boilerplate, the key focus is on defining the struct and the associated method that handles the specific logic for the event. This design pattern leverages Go’s strengths, providing a clear and effective way to manage message processing within the system.
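Put together with the event struct from above, the handler looks roughly like this. BaseDataHubMessage stands in for the envelope metadata type, the exported ProcessMessage spelling is my adaptation, and the call to applyUpdate (from the earlier sketch) is just one example of what the handler body might do:

```go
import (
	"database/sql"
	"encoding/json"
)

// BaseDataHubMessage stands in for the envelope metadata that Navi
// extracts before invoking the handler.
type BaseDataHubMessage struct {
	Topic     string
	EventName string
}

type TitleUpdatedMessageProcessor struct {
	db *sql.DB
}

// ProcessMessage unpacks the raw payload into the typed event and
// applies the business logic.
func (p *TitleUpdatedMessageProcessor) ProcessMessage(
	base BaseDataHubMessage, raw []byte,
) error {
	var event TitleUpdatedEvent
	if err := json.Unmarshal(raw, &event); err != nil {
		return err
	}
	// For the historical load, this is the timestamp backfill from
	// the earlier sketch.
	return applyUpdate(p.db, event.TitleID, event.UpdatedAt)
}
```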

A Note on Interfaces

Each processor map type in Navi expects structs that satisfy a specific interface. Go uses implicit interface implementation: if a struct defines the required methods, it automatically satisfies the interface. There is no formal declaration; defining the methods is enough for the struct to be accepted wherever that interface is expected.
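
A common compile-time trick makes this explicit. Assuming a data hub processor interface shaped like the handler above, the blank-identifier assignment below fails to compile if the struct ever stops satisfying the interface:

```go
// Shape assumed from the handler in the previous section.
type DataHubProcessor interface {
	ProcessMessage(base BaseDataHubMessage, raw []byte) error
}

// Compile-time assertion: TitleUpdatedMessageProcessor satisfies
// DataHubProcessor implicitly, with no "implements" keyword.
var _ DataHubProcessor = (*TitleUpdatedMessageProcessor)(nil)
```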

Future Enhancements

As the database load increased, it became apparent that being able to configure the number of goroutines used in processing would be valuable. I haven’t explored this in depth, but Sarama appears to allow such configuration, and it matters: running 100 goroutines simultaneously can overwhelm the database with requests, straining resources and hurting performance in a production environment.
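
Independent of what Sarama exposes, a generic way to cap in-flight work in Go is a buffered channel used as a semaphore. This is a general pattern, not Navi’s API; Message and handle are placeholders:

```go
// consumeBounded caps concurrent handlers so the database is not
// flooded; Message and handle are placeholders.
func consumeBounded(messages <-chan Message, maxInFlight int) {
	sem := make(chan struct{}, maxInFlight)
	for msg := range messages {
		sem <- struct{}{} // blocks once maxInFlight handlers are busy
		go func(m Message) {
			defer func() { <-sem }() // release the slot when done
			handle(m)
		}(msg)
	}
}
```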

Additionally, implementing custom metrics for Datadog would enhance monitoring capabilities, particularly for timestamp-based offsets, which Sarama already supports. This feature has been a topic of interest among users, indicating a demand for better integration and tracking.
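
Timestamp-based offsets in particular have a concrete building block in Sarama’s Client interface: GetOffset resolves a timestamp to an offset. A minimal sketch:

```go
import (
	"time"

	"github.com/IBM/sarama"
)

// offsetAt returns the offset of the first message at or after t on
// the given partition.
func offsetAt(brokers []string, topic string, partition int32, t time.Time) (int64, error) {
	client, err := sarama.NewClient(brokers, sarama.NewConfig())
	if err != nil {
		return 0, err
	}
	defer client.Close()

	return client.GetOffset(topic, partition, t.UnixMilli())
}
```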

Finally, better documentation would go a long way: clear makefiles, practical examples, and performance tests to guide developers in optimizing their implementations. Those resources would make the capabilities of Navi and Sarama easier to understand and help users troubleshoot and tune their applications, improving the developer experience during high-load scenarios.

Go for Everyone

In conclusion, Navi exemplifies the power and efficiency of using Go for high-throughput data processing. By enabling the consumption of 12 million messages in just one hour, we’ve demonstrated the capabilities of this language in handling large datasets quickly. As I continue to enhance Navi, I encourage others to explore the potential of Go in their projects. This initiative not only showcases what’s possible with robust programming but also aims to inspire more developers to leverage Go for their own high-performance needs. Thank you for joining me on this journey!