MessageBus – How to Keep Receivers Fast and Resilient
Written by Martin Führlinger, Software Engineer Backend
In my previous blog posts, I wrote about how we use the message bus to decouple services, and what we actually send. You can find these posts here:
This post explains how we make our receivers resilient and fast, and why we choose to do this.
To keep our receivers as fast as possible, we try to do as much as we can asynchronously. For this purpose we are using Sidekiq and Resque, depending on the service which is receiving the message.
As you can imagine, the fastest approach would be to enqueue a job for every received message. As not every message needs further processing at every consumer, this would lead to a lot of unneeded jobs enqueued and processed. Additionally, as the message content is not parsed yet, it might be that the message itself is invalid.
That’s why we choose to parse the message to avoid invalid messages in our system. Additional filtering to avoid unnecessary jobs on the consumer side is done as well. Basically, we follow these steps:
- parse the message
- check the message content to filter out unnecessary ones
- enqueue a job
Parsing the message is basically just reading the content and mapping it to a valid object. If that fails, the message is invalid and will be NACKed. Such messages are re-delivered or dropped. If there is a dead letter exchange defined, the messages will be passed to that exchange. Additional information of dead letter exchanges and how we use it at Runtastic will be found in a future blog post.
The second step is checking the message content. The decision whether a message has work-worthy content can be based on attributes of the sent entity or on the changed attributes we add to the meta information of the message. For example, when calculating records of a running session, if the user changes a note in that session, the records don’t have to be recalculated. However, if the distance or duration is changed, it is necessary.
So based on the information that was added to the metainformation, we can decide if we want to enqueue a job or not. This also helps avoid loops: the record calculation may change the running session again, but it won’t update relevant attributes for recalculation. In the following picture, you can see the message being triggered and received, then checked for relevant changes. We then enqueue or discard the message based on that information. It also depicts the processing of the message in the asynchronous worker to show the possible loop (shown using the green dotted arrows).
To check if a message is worth working on, we follow a few rules:
- use attributes of the entity if possible
- use changed attributes on updated messages, if available
- try to avoid database queries, or use only simple ones (fetch by id e.g.)
- don’t call other services / external urls
These guidelines help the receiver method get finished quickly with resilience, as there is not much in there which can fail.
The last step is to enqueue a job in Sidekiq. Therefore, we enqueue a job by passing the message payload to the job. This may seem weird, but as Sidekiq and Resque are storing the parameters in a Redis, it has to be serialized again. By passing the whole message, we can reuse the message-to-object mapper already used in parsing the message.
To sum it up, following a few rules leads to better performance and better resilience of your message bus system. The scaling is simplified, as it can be done at the workers. Sending messages is asynchronous already, so adding another layer does not do any harm.
Want to continue reading our series about message bus? In our next post, we talk about how we handle dead letters with the dead letter exchange.