RabbitMQ


This page gives an overview of the RabbitMQ Java client API.

The code samples given here demonstrate connecting to AMQP brokers and using RPC services exposed via AMQP.

For more details, please see the relevant Javadoc documentation.

Core API Guide

Connections and channels

The client API is closely modelled on the AMQP protocol specification, with little additional abstraction. (Future releases will provide increased type-safety by encapsulating AMQP entities more thoroughly; please see the roadmap for more detail.)

For more detail on the classes used in this document, please see the Javadoc documentation.

The core API classes are Connection and Channel, representing an AMQP connection and an AMQP data channel, respectively:

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

The holder class AMQP stores all the code generated automatically from the AMQP XML protocol definition specification. It contains all required content-class-specific content header definitions (such as AMQP.BasicProperties) and all the method request and response descriptors (such as AMQP.Basic.Publish and AMQP.Queue.BindOk), as well as useful protocol-specific constants and other values.

For details and exact definitions, please see the AMQP specification document.

import com.rabbitmq.client.AMQP;

Connecting to a broker

The following code connects to an AMQP broker using the given parameters (host name, port number, etc.):

ConnectionParameters params = new ConnectionParameters();
params.setUsername(userName);
params.setPassword(password);
params.setVirtualHost(virtualHost);
params.setRequestedHeartbeat(0);
ConnectionFactory factory = new ConnectionFactory(params);
Connection conn = factory.newConnection(hostName, portNumber);

The Connection interface can then be used to open a channel and request an access ticket to a realm:

Channel channel = conn.createChannel();
int ticket = channel.accessRequest(realm);

The channel and access ticket can now be used to send and receive messages, as described in subsequent sections.

To disconnect, simply close the channel and the connection:

channel.close(AMQP.REPLY_SUCCESS, "Goodbye");
conn.close(AMQP.REPLY_SUCCESS, "Goodbye");

Note that closing the channel may be considered good practice, but isn't strictly necessary here - it will be done automatically anyway when the underlying connection is closed.

Using exchanges and queues

Client applications work with exchanges and queues, the high-level building blocks of AMQP. These must be "declared" before they can be used. Declaring either type of object simply ensures that one of that name exists, creating it if necessary.

Continuing the previous example, the following code declares an exchange and a queue, then binds them together.

channel.exchangeDeclare(ticket, exchangeName, "direct");
channel.queueDeclare(ticket, queueName);
channel.queueBind(ticket, queueName, exchangeName, routingKey);

This will actively declare the following objects:

  1. a non-durable, non-autodelete exchange of "direct" type
  2. a non-durable, non-exclusive, non-autodelete queue

both of which can be customised by using additional parameters. Here neither of them has any special arguments.

The above function calls then bind the queue to the exchange with the given routing key.
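
To make the declare-and-bind semantics concrete, here is a toy in-memory model. It is only a sketch for illustration and says nothing about how the broker or the client library is implemented; ToyBroker and all of its methods are hypothetical names. It assumes that declaring is idempotent and that a "direct" exchange routes a message only to queues bound with exactly the routing key used when publishing.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Toy in-memory model of a "direct" exchange; every name here is hypothetical.
class ToyBroker {
    // queue name -> pending message bodies
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // exchange name -> (routing key -> names of bound queues)
    private final Map<String, Map<String, Set<String>>> bindings = new HashMap<>();

    // Declaring is idempotent: an existing exchange is left untouched.
    void exchangeDeclare(String exchange) {
        bindings.computeIfAbsent(exchange, k -> new HashMap<>());
    }

    // Declaring is idempotent: an existing queue keeps its messages.
    void queueDeclare(String queue) {
        queues.computeIfAbsent(queue, k -> new ArrayDeque<>());
    }

    void queueBind(String queue, String exchange, String routingKey) {
        if (!queues.containsKey(queue) || !bindings.containsKey(exchange)) {
            throw new IllegalStateException("declare the queue and exchange first");
        }
        bindings.get(exchange)
                .computeIfAbsent(routingKey, k -> new LinkedHashSet<>())
                .add(queue);
    }

    // Routes by exact routing-key match and returns the number of target queues.
    int publish(String exchange, String routingKey, String body) {
        Map<String, Set<String>> byKey = bindings.get(exchange);
        if (byKey == null) {
            throw new IllegalStateException("unknown exchange: " + exchange);
        }
        Set<String> targets = byKey.getOrDefault(routingKey, Collections.<String>emptySet());
        for (String queue : targets) {
            queues.get(queue).add(body);
        }
        return targets.size();
    }

    // Returns the oldest pending message, or null if there is none.
    String get(String queue) {
        Queue<String> pending = queues.get(queue);
        return pending == null ? null : pending.poll();
    }
}
```

In this model, publishing with a routing key that no queue is bound to reaches zero queues, which is the situation the "mandatory" flag is designed to report.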

Note that all three of these Channel API methods are overloaded. These convenient short forms of exchangeDeclare, queueDeclare and queueBind use sensible defaults. There are also longer forms with more parameters, to let you override these defaults as necessary, giving full control where needed.

This "short version, long version" pattern is used throughout the client API.

Publishing messages

To publish a message to an exchange, use Channel.basicPublish as follows:

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(ticket, exchangeName, routingKey, null, messageBodyBytes);

For fine control, you can use overloaded variants to specify the mandatory and immediate flags, or send messages with basic-class header properties:

channel.basicPublish(ticket, exchangeName, routingKey,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);

This sends a message with delivery mode 2 (persistent) and content-type "text/plain". You can specify as many parameters as you like:

channel.basicPublish(ticket, exchangeName, routingKey,
                     new AMQP.BasicProperties
                       (contentType, contentEncoding, headers, deliveryMode,
                        priority, correlationId, replyTo, expiration,
                        messageId, timestamp, type, userId,
                        appId, clusterId),
                     messageBodyBytes);

Here any or all of the parameters to the BasicProperties constructor may be null.

Note also that BasicProperties is an inner class of the autogenerated holder class AMQP.

Channel thread-safety

In general, Channel instances should not be used by more than one thread simultaneously: application code should maintain a clear notion of thread ownership for Channel instances. If more than one thread needs to access a particular Channel instance, the application should enforce mutual exclusion itself, for example by synchronising on the Channel.

Symptoms of incorrect serialisation of Channel operations include, but are not limited to, IllegalStateExceptions with the message "cannot execute more than one synchronous AMQP command at a time", and UnexpectedFrameErrors.
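
The following sketch shows one way to enforce the mutual exclusion described above. FakeChannel is a hypothetical stand-in for a Channel (it is not part of the client library), so the example runs with the JDK alone; the point is only that every thread takes the same monitor before touching the shared object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Channel: deliberately not thread-safe on its own.
class FakeChannel {
    private final List<String> sent = new ArrayList<>();

    void publish(String body) {
        sent.add(body);
    }

    int sentCount() {
        return sent.size();
    }
}

class SharedChannelDemo {
    // Every caller takes the channel's monitor before using it.
    static void publishSafely(FakeChannel channel, String body) {
        synchronized (channel) {
            channel.publish(body);
        }
    }

    // Starts the given number of threads, each publishing perThread messages.
    static int run(int threads, int perThread) {
        FakeChannel channel = new FakeChannel();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    publishSafely(channel, "msg-" + id + "-" + i);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        // All workers have finished, so reading the count here is safe.
        return channel.sentCount();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000));
    }
}
```

Giving each thread its own Channel avoids the lock altogether and is often the simpler design when the application allows it.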

Retrieving individual messages

To retrieve individual messages, use Channel.basicGet. The returned value is an instance of GetResponse, from which the header information (properties) and message body can be extracted:

boolean noAck = false;
GetResponse response = channel.basicGet(ticket, queueName, noAck);
if (response == null) {
    // No message retrieved.
} else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag();
    ...

Since noAck = false above, you must also call Channel.basicAck to acknowledge that you have successfully received and processed the message:

    ...
    channel.basicAck(deliveryTag, false); // acknowledge receipt of the message
}

Retrieving messages by subscription

import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

Another way to receive messages is to set up a subscription using the Consumer interface. The messages will then be delivered automatically as they arrive, rather than having to be requested proactively.

The easiest way to implement a Consumer object is to subclass the convenience class DefaultConsumer, then override methods as necessary. You will generally want to override the core interface method handleDelivery:

boolean noAck = false;
channel.basicConsume(ticket, queueName, noAck,
     new DefaultConsumer(channel) {
	 @Override public void handleDelivery(String consumerTag,
				Envelope envelope,
				AMQP.BasicProperties properties,
				byte[] body)
	     throws IOException {
	     String routingKey = envelope.getRoutingKey();
	     String contentType = properties.contentType;
	     long deliveryTag = envelope.getDeliveryTag();
	     // (process the message components ...) 

	     channel.basicAck(deliveryTag, false);
	 }
     });

More sophisticated consumers will need to override further methods. In particular, handleShutdownSignal traps channel / connection closure, and handleConsumeOk is passed a server-generated consumer tag when none is supplied to the initial basicConsume call.

Consumers can also implement the handleCancelOk method to be notified of cancellations.

You can cancel an active consumer with Channel.basicCancel:

channel.basicCancel(consumerTag);

When calling the API methods, you always refer to consumers by their consumer tags, which can be either client- or server-generated as explained in the AMQP specification document.

Handling unroutable or undelivered messages

If a message is published with the "mandatory" or "immediate" flags set, but cannot be delivered, the broker will return it to the sending client (via an AMQP.Basic.Return command).

To be notified of such returns, clients can implement the ReturnListener interface and call Channel.setReturnListener. If the client has not configured a return listener for a particular channel, then the associated returned messages will be silently dropped.

channel.setReturnListener(new ReturnListener() {
        public void handleBasicReturn(int replyCode, 
					String replyText, 
					String exchange, 
					String routingKey, 
					AMQP.BasicProperties properties, 
					byte[] body) 
            throws IOException {
            ...
        }
    });

This return listener will be called, for example, if the client publishes a message with the "mandatory" flag set to an exchange of "direct" type which is not bound to a queue.
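
The decision logic described in this section can be sketched with a toy model. ToyReturnPath and ToyReturnListener are hypothetical names used only for illustration; the model folds the broker side (is the message routable?) and the client side (is a listener configured?) into one class, which the real system does not do.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the return path; every name here is hypothetical.
class ToyReturnPath {
    interface ToyReturnListener {
        void handleReturn(String routingKey, String body);
    }

    private final Set<String> boundRoutingKeys = new HashSet<>();
    private ToyReturnListener returnListener; // null until configured
    int delivered;
    int silentlyDropped;

    void bind(String routingKey) {
        boundRoutingKeys.add(routingKey);
    }

    void setReturnListener(ToyReturnListener listener) {
        returnListener = listener;
    }

    void publish(String routingKey, boolean mandatory, String body) {
        if (boundRoutingKeys.contains(routingKey)) {
            delivered++;                 // routable: nothing is returned
        } else if (mandatory && returnListener != null) {
            returnListener.handleReturn(routingKey, body);
        } else {
            silentlyDropped++;           // flag not set, or nobody is listening
        }
    }
}
```

The practical consequence is that setting the flag is not enough on its own: without a listener, the publisher never learns that the message was returned.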

Tunnelling via SSL/TLS

[Figure: stunnel4 SSL proxy]

This release of the RabbitMQ Java client supports an experimental method of tunnelling AMQP over SSL/TLS.

If your broker is available via a generic SSL tunnel - provided by a program such as stunnel, for instance - you can use an SSLSocketFactory to provide secure communication with the broker.

In many cases, it is sufficient to do the following:

ConnectionFactory factory = new ConnectionFactory(params);
factory.useSslProtocol();
Connection conn = factory.newConnection(hostName, portNumber);

There are three variants of ConnectionFactory.useSslProtocol:

  • useSslProtocol() - uses the default SSL protocol (SSLv3) with a blindly-trusting TrustManager;
  • useSslProtocol(String protocol) - uses the supplied SSL protocol name with a blindly-trusting TrustManager; or
  • useSslProtocol(String protocol, TrustManager tm) - uses the supplied SSL protocol name and TrustManager.

Each of the three variants ends up calling SSLContext.getInstance followed by SSLContext.init, with no key managers, no secure random source, and just the passed in trust manager.
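
The sequence just described can be sketched with the JDK's own javax.net.ssl classes. This is an illustration of the steps, not the client library's source: BlindTrustSketch and TRUST_EVERYTHING are hypothetical names, and the protocol name is left to the caller because some protocol names may be unavailable or disabled on a given JDK. A trust manager that accepts every certificate gives encryption without authentication, so it offers no protection against an impersonated broker.

```java
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

// Hypothetical helper showing the getInstance/init sequence.
class BlindTrustSketch {
    // Accepts every certificate chain: encryption only, no authentication.
    static final X509TrustManager TRUST_EVERYTHING = new X509TrustManager() {
        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) {
            // accept without checking
        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) {
            // accept without checking
        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return new X509Certificate[0];
        }
    };

    static SSLSocketFactory socketFactory(String protocol, TrustManager tm)
            throws GeneralSecurityException {
        SSLContext context = SSLContext.getInstance(protocol);
        // null key managers and null randomness source; only the trust manager is supplied
        context.init(null, new TrustManager[] { tm }, null);
        return context.getSocketFactory();
    }
}
```

For example, BlindTrustSketch.socketFactory("TLS", BlindTrustSketch.TRUST_EVERYTHING) yields a socket factory of the kind that could be handed to ConnectionFactory.setSocketFactory.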

In cases where the convenience methods do not provide enough control, such as when you have actual authentication requirements rather than simply using SSL for its encryption abilities, you will need to configure an SSLSocketFactory instance yourself. The following is a rough sketch of the steps required:

ConnectionFactory factory = new ConnectionFactory(params);
javax.net.ssl.SSLContext sslContext = javax.net.ssl.SSLContext.getInstance("SSLv3");
javax.net.ssl.KeyManager km = /* expression constructing your custom key manager */;
javax.net.ssl.TrustManager tm = /* expression constructing your custom trust manager */;
java.security.SecureRandom random = /* expression constructing your custom randomness source */;
sslContext.init(new javax.net.ssl.KeyManager[] { km },
                new javax.net.ssl.TrustManager[] { tm },
                random);
factory.setSocketFactory(sslContext.getSocketFactory());
Connection conn = factory.newConnection(hostName, portNumber);
...

Basic RPC

As a programming convenience, the Java client API offers a class RpcClient which uses a temporary reply queue to provide simple RPC-style communication facilities via AMQP.

The class doesn't impose any particular format on the RPC arguments and return values. It simply provides a mechanism for sending a message to a given exchange with a particular routing key, and waiting for a response on a reply queue.

import com.rabbitmq.client.RpcClient;

RpcClient rpc = new RpcClient(channel, ticket, exchangeName, routingKey);

(The implementation details of how this class uses AMQP are as follows: request messages are sent with the basic.correlation_id field set to a value unique for this RpcClient instance, and with basic.reply_to set to the name of the reply queue.)
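
The request/reply mechanism described in the parenthesis can be modelled in memory with the JDK's blocking queues. This is a sketch of the pattern, not the RpcClient source: ToyMessage, ToyRpc and the upper-casing server are hypothetical, and a queue object stands in for the reply queue's name.

```java
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory message carrying the two fields named above.
class ToyMessage {
    final String correlationId;
    final BlockingQueue<ToyMessage> replyTo; // stands in for the reply queue's name
    final String body;

    ToyMessage(String correlationId, BlockingQueue<ToyMessage> replyTo, String body) {
        this.correlationId = correlationId;
        this.replyTo = replyTo;
        this.body = body;
    }
}

class ToyRpc {
    // Sends a request and blocks until the reply with the same correlation id arrives.
    static String call(BlockingQueue<ToyMessage> requests, String correlationId, String body) {
        BlockingQueue<ToyMessage> replyQueue = new LinkedBlockingQueue<>(); // temporary reply queue
        try {
            requests.put(new ToyMessage(correlationId, replyQueue, body));
            while (true) {
                ToyMessage reply = replyQueue.poll(5, TimeUnit.SECONDS);
                if (reply == null) {
                    throw new IllegalStateException("no reply within 5 seconds");
                }
                if (correlationId.equals(reply.correlationId)) {
                    return reply.body;
                }
                // otherwise the reply belongs to another request: ignore it
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a reply", e);
        }
    }

    // Toy server: upper-cases each request body and replies to the queue named in the request.
    static Thread startUppercaseServer(BlockingQueue<ToyMessage> requests) {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    ToyMessage request = requests.take();
                    String answer = request.body.toUpperCase(Locale.ROOT);
                    request.replyTo.put(new ToyMessage(request.correlationId, null, answer));
                }
            } catch (InterruptedException e) {
                // interrupted: stop serving
            }
        });
        server.setDaemon(true);
        server.start();
        return server;
    }
}
```

The server never needs to know who the caller is: copying the correlation id into the reply and sending it to the reply-to destination is enough for the caller to match the response to its request.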

Once you have created an instance of this class, you can use it to send RPC requests by using any of the following methods:

byte[] primitiveCall(byte[] message);
String stringCall(String message);
Map mapCall(Map message);
Map mapCall(Object[] keyValuePairs);

The primitiveCall method transfers raw byte arrays as the request and response bodies. The method stringCall is a thin convenience wrapper around primitiveCall, treating the message bodies as String instances in the default character encoding.

The mapCall variants are a little more sophisticated: they encode a java.util.Map containing ordinary Java values into an AMQP binary table representation, and decode the response in the same way. (Note that there are some restrictions on what value types can be used here - see the javadoc for details.)

All the marshalling/unmarshalling convenience methods use primitiveCall as a transport mechanism, and just provide a wrapping layer on top of it.
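
As a rough illustration of this layering, a string-based call can be written as a thin wrapper over a byte-array transport. The sketch below is not the RpcClient implementation: CallWrappers is a hypothetical class, and the transport is modelled as a plain function from request bytes to response bytes.

```java
import java.nio.charset.Charset;
import java.util.function.UnaryOperator;

// Hypothetical wrapper layer over a byte-array transport.
class CallWrappers {
    // Encodes the request, delegates to the transport, then decodes the response.
    static String stringCall(UnaryOperator<byte[]> primitiveCall, String message) {
        Charset charset = Charset.defaultCharset(); // default character encoding
        byte[] response = primitiveCall.apply(message.getBytes(charset));
        return new String(response, charset);
    }
}
```

With an echoing transport, CallWrappers.stringCall(bytes -> bytes, "hello") returns the string it was given. Because the default character encoding differs between platforms, both ends of a real exchange would need to agree on it.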

Shutdown Protocol

Overview of the AMQP client shutdown

The AMQP connection and channel share the same general approach to managing network failure, internal failure, and explicit local shutdown.

The AMQP connection and channel have the following lifecycle states:

  • open: the object is ready to use
  • closing: the object has been explicitly notified to shut down locally, has issued a shutdown request to any supporting lower-layer objects, and is waiting for their shutdown procedures to complete
  • closed: the object has received all shutdown-complete notification(s) from any lower-layer objects, and as a consequence has shut itself down

These objects always end up in the closed state, regardless of what caused the closure: an application request, an internal client library failure, a remote network request or a network failure.

The AMQP connection and channel objects possess the following shutdown-related methods:

  • addShutdownListener(ShutdownListener listener) and removeShutdownListener(ShutdownListener listener), to manage listeners, which are fired when the object transitions to the closed state. Note that adding a ShutdownListener to an object that is already closed will fire the listener immediately
  • getCloseReason(), to find out why the object was shut down
  • isOpen(), useful for testing whether the object is in an open state
  • close(int closeCode, String closeMessage), to explicitly notify the object to shut down
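
The lifecycle and listener rules above can be summarised in a small state machine. This is a simplified sketch, not the client library's code: ToyShutdownNotifier is a hypothetical class, a plain String stands in for the close reason, and the closing state is passed through immediately because there are no lower-layer objects to wait for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the open -> closing -> closed lifecycle.
class ToyShutdownNotifier {
    enum State { OPEN, CLOSING, CLOSED }

    private State state = State.OPEN;
    private String closeReason; // null while the object is open
    private final List<Consumer<String>> listeners = new ArrayList<>();

    synchronized boolean isOpen() {
        return state == State.OPEN;
    }

    synchronized String getCloseReason() {
        return closeReason;
    }

    void addShutdownListener(Consumer<String> listener) {
        String reason;
        synchronized (this) {
            if (state != State.CLOSED) {
                listeners.add(listener); // fired later, on the transition to closed
                return;
            }
            reason = closeReason;
        }
        listener.accept(reason); // already closed: fire immediately
    }

    void close(String reason) {
        List<Consumer<String>> toNotify;
        synchronized (this) {
            if (state != State.OPEN) {
                return; // already closing or closed
            }
            state = State.CLOSING;
            closeReason = reason;
            // a real client would wait here for lower-layer shutdown to complete
            state = State.CLOSED;
            toNotify = new ArrayList<>(listeners);
        }
        for (Consumer<String> listener : toNotify) {
            listener.accept(reason); // notify outside the lock
        }
    }
}
```

Listeners are invoked outside the lock so that a listener calling back into the object cannot deadlock against a concurrent close.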

Simple usage of listeners would look like:

import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;

connection.addShutdownListener(new ShutdownListener() {
    public void service(ShutdownSignalException cause)
    {
        ...
    }
});

Information about the circumstances of a shutdown

One can retrieve the ShutdownSignalException, which contains all the information available about the close reason, either by explicitly calling the getCloseReason() method or by using the cause parameter in the service(ShutdownSignalException cause) method of the ShutdownListener class.

The ShutdownSignalException class provides methods to analyse the reason for the shutdown. Calling the isHardError() method tells us whether it was a connection error (hard) or a channel error (soft).

public void service(ShutdownSignalException cause)
{
  if (cause.isHardError())
  {
    Connection conn = cause.getConnection();
    if (!cause.isInitiatedByApplication())
    {
      Object reason = cause.getReason();
      ...
    }
    ...
  } else {
    Channel ch = cause.getChannel();
    ...
  }
}

Atomicity and use of the isOpen() method

Use of the isOpen() method of channel and connection objects is not recommended for production code, because the value returned by the method is dependent on the existence of the shutdown cause. The following code illustrates the possibility of race conditions:

public void brokenMethod(Channel channel)
{
    if (channel.isOpen())
    {
        // The following code depends on the channel being in open state.
        // However there is a possibility of the change in the channel state
        // between isOpen() and txCommit() call
        ...
        channel.txCommit();
    }
}

Instead, we should normally skip such checks and simply attempt the desired action. If during the execution of the code the channel of the connection is closed, a ShutdownSignalException will be thrown, indicating that the object is in an invalid state. We should also catch IOException, caused either by a SocketException, when the broker closes the connection unexpectedly, or by a ShutdownSignalException, when the broker initiates a clean close.

public void validMethod(Channel channel)
{
    try {
        ...
        channel.txCommit();
    } catch (ShutdownSignalException sse) {
        // possibly check if channel was closed
        // by the time we started action and reasons for
        // closing it
        ...
    } catch (IOException ioe) {
        // check why connection was closed
        ...
    } 
}