Intercepting Messages

Overview

RabbitMQ provides a generic mechanism to intercept messages on the broker. Interception can occur at two stages:

  1. Incoming messages – intercepted when a message enters RabbitMQ, just before it is routed to queues.
  2. Outgoing messages – intercepted when RabbitMQ delivers a message to a client, just before it is converted to the target protocol.

Interceptors are executed by one of the following Erlang processes:

  • AMQP 1.0 session
  • AMQP 0.9.1 channel
  • MQTT connection

Messages sent over the RabbitMQ Streams protocol are not intercepted.

A message interceptor is an Erlang module that implements the rabbit_msg_interceptor behaviour. What an interceptor does is entirely up to its implementation: it can validate message metadata, add annotations, or perform arbitrary side effects.

Custom interceptors can be developed and integrated via plugins.

RabbitMQ ships with several built-in message interceptors. Below are examples of how to configure them using the rabbitmq.conf file.

Incoming Message Interceptors

Timestamp

This interceptor adds a timestamp to each incoming message:

message_interceptors.incoming.set_header_timestamp.overwrite = true

  • AMQP 1.0 and Streams clients receive a message annotation: x-opt-rabbitmq-received-time (timestamp in milliseconds since January 1, 1970 UTC).
  • AMQP 0.9.1 clients receive:
    • timestamp_in_ms header (milliseconds) for compatibility with the former Message Timestamp Plugin
    • timestamp property (seconds)

To preserve an existing timestamp_in_ms header, set overwrite to false:

message_interceptors.incoming.set_header_timestamp.overwrite = false
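
As a minimal sketch of how an AMQP 0.9.1 consumer might interpret these values, the Python helper below converts them into a timezone-aware datetime. It assumes the client library exposes message headers as a dict and the timestamp property as an integer number of seconds. The function name received_time is hypothetical and not part of any RabbitMQ client API.

```python
from datetime import datetime, timedelta, timezone

# Both values are measured from the Unix epoch in UTC.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def received_time(headers, timestamp_property=None):
    """Return the broker receive time as an aware UTC datetime, or None.

    Hypothetical helper: `headers` is assumed to be a dict of AMQP 0.9.1
    message headers (or None), and `timestamp_property` the optional
    timestamp property in whole seconds.
    """
    ms = (headers or {}).get("timestamp_in_ms")
    if ms is not None:
        # Prefer the header because it carries millisecond precision.
        return EPOCH + timedelta(milliseconds=ms)
    if timestamp_property is not None:
        # Fall back to the second-precision timestamp property.
        return EPOCH + timedelta(seconds=timestamp_property)
    return None
```

Preferring timestamp_in_ms keeps the extra precision when both values are present. The fallback covers messages that carry only the timestamp property.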

Routing Node

This interceptor adds a message annotation x-routed-by indicating which RabbitMQ node received and routed the message:

message_interceptors.incoming.set_header_routing_node.overwrite = true

Set overwrite to false to preserve an existing value:

message_interceptors.incoming.set_header_routing_node.overwrite = false
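
One possible use of this annotation is to check how publishing traffic is spread across cluster nodes. The sketch below tallies consumed messages by their x-routed-by value. It assumes each message's annotations (or headers) are available to the consumer as a dict. The helper name is hypothetical.

```python
from collections import Counter


def count_by_routing_node(messages):
    """Count messages per routing node.

    Hypothetical helper: `messages` is assumed to be an iterable of dicts,
    each holding the annotations (or headers) of one consumed message.
    """
    counts = Counter()
    for annotations in messages:
        node = annotations.get("x-routed-by")
        if node is not None:
            # Messages without the annotation are skipped, not counted.
            counts[node] += 1
    return counts
```

Messages published before the interceptor was enabled carry no x-routed-by annotation, which is why the sketch skips them instead of failing.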

MQTT Client ID

If the MQTT plugin is enabled, RabbitMQ can annotate incoming messages with the client ID of the publishing MQTT client. This is done by adding a message annotation with the key x-opt-mqtt-client-id.

mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true

This annotation is visible to AMQP 1.0, AMQP 0.9.1, and Streams consumers. However, MQTT clients will not receive this annotation, as the MQTT spec does not allow arbitrary broker-added annotations.

Outgoing Message Interceptors

Timestamp

This interceptor timestamps messages when they are sent to clients:

message_interceptors.outgoing.timestamp.enabled = true

The annotation key is x-opt-rabbitmq-sent-time, and its value is a timestamp in milliseconds since January 1, 1970 UTC.
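
When both the incoming and the outgoing timestamp interceptors are enabled, a consumer can estimate how long a message spent in the broker. The following sketch assumes the annotations are exposed as a dict with integer millisecond values. Some client libraries may decode timestamps into other types, in which case a conversion step would be needed first. The function name is hypothetical.

```python
def broker_dwell_ms(annotations):
    """Return milliseconds between broker receive and send, or None.

    Hypothetical helper: `annotations` is assumed to be a dict of message
    annotations whose timestamp values are integers in milliseconds.
    """
    received = annotations.get("x-opt-rabbitmq-received-time")
    sent = annotations.get("x-opt-rabbitmq-sent-time")
    if received is None or sent is None:
        # One of the interceptors was not enabled for this message.
        return None
    return sent - received
```

If a message is received on one cluster node and delivered by another, the result may also reflect any clock difference between those nodes, so small or even negative values should be read with caution.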