Consumer Tag
消费者标识
Push与Pull消息
- Push:在队列上注册消费者,RabbitMQ会主动推送消息到消费者
- Pull:使用协议方法basic.get,消费者主动从队列获取消息
由于消费者通常为long lived application,在时断时续有消息的队列中Pull方式相对Push方式低效。
An attempt to consume from a non-existent queue will result in a channel-level exception with the code of 404 Not Found and render the channel it was attempted on to be closed.
Consumer Acknowledgement Mechanism
- Automatic,自动ACK,消费者接收到消息就ACK
- Manual,手动ACK,择时手动ACK
消费者确认机制有超时时间,防止消费者接收到消息宕机导致消息永远无法被处理,默认30min,见配置项consumer_timeout。
Per-node Configuration:https://www.rabbitmq.com/docs/consumers#per-node-configuration
Per-queue Configuration:https://www.rabbitmq.com/docs/consumers#per-queue-configuration
nack
https://www.rabbitmq.com/docs/nack
nack时可以选择消息是否重新入队,不重新入队则该消息被RabbitMQ删除
When a message is requeued, it will be placed to its original position in its queue, if possible. If not (due to concurrent deliveries and acknowledgements from other consumers when multiple consumers share a queue), the message will be requeued to a position closer to queue head.
消费者侧保证单个消费者消费
Exclusivity
注册消费者时设置Exclusivity,仅在该队列无其他消费者时注册成功
Prefetch
https://www.rabbitmq.com/docs/consumer-prefetch
自动ACK会使得消费者进程内存使用持续增长,因为消费者无法以最快的速度处理信息。
手动ACK可使用channel QoS (prefetch)控制该消费者未ACK的消息数量。
Consumer Priority
https://www.rabbitmq.com/docs/consumer-priority
通常消费者会以轮询地方式接收消息,消费者可以设置优先级,让高优先级消费者优先获取消息(当QoS达到限制会让次级优先级消费者获取消息),相同优先级消费者轮询接收消息。
Delivery Tag
deliverytag是消息投递序号,每个channel对应一个(long类型),从1开始到9223372036854775807范围,在手动消息确认时可以对指定deliverytag的消息进行ack、nack、reject等操作。
每次消费或者重新投递requeue后,deliverytag都会增加,理论上该正常业务范围内,该值永远不会达到最大范围上限。可以根据每个消费者对应channel的deliverytag消费速率计算到达最大值需要的时间。
假设:每秒钟一个消费者可以消费1000w个消息(假设每个消费者一个channel),则 9223372036854775807 / (60 60 24 365 1000w) = 29247年后能达到上限数值。
消费者ACK后立即关闭通道,可能会导致该ACK无法到达队列而使得该消息被投递给另外一个消费者。
If a channel is closed immediately after a consumer acknowledged a number of deliveries on it, the acknowledgements may or may not reach their target queue before the channel is terminated. In this case the messages with a pending acknowledgement on the channel will be automatically requeued following the channel closure.