AMQP 0-9-1中的消息路由是由exchange执行。
交换机类型
内置交换机类型
Direct
Direct (including the default exchange):投递消息到路由键完全匹配的下游
Fanout
Fanout:广播消息到所有下游,忽略路由键
Topic
Topic:投递消息到路由见规则匹配的下游,路由键以.分隔,如canal.sync.net_value
*(star) can substitute for exactly one word.#(hash) can substitute for zero or more words.
Headers
Headers:投递消息到消息headers中键值对匹配的下游
队列A:绑定交换机参数是:format=pdf,type=report,x-match=all,
队列B: 绑定交换机参数是:format=pdf,type=log,x-match=any,
队列C:绑定交换机参数是:format=zip,type=report,x-match=all,
消息1发送交换机的头参数是:format=pdf,type=reprot则消息传送到队列A
消息2发送交换机的头参数是:format=pdf则消息传送到队列A和队列B
消息3发送交换机的头参数是:format=zip,type=log则消息没有匹配队列,此消息会被丢弃
x-match
all: 默认值。一个传送消息的header里的键值对和交换机的header键值对全部匹配,才可以路由到对应交换机
any: 一个传送消息的header里的键值对和交换机的header键值对任意一个匹配,就可以路由到对应交换机
https://blog.csdn.net/hry2015/article/details/79188615
插件提供的交换机类型
More exchange types can be provided by plugins. Consistent hashing exchange, random routing exchange, internal event exchange and delayed message exchange are exchange plugins that ship with RabbitMQ. Like all plugins they must be enabled before they can be used.
绑定关系
交换机是一个路由表,表中每个元素是一个绑定关系。
绑定关系有两类:
- 交换机到队列
- 交换机到交换机
路由键长度限制为255字节
Exchange to Exchange Bindings
Just like with
queue.bind, multiple distinct bindings can be created between the same binding-endpoints. We detect and eliminate cycles during message delivery, and ensure that transitively, over any routing topology, for every queue to which a given message is routed, each queue will receive exactly one copy of that message. Exchanges which are declared asauto-deletewill still be removed when all their bindings are removed, regardless of whether those bindings are to queues or exchanges. Note that an auto-delete exchange will only be deleted when bindings for which the exchange is the source are removed: if you add exchange-to-exchange bindings for which the given exchange is the destination then that exchange will not be auto-deleted on removal of those bindings.
消息路由
https://www.rabbitmq.com/docs/publishers#amqp-0-9-1
- 消息由交换机最终路由到队列
- 发布消息到一个不存在的交换机,会导致channel error,chanenl会被关闭
- 发布消息到一个存在的交换机但其无法路由到队列(没有或者无法匹配绑定关系)
- 若无备用交换机
- 生产者发布消息时
mandatory为false,该消息被直接删除 - 生产者发布消息时
mandatory为true,消息被退回生产者,生产者应当设置回调函数以处理该问题(如打印错误日志或尝试其他交换机进行路由)
- 生产者发布消息时
- 若有备用交换机
- 消息被备用交换机正确路由
- 备用交换机无法路由,或者备用交换机的备用交换机等构成的备用交换机链无法路由,或消息二次到达某一交换机
- 生产者发布消息时
mandatory为false,该消息被直接删除 - 生产者发布消息时
mandatory为true,消息被退回生产者,生产者应当设置回调函数以处理该问题(如打印错误日志或尝试其他交换机进行路由)
- 生产者发布消息时
- 若无备用交换机
备用交换机
Alternate Exchanges is an AMQP 0-9-1 exchange feature that lets clients handle messages that an exchange was unable to route (i.e. either because there were no bound queues or no matching bindings). Typical examples of this are detecting when clients accidentally or maliciously publish messages that cannot be routed “or else” routing semantics where some messages are handled specially and the rest by a generic handler.
In modern RabbitMQ versions, the default exchange is a special-cased convention in the code and not a “real” exchange. Therefore it does not support the alternate exchange feature.
例如恶意数据走备用交换机
The behaviour of an AE purely pertains to routing. If a message is routed via an AE it still counts as routed for the purpose of the ‘mandatory’ flag, and the message is otherwise unchanged.
当某个交换机无法路由该消息,则会将该消息交由其备用交换机,当备用交换机无法路由该消息,则将该消息交由备用交换机的备用交换机,由此形成备用交换机链,此时消息有三种路由结果:
- 消息被正确路由
- 消息被二次路由到某一交换机,消息被删除
- 消息不可路由,消息被删除
rabbitmqctl set_policy AE "^my-direct$" '{"alternate-exchange":"my-ae"}' --apply-to exchanges
Map<String, Object> args = new HashMap<String, Object>();
args.put("alternate-exchange", "my-ae");
channel.exchangeDeclare("my-direct", "direct", false, false, args);
channel.exchangeDeclare("my-ae", "fanout");
channel.queueDeclare("routed");
channel.queueBind("routed", "my-direct", "key1");
channel.queueDeclare("unrouted");
channel.queueBind("unrouted", "my-ae", "");
死信交换机
https://www.rabbitmq.com/docs/dlx
死信交换机用于路由死信到死信队列,不设置死信路由键则使用消息原来的路由键。死信可由以下任一事件产生:
- 消费者以
requeue=false为参调用basic.reject或者basic.nack对消息negatively acknowledged - 消息TTL过期
- 由于队列长度达到限制导致该消息被从队列删除
注意:若队列过期,队列中的消息并非属于死信
# 给所有队列配置死信交换机my-dlx
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
args.put("x-dead-letter-routing-key", "some-routing-key");
channel.queueDeclare("myqueue", false, false, false, args);
It is possible to form a cycle of message dead-lettering. For instance, this can happen when a queue “dead-letters” messages to the default exchange without specifying a dead-letter routing key. Messages in such cycles (that is, messages that reach the same queue twice) are dropped if there was no rejections in the entire cycle.
Safety
By default, dead-lettered messages are re-published without publisher confirms turned on internally. Therefore using DLX in a clustered RabbitMQ environment is not guaranteed to be safe. Messages are removed from the original queue immediately after publishing to the DLX target queue. This ensures that there is no chance of excessive message build up that could exhaust broker resources. However, messages can be lost if the target queue is not available to accept messages.
Since RabbitMQ 3.10 quorum queues support at-least-once dead-lettering where messages are re-published with publisher confirms turned on internally.
消息被投递到队列然后生产者确认,随后该消息变为死信并交由死信交换机路由,若该交换机无法路由,则此时消息被直接删除,生产者对此无法感知
"""
mandatory and alternative-exchange
"""
import pika.exceptions
def callback(ch, method, properties, body):
print(ch, method, properties, body)
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='192.168.1.250',
port=5672,
virtual_host='/',
credentials=pika.PlainCredentials(username="admin", password="123456"),
connection_attempts=3,
retry_delay=1
)
)
channel = connection.channel()
channel.queue_declare(queue='ae_queue', durable=False)
channel.exchange_declare(exchange='ae', exchange_type='topic', durable=False)
# uncomment this to test alternative-exchange and mandatory
# channel.queue_bind(exchange='ae', queue='ae_queue', routing_key='ae')
# uncomment this to test circle routing
channel.exchange_bind(destination="ex", source="ae", routing_key="ae")
channel.queue_declare(queue='ex_queue', durable=False)
channel.exchange_declare(exchange='ex', exchange_type='topic', arguments={"alternate-exchange": "ae"}, durable=False)
channel.queue_bind(exchange='ex', queue='ex_queue', routing_key='ex')
try:
# basic.publish不会有任何返回
channel.basic_publish(exchange='ex', routing_key='ae', body='Hello World!'.encode('utf-8'),
properties=pika.BasicProperties(content_type='text/plain',
delivery_mode=pika.DeliveryMode.Persistent),
mandatory=True)
# 不可路由消息处理
except pika.exceptions.UnroutableError as e:
print("UnroutableError", e.messages)
# nack消息处理
except pika.exceptions.NackError as e:
print("NackError", e.messages)
connection.close()