AMQP 1.0 Client Libraries
This page documents the usage of AMQP 1.0 client libraries for RabbitMQ 4.0 or later.
The RabbitMQ team supports the following libraries:
- RabbitMQ AMQP 1.0 Java client
- RabbitMQ AMQP 1.0 .NET client
- RabbitMQ AMQP 1.0 Go client
- RabbitMQ AMQP 1.0 Python client
This page shows application developers how to use the libraries for the most common use cases. For other information such as licensing, downloading, dependency management, and advanced or specific usage and configuration, see the README in the repository of the respective library.
Overview
The RabbitMQ team maintains a set of AMQP 1.0 client libraries designed and optimized for RabbitMQ. They offer a simple and safe, yet powerful API on top of AMQP 1.0. Applications can publish and consume messages with these libraries, as well as manage the server topology in a consistent way across programming languages. The libraries also offer advanced features like automatic connection and topology recovery, and connection affinity with queues.
RabbitMQ is compatible with any AMQP-1.0-compliant client library. It is not mandatory to use the RabbitMQ AMQP 1.0 client libraries with RabbitMQ, but applications are strongly encouraged to do so for the best experience.
Safety
RabbitMQ AMQP 1.0 client libraries are safe by default: they always create durable entities and always publish persistent messages.
Guarantees
RabbitMQ AMQP 1.0 client libraries provide at-least-once guarantees.
The broker always confirms the proper handling of published messages.
Publishers achieve this by using the unsettled sender settle mode and the first receiver settle mode when they get created.
Consumers must always signal the result of message processing to the broker.
Consumers use the same settings as publishers when they get created (first receiver settle mode and unsettled sender settle mode).
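At-least-once delivery means a message can reach a consumer more than once. The following Python sketch does not use any client library. It shows one common way for an application to tolerate redelivery, namely remembering the IDs of messages it has already processed. The `IdempotentProcessor` class and its method names are hypothetical and serve only as an illustration.

```python
class IdempotentProcessor:
    """Skips messages whose ID was already processed (illustrative only)."""

    def __init__(self):
        self._seen = set()    # IDs of messages already processed
        self.processed = []   # bodies that were processed once

    def handle(self, message_id, body):
        """Return True if the message was processed, False if it was a duplicate."""
        if message_id in self._seen:
            return False
        self.processed.append(body)
        self._seen.add(message_id)
        return True


processor = IdempotentProcessor()
deliveries = [(1, "a"), (2, "b"), (1, "a")]  # message 1 is delivered twice
results = [processor.handle(mid, body) for mid, body in deliveries]
print(results)
```

A real application would likely bound or persist the set of seen IDs. This sketch keeps it in memory for brevity.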
Client API
This section covers how to use the RabbitMQ AMQP 1.0 client libraries to connect to a cluster, and publish and consume messages.
Connecting
Libraries provide an entry point to a node or a cluster of nodes. Its name is the "environment". The environment allows creating connections. It can contain infrastructure-related configuration settings shared between connections (e.g. pools of threads for Java). Here is how to create the environment:
- Java
- C#
- Python
import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
// ...
// create the environment instance
Environment environment = new AmqpEnvironmentBuilder()
.build();
// ...
// close the environment when the application stops
environment.close();
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;
// ...
// create the environment instance
IEnvironment environment = await AmqpEnvironment.CreateAsync(
ConnectionSettingBuilder.Create().Build());
// ...
// close the environment when the application stops
await environment.CloseAsync();
from rabbitmq_amqp_python_client import Environment
# ...
# create the environment instance
environment = Environment()
# ...
# close the environment when the application stops
environment.close()
There is usually one environment instance for an application process. The application must close the environment to release its resources when it exits.
Applications open connections from the environment. They must specify appropriate settings to connect to the cluster nodes (URI, credentials).
- Java
- C#
- Python
// open a connection from the environment
Connection connection = environment.connectionBuilder()
.uri("amqp://admin:admin@localhost:5672/%2f")
.build();
// ...
// close the connection when it is no longer necessary
connection.close();
// open a connection using the environment's settings
IConnection connection = await environment.CreateConnectionAsync();
// open a connection from the environment with different settings
ConnectionSettingBuilder otherSettingBuilder = ConnectionSettingBuilder.Create()
.ContainerId("my_containerId")
.Host("localhost");
IConnection otherConnection = await environment.CreateConnectionAsync(otherSettingBuilder.Build());
// ...
// close the connection when it is no longer necessary
await connection.CloseAsync();
# open a connection from the environment
connection = environment.connection("amqp://guest:guest@localhost:5672/")
# close the connection when it is no longer necessary
connection.close()
Libraries use the ANONYMOUS SASL authentication mechanism by default.
Connections are expected to be long-lived objects; applications should avoid connection churn.
They must be closed when they are no longer needed.
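The URI in the examples above packs several connection settings into one string. As a rough illustration, the following Python sketch uses only the standard library to split such a URI into its parts. `parse_amqp_uri` is a hypothetical helper, not part of any client library, and each library may interpret the URI in its own way. Note that `%2f` is the percent-encoded form of `/`, which is conventionally the name of RabbitMQ's default virtual host.

```python
from urllib.parse import urlsplit, unquote


def parse_amqp_uri(uri):
    """Split an AMQP URI into its parts (hypothetical helper, for illustration)."""
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,
        "username": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "host": parts.hostname,
        "port": parts.port,
        # The path carries the percent-encoded virtual host after the leading "/"
        "vhost": unquote(parts.path[1:]) if parts.path else None,
    }


settings = parse_amqp_uri("amqp://admin:admin@localhost:5672/%2f")
print(settings["host"], settings["port"], repr(settings["vhost"]))
```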
Publishing
A publisher must be created to publish messages. The target a publisher will publish messages to is usually set on creation, but it is also possible to set it on a per-message basis.
Here is how to declare a publisher with the target set at creation time:
- Java
- C#
- Python
Publisher publisher = connection.publisherBuilder()
.exchange("foo").key("bar")
.build();
// ...
// close the publisher when it is no longer necessary
publisher.close();
// The publisher can use exchange (optionally with a key) or queue to publish messages.
IPublisher publisher = await connection.PublisherBuilder().Exchange("foo").Key("bar")
.BuildAsync();
// ...
// close the publisher when it is no longer necessary
await publisher.CloseAsync();
publisher.Dispose();
# The publisher can use exchange (optionally with a key) or queue to publish messages.
# You can use the AddressHelper utility class to build the address from the exchange name and the key
exchange_address = AddressHelper.exchange_address("foo", "bar")
publisher = connection.publisher(exchange_address)
# close the publisher when it is no longer necessary
publisher.close()
In the previous example, every message published with the publisher will go to the foo exchange with the bar routing key.
RabbitMQ uses the AMQ 0.9.1 model comprising exchanges, queues, and bindings.
Messages are created from the publisher instance. They follow the AMQP 1.0 message format. It is possible to define the body (as an array of bytes), standard properties, and application properties.
When a message is published, the broker indicates how it dealt with it in an asynchronous callback.
The client application takes appropriate measures depending on the status ("outcome" in AMQP terms) the broker returned for the message (e.g. store the message in another place if the message has not been accepted).
The following snippet shows how to create a message, publish it, and deal with the response from the broker:
- Java
- C#
- Python
// create the message
Message message = publisher
.message("hello".getBytes(StandardCharsets.UTF_8))
.messageId(1L);
// publish the message and deal with broker feedback
publisher.publish(message, context -> {
    // asynchronous feedback from the broker
    if (context.status() == Publisher.Status.ACCEPTED) {
        // the broker accepted (confirmed) the message
    } else {
        // deal with possible failure
    }
});
// create the message
var message = new AmqpMessage("Hello");
// publish the message and deal with broker feedback
// Awaiting PublishAsync returns the broker's outcome for this message.
// To increase throughput, collect the tasks in a `List<Task<PublishResult>>` and await them together.
PublishResult pr = await publisher.PublishAsync(message);
switch (pr.Outcome.State)
{
    case OutcomeState.Accepted:
        // the broker accepted (confirmed) the message
        break;
    case OutcomeState.Released:
        // the broker could not route the message anywhere
        break;
    case OutcomeState.Rejected:
        // at least one queue rejected the message
        break;
}
# create the message
message = Message(body="Hello")
# publish the message and deal with broker feedback
# The result is synchronous
status = publisher.publish(message)
match status.remote_state:
    case OutcomeState.ACCEPTED:
        # the broker accepted (confirmed) the message
        pass
    case OutcomeState.RELEASED:
        # the broker could not route the message anywhere
        pass
    case OutcomeState.REJECTED:
        # at least one queue rejected the message
        pass
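As a library-independent illustration of reacting to an outcome, the following Python sketch keeps every message that was not accepted in a fallback store, so it can be inspected or retried later. The `Outcome` enum, the `handle_outcome` function, and the list used as a fallback store are all hypothetical names chosen for this example. They are not part of the client libraries.

```python
from enum import Enum


class Outcome(Enum):
    """Outcomes mentioned in the examples above (hypothetical stand-in type)."""
    ACCEPTED = "accepted"
    RELEASED = "released"
    REJECTED = "rejected"


def handle_outcome(outcome, message, fallback_store):
    """Decide what to do with a message once the broker reported its outcome."""
    if outcome is Outcome.ACCEPTED:
        return "done"
    # Not accepted: keep the message and the reason for later inspection or retry
    fallback_store.append((outcome.value, message))
    return "stored"


fallback = []
first = handle_outcome(Outcome.ACCEPTED, "hello", fallback)
second = handle_outcome(Outcome.RELEASED, "hello again", fallback)
print(first, second, fallback)
```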
The publisher example above sends messages to a given exchange with a given routing key, but this is not the only supported target for a publisher. Here are the supported non-null targets for a publisher:
- Java
- C#
- Python
// publish to an exchange with a routing key
Publisher publisher1 = connection.publisherBuilder()
.exchange("foo").key("bar") // /exchanges/foo/bar
.build();
// publish to an exchange without a routing key
Publisher publisher2 = connection.publisherBuilder()
.exchange("foo") // /exchanges/foo
.build();
// publish to a queue
Publisher publisher3 = connection.publisherBuilder()
.queue("some-queue") // /queues/some-queue
.build();
// publish to an exchange with a routing key
IPublisher publisher1 = await connection.PublisherBuilder()
.Exchange("foo")
.Key("bar") // /exchanges/foo/bar
.BuildAsync();
// publish to an exchange without a routing key
IPublisher publisher2 = await connection.PublisherBuilder()
.Exchange("foo") // /exchanges/foo
.BuildAsync();
// publish to a queue
IPublisher publisher3 = await connection.PublisherBuilder()
.Queue("some-queue") // /queues/some-queue
.BuildAsync();
# publish to an exchange with a routing key
# You can use the AddressHelper utility class to build the address from the exchange name and the key
exchange_address = AddressHelper.exchange_address("foo", "bar")
publisher = connection.publisher(exchange_address)
# publish to an exchange without a routing key
exchange_address = AddressHelper.exchange_address("foo")
publisher = connection.publisher(exchange_address)
# publish to a queue
queue_address = AddressHelper.queue_address("some_queue")
publisher = connection.publisher(queue_address)
Libraries translate the API calls into the address format v2.
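The Java comments above show the resulting addresses (`/exchanges/foo/bar`, `/exchanges/foo`, `/queues/some-queue`). The following Python sketch builds strings of that shape with the standard library only. The two functions are hypothetical stand-ins, not the libraries' implementation, and the percent-encoding of names is an assumption of this sketch, added so that a name containing `/` cannot be confused with a path separator.

```python
from urllib.parse import quote


def _encode(name):
    # Percent-encode everything outside the unreserved set, including "/"
    return quote(name, safe="")


def exchange_address(exchange, key=None):
    """Build /exchanges/<exchange> or /exchanges/<exchange>/<key>."""
    path = "/exchanges/" + _encode(exchange)
    if key:
        path += "/" + _encode(key)
    return path


def queue_address(queue):
    """Build /queues/<queue>."""
    return "/queues/" + _encode(queue)


print(exchange_address("foo", "bar"))
print(exchange_address("foo"))
print(queue_address("some-queue"))
```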
It is also possible to define the target on a per-message basis.
The publisher must be defined without any target, and each message defines its target in the to field of the properties section.
Libraries provide helpers in the message creation API to define the message target, which avoids dealing with the address format.
The following snippet shows how to create a publisher without a target and define messages with different target types:
- Java
- C#
- Python
// no target defined on publisher creation
Publisher publisher = connection.publisherBuilder()
.build();
// publish to an exchange with a routing key
Message message1 = publisher.message()
.toAddress().exchange("foo").key("bar")
.message();
// publish to an exchange without a routing key
Message message2 = publisher.message()
.toAddress().exchange("foo")
.message();
// publish to a queue
Message message3 = publisher.message()
.toAddress().queue("my-queue")
.message();
// no target defined on publisher creation
IPublisher publisher = await connection.PublisherBuilder()
.BuildAsync();
// publish to an exchange with a routing key
IMessage message1 = new AmqpMessage("Hello!").ToAddress()
.Exchange("foo")
.Key("bar")
.Build();
await publisher.PublishAsync(message1);
// publish to a queue
IMessage message2 = new AmqpMessage("Hello!").ToAddress()
.Queue("foo")
.Build();
await publisher.PublishAsync(message2);
# no target defined on publisher creation
publisher = connection.publisher()
# publish to an exchange with a routing key
# You can use the AddressHelper.message_to_address_helper
# utility method to set the destination in the message
message = Message(body="Hello!")
exchange_address = AddressHelper.exchange_address("foo", "bar")
message = AddressHelper.message_to_address_helper(message, exchange_address)
publisher.publish(message)
# publish to a queue
message = Message(body="Hello!")
queue_address = AddressHelper.queue_address("some_queue")
message = AddressHelper.message_to_address_helper(message, queue_address)
publisher.publish(message)
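The rule described above (a target on the publisher, or otherwise a target in each message's to field) can be pictured with a small Python sketch that does not use any client library. `resolve_target` is a hypothetical function. What happens when both the publisher and the message carry a target is not covered above, so this sketch simply prefers the publisher's target, which is an assumption.

```python
def resolve_target(publisher_target, message_to):
    """Return the address a message would be sent to (illustrative only)."""
    if publisher_target is not None:
        # Assumption of this sketch: a target set on the publisher wins
        return publisher_target
    if message_to is None:
        raise ValueError("publisher has no target, so the message must set 'to'")
    return message_to


fixed = resolve_target("/exchanges/foo/bar", None)
per_message = resolve_target(None, "/queues/my-queue")
print(fixed, per_message)
```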
Consuming
Consumer Creation
Creating a consumer consists of specifying the queue to consume from and the callback to process messages:
- Java
- C#
- Python
Consumer consumer = connection.consumerBuilder()
    .queue("some-queue")
    .messageHandler((context, message) -> {
        byte[] body = message.body();
        // ...
        context.accept(); // settle the message
    })
    .build(); // do not forget to build the instance!
IConsumer consumer = await connection.ConsumerBuilder()
    .Queue("some-queue")
    .MessageHandler(async (context, message) =>
    {
        // deal with the message
        await context.AcceptAsync(); // settle the message
    })
    .BuildAndStartAsync();
class MyMessageHandler(AMQPMessagingHandler):
    def __init__(self):
        super().__init__()

    def on_message(self, event: Event):
        # deal with the message
        # ...
        # settle the message
        self.delivery_context.accept(event)

queue_address = AddressHelper.queue_address("some_queue")
consumer = connection.consumer(queue_address, message_handler=MyMessageHandler())
consumer.run()
Once the application is done processing a message, it must settle it. This indicates to the broker the result of the processing and what it should do with the message (e.g. deleting the message). Applications must settle messages or they will run out of credits and the broker will stop dispatching messages to them.
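The effect of not settling can be pictured with a toy model in plain Python. It is a deliberate simplification and not how the broker or the libraries are implemented: each delivery uses up one credit, and a credit is given back only when a message is settled. The `ToyBroker` class, the `consume` function, and the initial credit value of 2 are hypothetical.

```python
from collections import deque


class ToyBroker:
    """Toy model of credit-based dispatch (illustrative only)."""

    def __init__(self, messages, initial_credits):
        self._queue = deque(messages)
        self._credits = initial_credits

    def dispatch(self):
        """Return the next message, or None if credits or messages ran out."""
        if self._credits == 0 or not self._queue:
            return None
        self._credits -= 1
        return self._queue.popleft()

    def settle(self):
        """Settling a message gives one credit back."""
        self._credits += 1


def consume(broker, settle):
    """Receive messages until the broker stops dispatching."""
    received = []
    while (message := broker.dispatch()) is not None:
        received.append(message)
        if settle:
            broker.settle()
    return received


messages = ["m1", "m2", "m3", "m4", "m5"]
stalled = consume(ToyBroker(messages, initial_credits=2), settle=False)
drained = consume(ToyBroker(messages, initial_credits=2), settle=True)
print(stalled)
print(drained)
```

In this model the consumer that never settles receives only as many messages as it had initial credits, while the consumer that settles each message receives them all.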
The next section covers the semantics of message settlement.