World Without Eng

The beauty of a Rust message processor

Published: 2023-05-19
Updated: 2023-10-31

Rust lends itself well to message processing, whether the messages come from SQS, Kafka, or another queue. It is statically typed, memory-safe, and extremely performant, and its ownership rules rule out data races at compile time (great if you’re trying to process multiple messages concurrently!). On top of that, it has first-class features to help you handle messages neatly. Those features are enums and pattern matching.

So many failure modes

The main difficulty with writing neat, robust message processing is that there are so many ways it can go wrong. You can fail to parse anything meaningful from your message. The message can have an old schema version, too many fields, too few fields, or the right fields with the wrong values. You can run into errors with other services or resources while you’re processing it, and on and on. Some of these situations are recoverable, and some aren’t. Some can be recovered later by deploying a patch to your code and then retrying. Some will simply never process properly. Some you don’t even want to process and should just ignore. How do you handle all these failure modes without writing endless if/else statements?

The answer is: rather than treating each error or validation failure as a unique snowflake, recognize that every error falls into one of a few categories. The two most important categories are Recoverable and Unrecoverable. Every error falls into exactly one of them, since they are mutually exclusive and exhaustive (something cannot be both recoverable and unrecoverable, but it must be one or the other). We will get finer-grained with our error categories in a minute, but let’s start with these two as examples.


“Do or do not. There is no try.”

There are no thrown exceptions in Rust. Instead, any function that can fail should idiomatically return a Result enum. (If you’re not familiar with enums, I recommend taking a look at the chapter on enums in the Rust book.) Result has two variants: Ok and Err. The Ok variant contains a success value, and the Err variant contains an error value.

Using enums, we can encode the two error categories we mentioned earlier: Recoverable and Unrecoverable. That might look something like this:

enum MessageProcessingError {
    Recoverable,
    Unrecoverable,
}

And now, since enum variants can contain data of their own, and an enum is itself just a data type, we can put this MessageProcessingError enum inside the Result::Err variant and return it. That lets our message processing code signal to the code pulling messages off the queue that an error occurred, and tells it what kind of error we’re dealing with. If it’s Recoverable, we know we can retry. We can put the message back on the queue for a bit and then pull it again later. If it’s Unrecoverable, though, we know there’s no point. We can just delete the message and move on.
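To make that concrete, here is a minimal sketch of a processor that returns one of the two categories inside Result::Err. The function process_body, its downstream_available flag, and the rule that a body must be an integer are all made up for illustration; a real processor would parse your own message schema and call your own services.

```rust
/// The two broad error categories from the text.
#[derive(Debug, PartialEq)]
enum MessageProcessingError {
    Recoverable,
    Unrecoverable,
}

/// Hypothetical processor: the body must be an integer, and a downstream
/// service (simulated here by a flag) must be reachable to accept it.
fn process_body(body: &str, downstream_available: bool) -> Result<i64, MessageProcessingError> {
    // A body that fails to parse now will fail on every retry as well.
    let value = body
        .trim()
        .parse::<i64>()
        .map_err(|_| MessageProcessingError::Unrecoverable)?;

    // An unreachable dependency may come back, so retrying later can help.
    if !downstream_available {
        return Err(MessageProcessingError::Recoverable);
    }

    Ok(value)
}
```

The ? operator returns early with the mapped error, so each failure is assigned a category at the point where it happens rather than in a long chain of checks at the end.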

Handling these error variants is straightforward on the calling side thanks to Rust’s pattern matching. Using a match expression, you can easily run different code for different error variants. Further, the compiler ensures that your match is exhaustive. If you ever add a new error variant but forget to handle it, the code will not compile until you do. Our match might look something like this:

match call_message_processor(&my_message).await {
    // Processed successfully: remove the message from the queue.
    Ok(_) => delete(&my_message).await,
    // With something like SQS you don't need to retry explicitly: the message
    // becomes visible again once its visibility timeout expires.
    Err(MessageProcessingError::Recoverable) => retry(&my_message).await,
    // Retrying can never succeed, so drop the message.
    Err(MessageProcessingError::Unrecoverable) => delete(&my_message).await,
}

Dealing with the DLQ

Now, let’s consider a situation where you’ve set up a dead-letter queue (DLQ). The dead-letter queue takes messages that have failed to process X number of times (where X is configured by you), and holds onto them for some set period of time. If you feel the urge to replay those messages, you could point your message consumer at your dead-letter queue, or just redrive the queue to your main queue or topic (if the technology you’re using allows that). But how do we know when to send something to the dead-letter queue?

Given the two error variants we’ve outlined above, there isn’t a clear signal to send a message to the dead-letter queue. We have two error responses, but there are three ways to deal with the message: retry it on the same queue, move it to the DLQ, or delete it entirely. We need to get finer-grained with our error variants.

Naming is hard, so I encourage you to come up with better names if you want, but I’ve referred to these finer-grained error categories as: Transient (something that just needs a little time), RecoverableWithChange, and Unrecoverable. Now we have three clear signals and three ways of handling them: retry a Transient error (and eventually DLQ it after a maximum number of retries has been hit; some technologies like SQS let you configure this to happen automatically), DLQ a message that’s RecoverableWithChange, and outright delete an Unrecoverable message.
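One way to sketch that three-way decision is a small function that maps an error category to a queue action. The QueueAction enum, the action_for function, and the attempts and max_attempts parameters are hypothetical names for this example. If your queue moves messages to the DLQ for you after a retry limit, you likely won’t need the attempt counting at all.

```rust
#[derive(Debug, PartialEq)]
enum MessageProcessingError {
    Transient,
    RecoverableWithChange,
    Unrecoverable,
}

/// Hypothetical set of things the queue-polling code can do with a message.
#[derive(Debug, PartialEq)]
enum QueueAction {
    Retry,
    DeadLetter,
    Delete,
}

/// Decide what to do with a failed message. `attempts` counts how many times
/// the message has been tried so far, including the attempt that just failed.
fn action_for(error: &MessageProcessingError, attempts: u32, max_attempts: u32) -> QueueAction {
    match error {
        // Transient errors are retried until the retry budget runs out...
        MessageProcessingError::Transient if attempts < max_attempts => QueueAction::Retry,
        // ...and then parked on the DLQ like anything else we cannot process yet.
        MessageProcessingError::Transient => QueueAction::DeadLetter,
        // Needs a code or config change first, so park it for a later replay.
        MessageProcessingError::RecoverableWithChange => QueueAction::DeadLetter,
        // Nothing will ever make this message processable.
        MessageProcessingError::Unrecoverable => QueueAction::Delete,
    }
}
```

Keeping the decision in a pure function like this makes it easy to unit test separately from the code that actually talks to the queue.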

If you want to learn more about events, message passing, and async architectures, I'd recommend Building Event-Driven Microservices! I picked up a copy with my education stipend from work! Please note this is an affiliate link.

Ignoring the messages you don’t care about

Lastly, let’s consider a situation where messages are coming into your application that you don’t care about. This happens pretty often if you’re reading from something like a Kafka topic. The topic might contain all events for a widget—creates, updates, and deletes—but let’s say your application only cares about widget creates. What do you do?

The easiest thing to do is add a new error variant to your code called something like NotApplicable. When this error variant is received, your queue-polling code can be written to just delete the message.

Sharp readers will note that this is the same action that we take when a message is unrecoverable. Since the action is the same, why do we bother disambiguating these two scenarios? Most often, we want to do this for observability purposes. For instance, if we truly experience an unrecoverable error we probably want to log that an error occurred. If we’re using software like Datadog or New Relic to help us collect and monitor trace data, we probably want to flag in our trace that an error occurred. But if we’re just ignoring a message, we often don’t want to take these same steps. Otherwise we might inadvertently set off our Datadog alerts and get paged, even though the application is doing exactly what it’s supposed to do: ignore widget updates and deletes.
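As a rough sketch of that distinction, the error type itself can answer two separate questions: should the message be deleted, and should the failure be reported? The method names should_report and should_delete are assumptions made for this example, not part of any library.

```rust
#[derive(Debug, PartialEq)]
enum MessageProcessingError {
    Transient,
    RecoverableWithChange,
    Unrecoverable,
    NotApplicable,
}

impl MessageProcessingError {
    /// Whether the polling code should log an error and flag the trace.
    fn should_report(&self) -> bool {
        match self {
            Self::Transient | Self::RecoverableWithChange | Self::Unrecoverable => true,
            // Ignoring a message is expected behavior, so stay quiet.
            Self::NotApplicable => false,
        }
    }

    /// Whether the polling code should delete the message outright.
    fn should_delete(&self) -> bool {
        match self {
            Self::Unrecoverable | Self::NotApplicable => true,
            Self::Transient | Self::RecoverableWithChange => false,
        }
    }
}
```

Unrecoverable and NotApplicable agree on deletion but differ on reporting, which is exactly the case for keeping them as separate variants. Because both methods use an exhaustive match, adding a new variant later forces a decision on each question.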

Another approach, and perhaps a more semantically truthful one, is to create a success enum instead of adding an error variant, and to encode the fact that we ignored a message in our success value. After all, if we don’t want to report an ignored message as an error, should we really encode it as an error? Since being ignored is a valid outcome of processing a message, we should include it with our success value. If we use an enum, it might look like this:

enum MessageProcessingSuccess {
    Processed,
    Ignored,
}

Keep in mind that switching to this enum will be a breaking change, since rather than adding a variant to an existing enum type, you’re changing the type returned in your Result::Ok altogether. However, since this semantically matches the behavior you expect from your application, it should be easier to read and understand in the long run, which can make your code more maintainable. This way, any time you receive a Result::Err response, you can mark it as an error in the trace and log an error. Any time you receive a Result::Ok response, you won’t do that. You can log that you were successful, and then just delete the message. In the end, the final action on the message is still the same, but by having the two different types of successes you get enhanced visibility into what your code is doing, and you set yourself up to easily change the behavior for Processed or Ignored messages down the road without impacting any of your other message handling.
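Here is a small sketch of a processor for the widget example that returns the success enum. The function process_widget_event and the event type strings are invented for illustration. Treating an unknown event type as RecoverableWithChange is just one reasonable choice, on the assumption that a future code change could add support for it.

```rust
#[derive(Debug, PartialEq)]
enum MessageProcessingSuccess {
    Processed,
    Ignored,
}

#[allow(dead_code)] // not every variant is produced in this small sketch
#[derive(Debug, PartialEq)]
enum MessageProcessingError {
    Transient,
    RecoverableWithChange,
    Unrecoverable,
}

/// Hypothetical processor for a topic carrying every kind of widget event.
fn process_widget_event(
    event_type: &str,
) -> Result<MessageProcessingSuccess, MessageProcessingError> {
    match event_type {
        // The only event this application cares about.
        "widget.created" => Ok(MessageProcessingSuccess::Processed),
        // Valid events that we deliberately skip: a success, not an error.
        "widget.updated" | "widget.deleted" => Ok(MessageProcessingSuccess::Ignored),
        // An event type we have never seen; a patch might teach us to handle it.
        _ => Err(MessageProcessingError::RecoverableWithChange),
    }
}
```

With this shape, the polling code only needs to reach for error logging and trace flags in its Err arms.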

Conclusion

Overall, Rust makes message processing readable and safe. It gives us compile-time checks to make sure we’ve handled all of our different failure modes. We can easily encode the different types of behaviors and semantics we want by leaning on Rust’s robust type system. Further, Rust provides excellent CPU-bound performance, a low memory footprint, and great IO-bound performance with its modern concurrency model (though we can get into these benefits in a different article).

Here’s how our pseudo-program looks all together:

enum MessageProcessingSuccess {
    Processed,
    Ignored,
}


enum MessageProcessingError {
    Transient,
    RecoverableWithChange,
    Unrecoverable,
}

async fn process(my_message: Message) {
    match call_message_processor(&my_message).await {
        Ok(MessageProcessingSuccess::Processed) => {
            log::info!("Successfully processed message");
            delete(&my_message).await;
        }
        Ok(MessageProcessingSuccess::Ignored) => {
            log::info!("Ignored message");
            delete(&my_message).await;
        }
        Err(MessageProcessingError::Transient) => {
            log::error!("Transient error - will retry");
            retry(&my_message).await;
        }
        Err(MessageProcessingError::RecoverableWithChange) => {
            log::error!("Error - sending to DLQ");
            dlq(&my_message).await;     // send to the DLQ
            delete(&my_message).await;  // delete off the main queue
        }
        Err(MessageProcessingError::Unrecoverable) => {
            log::error!("Unrecoverable error. Deleting");
            delete(&my_message).await;
        }
    }
}

Your code may or may not be async, so take that with a grain of salt. Also, you may want to include data with your successes or errors to add context to your logs. For instance, you could have your error variants take a String or a Box<dyn std::error::Error> if you wanted to include more detail about what caused the error in the first place. It’s up to you! Rust provides excellent tools to make it your own.
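As one possible sketch of that idea, the variants below carry either a String or a Box<dyn std::error::Error>, and a Display implementation turns them into log-friendly text. The parse_quantity function and the exact message wording are made up for this example.

```rust
use std::error::Error;
use std::fmt;

#[allow(dead_code)] // not every variant is produced in this small sketch
#[derive(Debug)]
enum MessageProcessingError {
    /// Wraps the underlying failure, for example an I/O or network error.
    Transient(Box<dyn Error>),
    /// Human-readable reason the message needs a code or config change.
    RecoverableWithChange(String),
    /// Human-readable reason the message can never be processed.
    Unrecoverable(String),
}

impl fmt::Display for MessageProcessingError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Transient(source) => write!(f, "transient error: {}", source),
            Self::RecoverableWithChange(reason) => write!(f, "recoverable with change: {}", reason),
            Self::Unrecoverable(reason) => write!(f, "unrecoverable: {}", reason),
        }
    }
}

impl Error for MessageProcessingError {}

/// Hypothetical validation step: the body must be a non-negative integer.
fn parse_quantity(body: &str) -> Result<u32, MessageProcessingError> {
    body.trim().parse::<u32>().map_err(|e| {
        // Keep both the offending input and the parser's own message for the logs.
        MessageProcessingError::Unrecoverable(format!("invalid quantity {:?}: {}", body, e))
    })
}
```

The polling code can still match on the variant to pick an action, and it can pass the whole error to a log line to record why the message failed.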

For more Rust content, check out some of my other Rust articles!

Thanks for reading! If you want to get started with Rust, I'd recommend reading the online Rust book, or if you prefer an actual book, the O'Reilly Rust book is a good choice.

If you want to learn more about concurrency in Rust, I'd also recommend Hands-On Concurrency with Rust. Full disclosure: if you purchase either of those books through that link I'll get a small commission from Amazon. I have read both however, and I genuinely recommend them. Maybe give them a look if you've got an education stipend through work (that's how I got them originally). Happy coding!