
RabbitMQ Stream tutorial - "Hello World!"

Introduction


Prerequisites

This tutorial assumes RabbitMQ is installed and running on localhost with the stream plugin enabled. The standard stream port is 5552. If you use a different host, port, or credentials, the connection settings will need adjusting.

Using Docker

If you don't have RabbitMQ installed, you can run it in a Docker container:

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672 -p 5672:5672  \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:3.13

Wait for the server to start and then enable the stream and stream management plugins:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management 

Where to get help

If you're having trouble going through this tutorial, you can contact us through the mailing list or the Discord community server.

RabbitMQ Streams was introduced in RabbitMQ 3.9. You can find more information here.

"Hello World"

(using the Rust Stream Client)

In this part of the tutorial we'll write two programs in Rust; a producer that sends a single message, and a consumer that receives messages and prints them out. We'll gloss over some of the detail in the Rust client API, concentrating on this very simple thing just to get started. It's the "Hello World" of RabbitMQ Streams.

The Rust stream client library

RabbitMQ speaks multiple protocols. This tutorial uses the RabbitMQ stream protocol, a dedicated protocol for RabbitMQ streams. There are a number of clients for RabbitMQ in many different languages; see the stream client libraries for each language. We'll use the Rust stream client provided by RabbitMQ.

RabbitMQ Rust stream client 0.4.2 and later versions are distributed via crates.io.

This tutorial assumes you are using PowerShell on Windows. On macOS and Linux nearly any shell will work.

Setup

First, let's verify that you have the Rust toolchain in your PATH:

rustc --version
cargo --version

Running these commands should print the installed compiler and Cargo versions.

An executable version of this tutorial can be found in the RabbitMQ tutorials repository.

Now let's create the project:

cargo new rust-stream
cd rust-stream

Copy the required dependencies from the Cargo.toml file on GitHub into your own Cargo.toml file.

Now that we have the Rust project set up, we can write some code.

Sending

We'll call our message producer (sender) send.rs and our message consumer (receiver) receive.rs. The producer will connect to RabbitMQ, send a single message, then exit.

Create the send.rs file in the src/bin directory.

In send.rs, we need some use declarations:

use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{ByteCapacity, Message, ResponseCode};

Then we can create a connection to the server:

use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;

The entry point of the stream Rust client is the Environment. It is used for configuration of RabbitMQ stream publishers, stream consumers, and streams themselves.

It abstracts the socket connection, and takes care of protocol version negotiation and authentication and so on for us.

This tutorial assumes that the stream publisher and consumer connect to a RabbitMQ node running locally, that is, on localhost. To connect to a node on a different machine, specify the target hostname or IP address on Environment::builder().
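For example, to reach a node on another machine, the builder can be given a host and port. This is a minimal sketch; the hostname below is a placeholder, not part of the tutorial:

// Illustrative sketch: replace the host with your node's hostname or IP.
// 5552 is the standard stream port.
let environment = Environment::builder()
    .host("stream-host.example.com")
    .port(5552)
    .build()
    .await?;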

Next let's create a producer.

The producer will also declare a stream it will publish messages to and then publish a message:

let stream = "hello-rust-stream";
let create_response = environment
    .stream_creator()
    .max_length(ByteCapacity::GB(5))
    .create(stream)
    .await;

if let Err(e) = create_response {
    if let StreamCreateError::Create { stream, status } = e {
        match status {
            // we can ignore this error because the stream already exists
            ResponseCode::StreamAlreadyExists => {}
            err => {
                println!("Error creating stream: {:?} {:?}", stream, err);
            }
        }
    }
}
let producer = environment.producer().build(stream).await?;
producer
    .send_with_confirm(Message::builder().body("Hello, World!").build())
    .await?;

The stream declaration operation is idempotent: the stream will only be created if it doesn't exist already. In this case, we ignore the error StreamAlreadyExists because we don't care if the stream already exists.

A stream is an append-only log abstraction that allows for repeated consumption of messages until they expire. It is good practice to always define a retention policy; in the example above, the stream is limited to 5 GiB in size.

The message content is a byte array. Applications can encode the data they need to transfer using any appropriate format such as JSON, MessagePack, and so on.
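Since the body is just bytes, a producer could, for example, publish a small JSON document instead of plain text. The following is a hypothetical variation (JSON is hand-rolled here to avoid extra dependencies), not part of the tutorial's send.rs:

// Hypothetical variation: encode the payload as JSON before publishing.
// The stream only sees bytes, so any encoding works.
let payload = format!(r#"{{"greeting": "{}"}}"#, "Hello, World!");
producer
    .send_with_confirm(Message::builder().body(payload).build())
    .await?;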

When the code above finishes running, the producer connection and stream-system connection will be closed. That's it for our producer.

Each time the producer is run, it will send a single message to the server and the message will be appended to the stream.
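To watch several messages being appended in one run, a small variation (again, not part of the tutorial's send.rs) is to publish in a loop:

// Hypothetical variation: publish a few numbered messages in one run.
for i in 0..5 {
    producer
        .send_with_confirm(Message::builder().body(format!("Hello, World! #{}", i)).build())
        .await?;
}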

The complete send.rs file can be found on GitHub.

Sending doesn't work!

If this is your first time using RabbitMQ and you don't see the "Sent" message then you may be left scratching your head wondering what could be wrong. Maybe the broker was started without enough free disk space (by default it needs at least 50 MB free) and is therefore refusing to accept messages. Check the broker logfile to confirm and reduce the limit if necessary. The configuration file documentation will show you how to set disk_free_limit.

Receiving

The other part of this tutorial, the consumer, will connect to a RabbitMQ node and wait for messages to be pushed to it. Unlike the producer, which in this tutorial publishes a single message and stops, the consumer runs continuously, consuming the messages RabbitMQ pushes to it and printing the received payloads.

Create the receive.rs file in the src/bin directory.

Similarly to send.rs, receive.rs needs use declarations:

use std::io::stdin;
use rabbitmq_stream_client::error::StreamCreateError;
use rabbitmq_stream_client::types::{ByteCapacity, OffsetSpecification, ResponseCode};
use futures::{StreamExt};
use tokio::task;

When it comes to the initial setup, the consumer part is very similar to the producer one: we use the default connection settings and declare the stream the consumer will consume from.

use rabbitmq_stream_client::Environment;
let environment = Environment::builder().build().await?;
let stream = "hello-rust-stream";
let create_response = environment
    .stream_creator()
    .max_length(ByteCapacity::GB(5))
    .create(stream)
    .await;

if let Err(e) = create_response {
    if let StreamCreateError::Create { stream, status } = e {
        match status {
            // we can ignore this error because the stream already exists
            ResponseCode::StreamAlreadyExists => {}
            err => {
                println!("Error creating stream: {:?} {:?}", stream, err);
            }
        }
    }
}

Note that the consumer part also declares the stream. This is to allow either part to be started first, be it the producer or the consumer.

We use the Consumer struct to create the consumer.

Consumer provides the method next() to get the next message from the stream.

The offset specification defines the starting point of the consumer. In this case, the consumer starts from the very first message available in the stream.

let mut consumer = environment
    .consumer()
    .offset(OffsetSpecification::First)
    .build(stream)
    .await
    .unwrap();

let handle = consumer.handle();
task::spawn(async move {
    while let Some(delivery) = consumer.next().await {
        let d = delivery.unwrap();
        println!(
            "Got message: {:#?} with offset: {}",
            d.message()
                .data()
                .map(|data| String::from_utf8(data.to_vec()).unwrap()),
            d.offset(),
        );
    }
});
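The snippet imports stdin and captures the consumer's handle without using them yet; the complete file uses them to keep the program alive and to shut the consumer down cleanly. A rough sketch of that last part, assuming the handle's close() method, might look like this (the exact code is in the GitHub file):

// Rough sketch: block until the user presses Enter, then stop the consumer.
println!("Press enter to close the consumer");
let mut input = String::new();
stdin().read_line(&mut input)?;
handle.close().await?;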

The complete receive.rs file can be found on GitHub.

Putting It All Together

In order to run both examples, open two terminal (shell) tabs.

Both parts of this tutorial can be run in any order, as they both declare the stream. Let's run the consumer first so that, when the producer is started, the consumer will print its message:

cargo run --bin receive

Then run the producer:

cargo run --bin send

The consumer will print the message it gets from the publisher via RabbitMQ. The consumer will keep running, waiting for new deliveries. Try re-running the publisher several times to observe that.

Streams are different from queues in that they are append-only logs of messages that can be consumed repeatedly. When multiple consumers consume from a stream, they will start from the first available message.
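If you would rather have a consumer receive only new messages instead of replaying the stream from the beginning, change the offset specification when building it. This is a hypothetical variation of the consumer above:

// Hypothetical variation: only receive messages published after this
// consumer attaches, instead of replaying the stream from the start.
let mut consumer = environment
    .consumer()
    .offset(OffsetSpecification::Next)
    .build(stream)
    .await?;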