Skip to main content

RabbitMQ tutorial - Publish/Subscribe

Publish/Subscribe

(using Spring AMQP)

info

Prerequisites

This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). In case you use a different host, port or credentials, connections settings would require adjusting.

Where to get help

If you're having trouble going through this tutorial you can contact us through the mailing list or RabbitMQ community Slack.

In the first tutorial we showed how to use start.spring.io to leverage Spring Initializr to create a project with the RabbitMQ starter dependency to create Spring AMQP applications.

In the previous tutorial we created a new package tut2 to place our configuration, sender and receiver and created a work queue with two consumers. The assumption behind a work queue is that each task is delivered to exactly one worker.

In this part we'll implement the fanout pattern to deliver a message to multiple consumers. This pattern is also known as "publish/subscribe" and is implemented by configuring a number of beans in our Tut3Config file.

Essentially, published messages are going to be broadcast to all the receivers.

Exchanges

In previous parts of the tutorial we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in RabbitMQ.

Let's quickly go over what we covered in the previous tutorials:

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's configure a bean to describe an exchange of this type, and call it tut.fanout:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;


@Profile({"tut3", "pub-sub", "publish-subscribe"})
@Configuration
public class Tut3Config {

@Bean
public FanoutExchange fanout() {
return new FanoutExchange("tut.fanout");
}

@Profile("receiver")
private static class ReceiverConfig {

@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}

@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}

@Bean
public Binding binding1(FanoutExchange fanout,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}

@Bean
public Binding binding2(FanoutExchange fanout,
Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2).to(fanout);
}

@Bean
public Tut3Receiver receiver() {
return new Tut3Receiver();
}
}

@Profile("sender")
@Bean
public Tut3Sender sender() {
return new Tut3Sender();
}
}

We follow the same approach as in the previous two tutorials. We create three profiles for the tutorial (tut3, pub-sub, or publish-subscribe). They are all synonyms for running the fanout profile tutorial. Next we configure the FanoutExchange as a Spring bean. Within the Tut3Receiver class we define four beans: 2 AnonymousQueues (non-durable, exclusive, auto-delete queues in AMQP terms) and 2 bindings to bind those queues to the exchange.

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for fanning out our messages.

Listing exchanges

To list the exchanges on the server you can run the ever useful rabbitmqctl:

sudo rabbitmqctl list_exchanges

In this list there will be some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.

Nameless exchange

In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string ("").

Recall how we published a message before:

   template.convertAndSend(queue.getName(), message)

The first parameter is the routing key and the RabbitTemplate sends messages by default to the default exchange. Each queue is automatically bound to the default exchange with the name of queue as the binding key. This is why we can use the name of the queue as the routing key to make sure the message ends up in the queue.

Now, we can publish to our named exchange instead:

@Autowired
private RabbitTemplate template;

@Autowired
private FanoutExchange fanout; // configured in Tut3Config above

template.convertAndSend(fanout.getName(), "", message);

From now on the fanout exchange will append messages to our queue.

Temporary queues

As you may remember previously we were using queues that had specific names (remember hello). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.

But that's not the case for our fanout example. We want to hear about all messages, not just a subset of them. We're also interested only in currently flowing messages, not in the old ones. To solve that we need two things.

Firstly, whenever we connect to Rabbit, we need a fresh, empty queue. To do this, we could create a queue with a random name, or -- even better -- let the server choose a random queue name for us.

Secondly, once we disconnect the consumer, the queue should be automatically deleted. To do this with the Spring AMQP client, we defined an AnonymousQueue, which creates a non-durable, exclusive, auto-delete queue with a generated name:

@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}

@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}

At this point, our queues have random queue names. For example, it may look like spring.gen-1Rx9HOqvTAaHeeZrQWu8Pg.

Bindings

We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding. In the above Tut3Config you can see that we have two bindings, one for each AnonymousQueue.

@Bean
public Binding binding1(FanoutExchange fanout,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1).to(fanout);
}

Listing bindings

You can list existing bindings using, you guessed it,

rabbitmqctl list_bindings

Putting it all together

The producer program, which emits messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our fanout exchange instead of the nameless one. We need to supply a routingKey when sending, but its value is ignored for fanout exchanges. Here goes the code for tut3.Sender.java program:

package org.springframework.amqp.tutorials.tut3;

import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.concurrent.atomic.AtomicInteger;

public class Tut3Sender {

@Autowired
private RabbitTemplate template;

@Autowired
private FanoutExchange fanout;

AtomicInteger dots = new AtomicInteger(0);

AtomicInteger count = new AtomicInteger(0);

@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder builder = new StringBuilder("Hello");
if (dots.getAndIncrement() == 3) {
dots.set(1);
}
for (int i = 0; i < dots.get(); i++) {
builder.append('.');
}
builder.append(count.incrementAndGet());
String message = builder.toString();
template.convertAndSend(fanout.getName(), "", message);
System.out.println(" [x] Sent '" + message + "'");
}

}

Tut3Sender.java source

As you see, we leverage the beans from the Tut3Config file and autowire in the RabbitTemplate along with our configured FanoutExchange. This step is necessary as publishing to a non-existing exchange is forbidden.

The messages will be lost if no queue is bound to the exchange yet, but that's okay for us; if no consumer is listening yet we can safely discard the message.

The code for Tut3Receiver.java:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.util.StopWatch;

public class Tut3Receiver {

@RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in) throws InterruptedException {
receive(in, 1);
}

@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive2(String in) throws InterruptedException {
receive(in, 2);
}

public void receive(String in, int receiver) throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + receiver + " [x] Received '" + in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + receiver + " [x] Done in "
+ watch.getTotalTimeSeconds() + "s");
}

private void doWork(String in) throws InterruptedException {
for (char ch : in.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
}

}

Tut3Receiver.java source

Compile as before and we're ready to execute the fanout sender and receiver.

./mvnw clean package

And of course, to execute the tutorial do the following:

# shell 1
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=pub-sub,receiver \
--tutorial.client.duration=60000
# shell 2
java -jar target/rabbitmq-tutorials.jar --spring.profiles.active=pub-sub,sender \
--tutorial.client.duration=60000

Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two ReceiveLogs.java programs running you should see something like:

sudo rabbitmqctl list_bindings
tut.fanout exchange 8b289c9c-a1eb-4a3a-b6a9-163c4fdcb6c2 queue []
tut.fanout exchange d7e7d193-65b1-4128-a532-466a5256fd31 queue []

The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that's exactly what we intended.

To find out how to listen for a subset of messages, let's move on to tutorial 4