Since I last wrote about Reaxial we’ve come up with some new abstractions that make it easier to write reactive handlers, and have been busy transitioning our code to use the new architecture. I thought I’d take this opportunity to share our progress with you.
As we started transitioning to Reaxial, we realized that creating an entire service for each reactive component was overkill. Many of the features we have implemented as reactive components run sporadically and are not particularly time sensitive, and typically several features depend on the same updates. Giving each of them a separate process and a separate connection to Kafka is wasteful in these cases. Other features, however, have to react in a timely fashion, so for those we do want a dedicated process with its own Kafka connection.
To accommodate these different use cases, we came up with the concept of a “stage” service that can host one or more “actors”. An “actor” is our basic building block for reactive components. Each actor is a Python class that derives from this abstract base class:
    from logging import getLogger


    class Actor(object):

        def topics(self):
            """ Return a list of the topic(s) this actor cares about. """
            raise NotImplementedError

        def interval(self):
            """ Return the batching interval for this actor. This is the maximum
            interval. If another actor on the same stage has a shorter interval,
            then the batching interval will match that interval.
            """
            return 30

        def process(self, topic, messages):
            """ Called periodically for this actor to process messages that have
            been received since the last batching interval. If messages for
            multiple different topics have been received, then this method will
            be called once for each different topic. The messages will be passed
            as an array of tuples (offset, message).
            """
            raise NotImplementedError

        @property
        def log(self):
            return getLogger(self.__module__)
An actor class only has to override topics() and process(). The topics() method returns a list of the Kafka topics the actor wants to handle, and the stage service periodically calls process() with a set of messages from one of those topics. The stage service works by collecting a batch of messages (1000 by default) across all the topics that the actors within that stage care about, and then invoking each actor’s process() method once per topic with the messages from the topics that actor subscribes to. If the batching interval expires while the stage is still collecting messages, the messages collected so far are processed immediately.
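To make the dispatch step concrete, here is a minimal sketch of an actor and of how a stage might hand a collected batch to its actors. It is not our actual stage service: the SignupCounter class, the "user.signup" topic, the stage_interval() and dispatch() helpers, and the (topic, offset, message) batch shape are all hypothetical names invented for this illustration.

```python
from collections import defaultdict
from logging import getLogger


class Actor(object):
    """Trimmed copy of the base class so this sketch runs on its own."""

    def topics(self):
        raise NotImplementedError

    def interval(self):
        return 30

    def process(self, topic, messages):
        raise NotImplementedError

    @property
    def log(self):
        return getLogger(self.__module__)


class SignupCounter(Actor):
    """Hypothetical actor that counts messages on a made-up topic."""

    def __init__(self):
        self.seen = 0

    def topics(self):
        return ["user.signup"]  # hypothetical topic name

    def interval(self):
        return 5  # shorter than the default, so it tightens the stage interval

    def process(self, topic, messages):
        # messages is a list of (offset, message) tuples for a single topic
        self.seen += len(messages)
        self.log.info("%s: %d new message(s)", topic, len(messages))


def stage_interval(actors):
    """The stage batches at the shortest interval any of its actors asks for."""
    return min(actor.interval() for actor in actors)


def dispatch(actors, batch):
    """Hand a collected batch to the actors, one process() call per topic.

    batch is a list of (topic, offset, message) tuples; that shape is an
    assumption made for this sketch.
    """
    by_topic = defaultdict(list)
    for topic, offset, message in batch:
        by_topic[topic].append((offset, message))
    for actor in actors:
        for topic in actor.topics():
            if topic in by_topic:
                actor.process(topic, by_topic[topic])
```

Grouping by topic before calling process() mirrors the contract in the base class docstring: an actor that subscribes to several topics gets one call per topic that actually has messages in the batch.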
Once an actor is defined, it has to be configured to run within a specific stage. We use a simple INI-style config file, read with betterconfig, to define the stages. Each stage is a section in the config file, and its actors are specified by adding the Python dotted path of each actor class to a list inside that section. The batch size for the stage can also be changed there.
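As a rough illustration of what such a stage config could look like and how it might be turned into actor classes, here is a sketch that uses the standard library's configparser as a stand-in for betterconfig. The section names, the "actors" and "batch_size" keys, the dotted paths, and the load_stages() and import_actor() helpers are all assumptions made for this example; our real config and betterconfig's own API may differ.

```python
import ast
import configparser
import importlib

# Hypothetical stage config: the section names, the "actors" and "batch_size"
# keys, and the dotted paths are all made up for illustration.
EXAMPLE_CONFIG = """
[realtime]
actors = ['myapp.actors.alerts.AlertActor']
batch_size = 100

[background]
actors = ['myapp.actors.reports.WeeklyReportActor', 'myapp.actors.cleanup.CleanupActor']
"""

DEFAULT_BATCH_SIZE = 1000  # the default batch size mentioned in the post


def load_stages(text):
    """Parse config text into {stage_name: {"actors": [...], "batch_size": n}}."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    stages = {}
    for name in parser.sections():
        section = parser[name]
        raw_batch_size = section.get("batch_size", fallback=str(DEFAULT_BATCH_SIZE))
        stages[name] = {
            # values are written as Python literals, so evaluate them safely
            "actors": ast.literal_eval(section["actors"]),
            "batch_size": ast.literal_eval(raw_batch_size),
        }
    return stages


def import_actor(dotted_path):
    """Resolve 'package.module.ClassName' to the class object."""
    module_name, _, class_name = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)
```

A stage service could then call load_stages() at startup, pick its own section, and instantiate each class returned by import_actor().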
We are still in the middle of converting the functionality in our legacy platform to Reaxial, but we have already defined 30 actors running on 7 different stages. Having the infrastructure to easily decompose a feature into reactive components like actors improves the modularity and reliability of our system, and also improves testability. We can very easily write unit tests that pass specific messages to an actor, and by mocking out the methods that the actor calls, we can test arbitrary scenarios without having to set up anything in the database. Plus, because each actor implements only one feature, or one piece of a feature, actors are straightforward targets for unit testing.
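A unit test in this style might look roughly like the sketch below. The WelcomeEmailActor class, its send_email() method, the "user.signup" topic, and the message shape are hypothetical and exist only to show the pattern of feeding messages to process() while mocking out the method the actor calls.

```python
from unittest import mock


class Actor(object):
    """Trimmed stand-in for the base class so this sketch runs on its own."""

    def topics(self):
        raise NotImplementedError

    def process(self, topic, messages):
        raise NotImplementedError


class WelcomeEmailActor(Actor):
    """Hypothetical actor: sends a welcome email for each signup message."""

    def topics(self):
        return ["user.signup"]  # hypothetical topic name

    def send_email(self, address):
        # In a real actor this would talk to a mail service or the database.
        raise RuntimeError("should be mocked out in unit tests")

    def process(self, topic, messages):
        for _offset, message in messages:
            self.send_email(message["email"])


def test_sends_one_email_per_signup():
    actor = WelcomeEmailActor()
    messages = [
        (10, {"email": "a@example.com"}),
        (11, {"email": "b@example.com"}),
    ]
    # Replace the side-effecting method so no external system is touched.
    with mock.patch.object(actor, "send_email") as send_email:
        actor.process("user.signup", messages)
    assert send_email.call_args_list == [
        mock.call("a@example.com"),
        mock.call("b@example.com"),
    ]
```

Because the messages are plain (offset, message) tuples, the test needs no Kafka connection and no database fixtures; the mock records exactly what the actor tried to do.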
One obvious area for improvement is to enhance the stage service so that it dynamically decides which actors to run on which stages by observing their behavior. This has always been in our plans, but because it is a complicated optimization problem and carries significant risks if not implemented properly, we decided to stick with the manual stage configuration for now, coupled with monitoring of the stages to ensure that time-sensitive messages are being handled within the expected time. So far this is working well, and as we improve this system we’ll keep you updated on our progress.