
This page gives an overview of the RabbitMQ Java client API.
The code samples given here demonstrate connecting to AMQP brokers and using RPC services exposed via AMQP.
For more details, please see the relevant Javadoc documentation.
The client API is closely modelled on the AMQP protocol specification, with little additional abstraction. (Future releases will provide increased type-safety by encapsulating AMQP entities more thoroughly; please see the roadmap for more detail.)
For more detail on the classes used in this document, please see the Javadoc documentation.
The core API classes are Connection
and Channel, representing an AMQP connection and an
AMQP data channel, respectively:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
The holder class AMQP stores all the code
generated automatically from the AMQP XML protocol
definition specification. It contains all required
content-class-specific content header definitions (such
as AMQP.BasicProperties) and all the
method request and response descriptors (such
as AMQP.Basic.Publish
and AMQP.Queue.BindOk), as well as useful
protocol-specific constants and other values.
For details and exact definitions, please see the AMQP specification document.
import com.rabbitmq.client.AMQP;
The following code connects to an AMQP broker using the given parameters (host name, port number, etc.):
ConnectionParameters params = new ConnectionParameters();
params.setUsername(userName);
params.setPassword(password);
params.setVirtualHost(virtualHost);
params.setRequestedHeartbeat(0);
ConnectionFactory factory = new ConnectionFactory(params);
Connection conn = factory.newConnection(hostName, portNumber);
The Connection interface can then be used to open a channel and request an access ticket to a realm:
Channel channel = conn.createChannel();
int ticket = channel.accessRequest(realm);
The channel and access ticket can now be used to send and receive messages, as described in subsequent sections.
To disconnect, simply close the channel and the connection:
channel.close(AMQP.REPLY_SUCCESS, "Goodbye");
conn.close(AMQP.REPLY_SUCCESS, "Goodbye");
Note that closing the channel may be considered good practice, but isn't strictly necessary here - it will be done automatically anyway when the underlying connection is closed.
Client applications work with exchanges and queues, the high-level building blocks of AMQP. These must be "declared" before they can be used. Declaring either type of object simply ensures that one of that name exists, creating it if necessary.
Continuing the previous example, the following code declares an exchange and a queue, then binds them together.
channel.exchangeDeclare(ticket, exchangeName, "direct");
channel.queueDeclare(ticket, queueName);
channel.queueBind(ticket, queueName, exchangeName, routingKey);
This will actively declare two objects, an exchange of "direct" type and a queue, both of which can be customised by using additional parameters. Here neither of them has any special arguments.
The above function calls then bind the queue to the exchange with the given routing key.
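To make the "ensure it exists, creating it if necessary" semantics concrete, here is a minimal in-memory sketch. The class DeclareSketch and all of its methods are hypothetical stand-ins invented for illustration; they are not part of the client API and do not talk to a broker. Declaring the same name twice leaves a single object in place, and binding requires both objects to have been declared first.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model for illustration only; NOT the broker or the client API.
class DeclareSketch {
    // Declared exchange names mapped to their type.
    private final Map<String, String> exchanges = new HashMap<>();
    // Declared queue names.
    private final Set<String> queues = new HashSet<>();
    // exchange name -> (routing key -> names of bound queues)
    private final Map<String, Map<String, Set<String>>> bindings = new HashMap<>();

    // Ensures an exchange of this name exists; returns true only if it was created by this call.
    boolean exchangeDeclare(String name, String type) {
        return exchanges.putIfAbsent(name, type) == null;
    }

    // Ensures a queue of this name exists; returns true only if it was created by this call.
    boolean queueDeclare(String name) {
        return queues.add(name);
    }

    // Binds a declared queue to a declared exchange under the given routing key.
    void queueBind(String queue, String exchange, String routingKey) {
        if (!queues.contains(queue) || !exchanges.containsKey(exchange)) {
            throw new IllegalStateException("declare the queue and the exchange before binding them");
        }
        bindings.computeIfAbsent(exchange, e -> new HashMap<>())
                .computeIfAbsent(routingKey, k -> new HashSet<>())
                .add(queue);
    }

    // Queues that a message published with this routing key would reach.
    Set<String> boundQueues(String exchange, String routingKey) {
        Map<String, Set<String>> byKey = bindings.getOrDefault(exchange, Collections.emptyMap());
        return byKey.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DeclareSketch model = new DeclareSketch();
        model.exchangeDeclare("orders.exchange", "direct");
        model.exchangeDeclare("orders.exchange", "direct"); // second declare changes nothing
        model.queueDeclare("orders.queue");
        model.queueBind("orders.queue", "orders.exchange", "new-order");
        System.out.println(model.boundQueues("orders.exchange", "new-order"));
    }
}
```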
Note that all three of these Channel API methods are overloaded.
These convenient short forms of exchangeDeclare, queueDeclare and queueBind
use sensible defaults. There are also longer forms with more parameters, to let you override these defaults
as necessary, giving full control where needed.
This "short version, long version" pattern is used throughout the client API.
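The pattern can be sketched as a pair of overloads in which the short form delegates to the long form. The class below is a hypothetical stand-in, not the client's Channel: its parameter list and the default values it passes are assumptions chosen only to show the shape of the pattern.

```java
// Hypothetical stand-in for illustration; this is NOT com.rabbitmq.client.Channel.
class OverloadSketch {
    // Long form: the caller spells out every option.
    static String queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete) {
        return queueName + "[durable=" + durable
                + ", exclusive=" + exclusive
                + ", autoDelete=" + autoDelete + "]";
    }

    // Short form: delegates to the long form with defaults (the values here are assumed).
    static String queueDeclare(String queueName) {
        return queueDeclare(queueName, false, false, false);
    }

    public static void main(String[] args) {
        System.out.println(queueDeclare("orders"));
        System.out.println(queueDeclare("orders", true, false, false));
    }
}
```

Keeping the defaults in one delegating method means the short and long forms cannot drift apart.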
To publish a message to an exchange, use Channel.basicPublish as follows:
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(ticket, exchangeName, routingKey, null, messageBodyBytes);
For fine control, you can use overloaded variants to specify the mandatory and immediate
flags, or send messages with basic-class header properties :
channel.basicPublish(ticket, exchangeName, routingKey,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,
                     messageBodyBytes);
This sends a message with delivery mode 2 (persistent) and content-type "text/plain". You can specify as many parameters as you like :
channel.basicPublish(ticket, exchangeName, routingKey,
                     new AMQP.BasicProperties
                       (contentType, contentEncoding, headers, deliveryMode,
                        priority, correlationId, replyTo, expiration,
                        messageId, timestamp, type, userId,
                        appId, clusterId),
                     messageBodyBytes);
Here any or all of the parameters to the BasicProperties constructor may be null.
Note also that BasicProperties is an inner class of the autogenerated
holder class AMQP.
In general, Channel instances should not be used by more than one thread simultaneously: application code should maintain a clear notion of thread ownership for Channel instances. If more than one thread needs to access a particular Channel instance, the application should enforce mutual exclusion itself, for example by synchronising on the Channel.
Symptoms of incorrect serialisation of Channel operations include, but are not limited to,
IllegalStateExceptions with the message "cannot execute more than one synchronous AMQP command at a time", and UnexpectedFrameErrors.
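The mutual-exclusion advice above can be sketched as follows. FakeChannel is a hypothetical stand-in with deliberately unsynchronised state, not the client's Channel class; the point is only that every thread takes the lock on the shared object before calling into it.

```java
// Hypothetical stand-in for illustration; FakeChannel is NOT com.rabbitmq.client.Channel.
class SharedChannelSketch {
    static class FakeChannel {
        private int published = 0; // plain field: unsafe unless callers serialise access

        void basicPublish(byte[] body) {
            published++;
        }

        int publishedCount() {
            return published;
        }
    }

    // Publishes from several threads, serialising every call by synchronising on the channel.
    static int publishFromThreads(int threads, int perThread) {
        final FakeChannel channel = new FakeChannel();
        final byte[] body = "Hello, world!".getBytes();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < perThread; n++) {
                    synchronized (channel) { // mutual exclusion enforced by the application
                        channel.basicPublish(body);
                    }
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return channel.publishedCount();
    }

    public static void main(String[] args) {
        System.out.println(publishFromThreads(4, 1000));
    }
}
```

An alternative design that avoids locking altogether is to give each thread its own channel, which matches the "clear notion of thread ownership" recommended above.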
To retrieve individual messages, use Channel.basicGet.
The returned value is an instance of GetResponse, from which
the header information (properties) and message body can be extracted :
boolean noAck = false;
GetResponse response = channel.basicGet(ticket, queueName, noAck);
if (response == null) {
    // No message retrieved.
} else {
    AMQP.BasicProperties props = response.getProps();
    byte[] body = response.getBody();
    long deliveryTag = response.getEnvelope().getDeliveryTag();
    ...
Since noAck = false above, you must also call
Channel.basicAck to acknowledge that you
have successfully received and processed the message:
    ...
    channel.basicAck(deliveryTag, false); // acknowledge receipt of the message
}
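The relationship between noAck, delivery tags and acknowledgement can be illustrated with a small in-memory model. AckSketch and its Delivery type are hypothetical names invented here and do not reproduce the client or broker implementation; the sketch only assumes that a message fetched with noAck = false stays outstanding until its delivery tag is acknowledged.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model for illustration only; NOT the broker or the client API.
class AckSketch {
    static final class Delivery {
        final long deliveryTag;
        final String body;

        Delivery(long deliveryTag, String body) {
            this.deliveryTag = deliveryTag;
            this.body = body;
        }
    }

    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new HashMap<>();
    private long nextDeliveryTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    // Returns null when no message is available, mirroring the null check in the text.
    Delivery basicGet(boolean noAck) {
        String body = ready.pollFirst();
        if (body == null) {
            return null;
        }
        long tag = nextDeliveryTag++;
        if (!noAck) {
            unacked.put(tag, body); // stays outstanding until acknowledged
        }
        return new Delivery(tag, body);
    }

    // Acknowledges a single outstanding delivery.
    void basicAck(long deliveryTag) {
        if (unacked.remove(deliveryTag) == null) {
            throw new IllegalArgumentException("unknown delivery tag " + deliveryTag);
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckSketch queue = new AckSketch();
        queue.publish("Hello, world!");
        Delivery delivery = queue.basicGet(false);
        System.out.println("outstanding before ack: " + queue.unackedCount());
        queue.basicAck(delivery.deliveryTag);
        System.out.println("outstanding after ack: " + queue.unackedCount());
    }
}
```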
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
Another way to receive messages is to set up a subscription using the Consumer
interface. The messages will then be delivered automatically as they arrive, rather than
having to be requested proactively.
The easiest way to implement a Consumer object is to subclass the convenience class
DefaultConsumer, then override methods as necessary. You will generally want to override
the core interface method handleDelivery :
boolean noAck = false;
channel.basicConsume(ticket, queueName, noAck,
    new DefaultConsumer(channel) {
        @Override public void handleDelivery(String consumerTag,
                                             Envelope envelope,
                                             AMQP.BasicProperties properties,
                                             byte[] body)
            throws IOException {
            String routingKey = envelope.getRoutingKey();
            String contentType = properties.contentType;
            long deliveryTag = envelope.getDeliveryTag();
            // (process the message components ...)
            channel.basicAck(deliveryTag, false);
        }
    });
More sophisticated consumers will need to override further methods.
In particular, handleShutdownSignal traps channel / connection
closure, and handleConsumeOk is passed a server-generated consumer
tag when none is supplied to the initial basicConsume call.
Consumers can also implement the handleCancelOk method to be notified of cancellations.
You can cancel an active consumer with Channel.basicCancel :
channel.basicCancel(consumerTag);
When calling the API methods, you always refer to consumers by their consumer tags, which can be either client- or server-generated as explained in the AMQP specification document.
If a message is published with the "mandatory" or "immediate" flags set, but cannot be
delivered, the broker will return it to the sending client
(via an AMQP.Basic.Return command).
To be notified of such returns, clients can implement the ReturnListener
interface and call Channel.setReturnListener.
If the client has not configured a return listener for a particular channel,
then the associated returned messages will be silently dropped.
channel.setReturnListener(new ReturnListener() {
    public void handleBasicReturn(int replyCode,
                                  String replyText,
                                  String exchange,
                                  String routingKey,
                                  AMQP.BasicProperties properties,
                                  byte[] body)
        throws IOException {
        ...
    }
});
This return listener will be called, for example, if the client publishes a message with the "mandatory" flag set to an exchange of "direct" type which is not bound to a queue.
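The return behaviour described in this section can be modelled in a few lines. ReturnSketch, its ReturnHandler interface and the exchange name "sketch.direct" are hypothetical and exist only for this illustration; the model covers the "mandatory" flag on a single direct-style exchange and nothing else.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model for illustration only; NOT the broker or the client API.
class ReturnSketch {
    interface ReturnHandler {
        void handleReturn(String exchange, String routingKey, byte[] body);
    }

    // routing key -> messages held by the queue bound under that key
    private final Map<String, List<byte[]>> queuesByRoutingKey = new HashMap<>();
    private ReturnHandler returnHandler; // null means returned messages are silently dropped

    void bind(String routingKey) {
        queuesByRoutingKey.computeIfAbsent(routingKey, k -> new ArrayList<>());
    }

    void setReturnHandler(ReturnHandler handler) {
        this.returnHandler = handler;
    }

    // Returns true if the message was routed to a queue.
    boolean publish(String routingKey, boolean mandatory, byte[] body) {
        List<byte[]> queue = queuesByRoutingKey.get(routingKey);
        if (queue != null) {
            queue.add(body);
            return true;
        }
        // Unroutable: hand the message back only if it was mandatory and a handler is set.
        if (mandatory && returnHandler != null) {
            returnHandler.handleReturn("sketch.direct", routingKey, body);
        }
        return false;
    }

    public static void main(String[] args) {
        ReturnSketch exchange = new ReturnSketch();
        exchange.setReturnHandler((ex, key, body) ->
                System.out.println("returned from " + ex + " with key " + key));
        exchange.publish("unbound-key", true, "Hello, world!".getBytes());
    }
}
```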
This release of the RabbitMQ Java client supports an experimental method of tunnelling AMQP over SSL/TLS.
If your broker is available via a generic SSL tunnel -
provided by a program such as stunnel, for instance -
you can use an SSLSocketFactory to provide
secure communication with the broker.
In many cases, it is sufficient to do the following:
ConnectionFactory factory = new ConnectionFactory(params);
factory.useSslProtocol();
Connection conn = factory.newConnection(hostName, portNumber);
There are three variants of ConnectionFactory.useSslProtocol:
useSslProtocol() - uses the default SSL protocol (SSLv3) with a blindly-trusting TrustManager;
useSslProtocol(String protocol) - uses the supplied SSL protocol name with a blindly-trusting TrustManager; or
useSslProtocol(String protocol, TrustManager tm) - uses the supplied SSL protocol name and TrustManager.
Each of the three variants ends up calling
SSLContext.getInstance followed by
SSLContext.init, with no key managers, no
secure random source, and just the passed in trust
manager.
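The getInstance/init sequence just described can be reproduced with the standard javax.net.ssl classes alone. The sketch below is not the client's implementation: the class name TrustAllSketch is hypothetical, and the protocol name "TLS" used in main is an assumption swapped in because many current JVMs disable SSLv3 by default. A blindly-trusting TrustManager gives encryption without authenticating the peer.

```java
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

// Hypothetical helper for illustration; NOT part of the RabbitMQ client.
class TrustAllSketch {
    // Blindly-trusting trust manager: accepts every certificate chain without checking it.
    static final X509TrustManager TRUST_ALL = new X509TrustManager() {
        @Override public void checkClientTrusted(X509Certificate[] chain, String authType) { }
        @Override public void checkServerTrusted(X509Certificate[] chain, String authType) { }
        @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
    };

    // getInstance followed by init: no key managers, no explicit random source, one trust manager.
    static SSLSocketFactory socketFactory(String protocol, TrustManager tm) {
        try {
            SSLContext context = SSLContext.getInstance(protocol);
            context.init(null, new TrustManager[] { tm }, null);
            return context.getSocketFactory();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("could not set up protocol " + protocol, e);
        }
    }

    public static void main(String[] args) {
        // "TLS" is an assumed protocol name for this sketch, not the client's default.
        SSLSocketFactory factory = socketFactory("TLS", TRUST_ALL);
        System.out.println(factory != null);
    }
}
```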
In cases where the convenience methods do not provide
enough control, such as when you have actual
authentication requirements rather than simply using SSL
for its encryption abilities, you will need to configure
an SSLSocketFactory instance yourself. The
following is a rough sketch of the steps required:
ConnectionFactory factory = new ConnectionFactory(params);
javax.net.ssl.SSLContext sslContext = javax.net.ssl.SSLContext.getInstance("SSLv3");
javax.net.ssl.KeyManager km = /* expression constructing your custom key manager */;
javax.net.ssl.TrustManager tm = /* expression constructing your custom trust manager */;
java.security.SecureRandom random = /* expression constructing your custom randomness source */;
sslContext.init(new javax.net.ssl.KeyManager[] { km },
                new javax.net.ssl.TrustManager[] { tm },
                random);
factory.setSocketFactory(sslContext.getSocketFactory());
Connection conn = factory.newConnection(hostName, portNumber);
...
As a programming convenience, the Java client API offers a
class RpcClient which uses a temporary reply
queue to provide simple RPC-style communication facilities via AMQP.
The class doesn't impose any particular format on the RPC arguments and return values. It simply provides a mechanism for sending a message to a given exchange with a particular routing key, and waiting for a response on a reply queue.
import com.rabbitmq.client.RpcClient;
RpcClient rpc = new RpcClient(channel, ticket, exchangeName, routingKey);
(The implementation details of how this class uses AMQP are as follows: request messages are sent with the
basic.correlation_id field set to a value unique for this RpcClient instance,
and with basic.reply_to set to the name of the reply queue.)
Once you have created an instance of this class, you can use it to send RPC requests by using any of the following methods:
byte[] primitiveCall(byte[] message);
String stringCall(String message);
Map mapCall(Map message);
Map mapCall(Object[] keyValuePairs);
The primitiveCall method transfers raw byte arrays as the request and response
bodies. The method stringCall is a thin
convenience wrapper around primitiveCall,
treating the message bodies as String instances
in the default character encoding.
The mapCall variants are a little more sophisticated: they encode
a java.util.Map containing ordinary Java values
into an AMQP binary table representation, and decode the
response in the same way. (Note that there are some restrictions on what value
types can be used here - see the javadoc for details.)
All the marshalling/unmarshalling convenience methods use primitiveCall as a
transport mechanism, and just provide a wrapping layer on top of it.
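The layering described above, with stringCall as a thin wrapper over primitiveCall, can be sketched without a broker. RpcWrapperSketch is a hypothetical class, and its transport function stands in for the real round trip through the exchange and reply queue. It also pins the encoding to UTF-8 for predictability, which is an assumption that differs from the default character encoding mentioned above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.function.UnaryOperator;

// Hypothetical sketch for illustration; this is NOT com.rabbitmq.client.RpcClient.
class RpcWrapperSketch {
    // Stands in for "publish the request, then wait for the reply on the reply queue".
    private final UnaryOperator<byte[]> transport;

    RpcWrapperSketch(UnaryOperator<byte[]> transport) {
        this.transport = transport;
    }

    // Raw byte arrays in, raw byte arrays out.
    byte[] primitiveCall(byte[] message) {
        return transport.apply(message);
    }

    // Convenience layer: encode the request, delegate to primitiveCall, decode the reply.
    String stringCall(String message) {
        byte[] reply = primitiveCall(message.getBytes(StandardCharsets.UTF_8));
        return new String(reply, StandardCharsets.UTF_8);
    }

    // Hypothetical in-process "server" that upper-cases the request body.
    static RpcWrapperSketch upperCaseEcho() {
        return new RpcWrapperSketch(request ->
                new String(request, StandardCharsets.UTF_8)
                        .toUpperCase(Locale.ROOT)
                        .getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(upperCaseEcho().stringCall("ping")); // prints PING
    }
}
```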
The AMQP connection and channel share the same general approach to managing network failure, internal failure, and explicit local shutdown.
The AMQP connection and channel have the following lifecycle states:
open: the object is ready to use
closing: the object has been explicitly
notified to shut down locally, has issued a shutdown
request to any supporting lower-layer objects, and is
waiting for their shutdown procedures to complete
closed: the object has received all
shutdown-complete notification(s) from any lower-layer
objects, and as a consequence has shut itself down
These objects always end up in the closed state, regardless of what caused the closure: an application request, an internal client library failure, a remote network request, or a network failure.
The AMQP connection and channel objects possess the following shutdown-related methods:
addShutdownListener(ShutdownListener listener) and removeShutdownListener(ShutdownListener listener), to manage any listeners, which will be fired when the object transitions to the closed state. Note that adding a ShutdownListener to an object that is already closed will fire the listener immediately
getCloseReason(), to allow investigation of the reason for the object's shutdown
isOpen(), useful for testing whether the object is in an open state
close(int closeCode, String closeMessage), to explicitly notify the object to shut down
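The lifecycle states and shutdown-related methods listed above can be summarised as a small state machine. LifecycleSketch is a hypothetical model, not the client's Connection or Channel: it represents the close reason as a plain String, uses java.util.function.Consumer in place of ShutdownListener, and introduces an assumed shutdownComplete method to stand for the lower layers reporting that their shutdown has finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model for illustration; NOT the client's Connection or Channel.
class LifecycleSketch {
    enum State { OPEN, CLOSING, CLOSED }

    private State state = State.OPEN;
    private String closeReason;
    private final List<Consumer<String>> listeners = new ArrayList<>();

    State state() { return state; }

    boolean isOpen() { return state == State.OPEN; }

    String getCloseReason() { return closeReason; }

    // A listener added after closure fires immediately; otherwise it waits for the transition.
    void addShutdownListener(Consumer<String> listener) {
        if (state == State.CLOSED) {
            listener.accept(closeReason);
        } else {
            listeners.add(listener);
        }
    }

    void removeShutdownListener(Consumer<String> listener) {
        listeners.remove(listener);
    }

    // Explicit local request: open -> closing, then wait for lower layers to finish.
    void close(String reason) {
        if (state == State.OPEN) {
            closeReason = reason;
            state = State.CLOSING;
        }
    }

    // Lower layers are done (or a failure occurred): any state -> closed, listeners fire once.
    void shutdownComplete(String reason) {
        if (state == State.CLOSED) {
            return;
        }
        if (closeReason == null) {
            closeReason = reason;
        }
        state = State.CLOSED;
        for (Consumer<String> listener : new ArrayList<Consumer<String>>(listeners)) {
            listener.accept(closeReason);
        }
        listeners.clear();
    }

    public static void main(String[] args) {
        LifecycleSketch connection = new LifecycleSketch();
        connection.addShutdownListener(reason -> System.out.println("closed: " + reason));
        connection.close("Goodbye");
        connection.shutdownComplete(null);
    }
}
```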
Simple usage of listeners would look like:
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;
connection.addShutdownListener(new ShutdownListener() {
    public void service(ShutdownSignalException cause)
    {
        ...
    }
});
One can retrieve the
ShutdownSignalException, which contains all
the information available about the close reason, either
by explicitly calling the getCloseReason()
method or by using the cause parameter in
the service(ShutdownSignalException cause)
method of the ShutdownListener class.
The ShutdownSignalException class provides
methods to analyze the reason for the shutdown.
Calling the isHardError() method tells us
whether it was a connection error (a hard error) or a channel
error.
public void service(ShutdownSignalException cause)
{
    if (cause.isHardError())
    {
        Connection conn = cause.getConnection();
        if (!cause.isInitiatedByApplication())
        {
            Object reason = cause.getReason();
            ...
        }
        ...
    } else {
        Channel ch = cause.getChannel();
        ...
    }
}
Use of the isOpen() method of channel and
connection objects is not recommended for production
code, because the value returned by the method is
dependent on the existence of the shutdown cause. The
following code illustrates the possibility of race
conditions:
public void brokenMethod(Channel channel)
{
    if (channel.isOpen())
    {
        // The following code depends on the channel being in open state.
        // However, the channel state may change
        // between the isOpen() and txCommit() calls
        ...
        channel.txCommit();
    }
}
Instead, we should normally skip such checks and
simply attempt the desired action. If the channel of the
connection is closed during the execution of the code,
a ShutdownSignalException will be
thrown, indicating that the object is in an invalid
state. We should also catch IOException,
caused either by a SocketException, when the
broker closes the connection unexpectedly, or by a
ShutdownSignalException, when the broker
initiates a clean close.
public void validMethod(Channel channel)
{
    try {
        ...
        channel.txCommit();
    } catch (ShutdownSignalException sse) {
        // possibly check if channel was closed
        // by the time we started action and reasons for
        // closing it
        ...
    } catch (IOException ioe) {
        // check why connection was closed
        ...
    }
}