Intercepting Messages
Overview
RabbitMQ provides a generic mechanism to intercept messages on the broker. Interception can occur at two stages:
- Incoming messages – intercepted when a message enters RabbitMQ, just before it is routed to queues.
- Outgoing messages – intercepted when RabbitMQ delivers a message to a client, just before it is converted to the target protocol.
Interceptors are executed by one of the following Erlang processes:
- AMQP 1.0 session
- AMQP 0.9.1 channel
- MQTT connection
Messages sent over the RabbitMQ Streams protocol are not intercepted.
A message interceptor is an Erlang module that implements the rabbit_msg_interceptor behaviour. What the interceptor does is entirely up to its implementation - it can validate message metadata, add annotations, or perform arbitrary side effects.
Custom interceptors can be developed and integrated via plugins.
RabbitMQ ships with several built-in message interceptors. Below are examples of how to configure them using the rabbitmq.conf file.
Incoming Message Interceptors
Timestamp
This interceptor adds a timestamp to each incoming message:
message_interceptors.incoming.set_header_timestamp.overwrite = true
- AMQP 1.0 and Streams clients receive a message annotation:
x-opt-rabbitmq-received-time
(timestamp in milliseconds since January 1, 1970 UTC). - AMQP 0.9.1 clients receive:
timestamp_in_ms
header (milliseconds) for compatibility with the former Message Timestamp Plugintimestamp
property (seconds)
To preserve an existing timestamp_in_ms
header, set overwrite
to false
:
message_interceptors.incoming.set_header_timestamp.overwrite = false
Routing Node
This interceptor adds a message annotation x-routed-by
indicating which RabbitMQ node received and routed the message:
message_interceptors.incoming.set_header_routing_node.overwrite = true
Set overwrite
to false
to preserve an existing value:
message_interceptors.incoming.set_header_routing_node.overwrite = false
MQTT client ID
If the MQTT plugin is enabled, RabbitMQ can annotate incoming messages with the client ID of the publishing MQTT client.
This is done by adding a message annotation with the key x-opt-mqtt-client-id
.
mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true
This annotation is visible to AMQP 1.0, AMQP 0.9.1, and Streams consumers. However, MQTT clients will not receive this annotation, as the MQTT spec does not allow arbitrary broker-added annotations.
Outgoing Message Interceptors
Timestamp
This interceptor timestamps messages when they are sent to clients:
message_interceptors.outgoing.timestamp.enabled = true
The annotation key is x-opt-rabbitmq-sent-time
, and its value is a timestamp in milliseconds since January 1, 1970 UTC.