A queue in RabbitMQ is an ordered collection of messages. Messages are enqueued and dequeued (delivered to consumers) in a FIFO (“first in, first out”) manner.
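Queue Names
A queue name is specified when the queue is declared and may be up to 255 bytes of UTF-8 characters. Names starting with amq. are reserved by RabbitMQ; declaring a queue with such a name results in a channel-level exception with code 403 (ACCESS_REFUSED).
In AMQP 0-9-1, a queue can be declared with an empty string as its name, which asks RabbitMQ to generate the name. Such a queue is called a server-named queue. Temporary queues typically use server-generated names.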
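The naming rules above can be checked on the client before declaring. The following is a minimal sketch; `validate_queue_name` is a hypothetical helper, not part of any RabbitMQ client library, and the broker remains the authority on what it accepts.

```python
def validate_queue_name(name: str) -> str:
    """Return the name unchanged if a client may declare it, else raise ValueError."""
    if name == "":
        # Empty name: the broker generates one (server-named queue).
        return name
    if len(name.encode("utf-8")) > 255:
        # The limit is counted in bytes of UTF-8, not in characters.
        raise ValueError("queue name exceeds 255 bytes of UTF-8")
    if name.startswith("amq."):
        raise ValueError("names starting with 'amq.' are reserved by RabbitMQ")
    return name
```

Because the limit is in bytes, a name made of multi-byte characters reaches it with far fewer than 255 characters.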
Queue Declaration
Before a queue can be used it has to be declared. Declaring a queue will cause it to be created if it does not already exist. The declaration will have no effect if the queue does already exist and its attributes are the same as those in the declaration. When the existing queue attributes are not the same as those in the declaration a channel-level exception with code 406 (PRECONDITION_FAILED) will be raised.
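A queue declaration accepts an optional passive field. When passive is set to true, the declaration only checks whether the queue exists; it does not create the queue if it is missing.
If the queue does not exist and passive is true, a channel-level exception (404 NOT_FOUND) is raised and no queue is created. If the queue exists, the operation succeeds and the queue is not modified in any way.
This is useful for checking whether a queue already exists in an application: the check can be made before declaring the queue, without affecting the queue's state.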
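A passive declaration can be wrapped in a small existence check. This sketch assumes a pika-style blocking connection whose channels expose `queue_declare(queue=..., passive=...)`. The helper name `queue_exists` is hypothetical, and the error is matched through a `reply_code` attribute (as carried by pika's `ChannelClosedByBroker`) so that the block itself does not import any client library.

```python
def queue_exists(connection, name: str) -> bool:
    """Check for a queue with a passive declare on a throwaway channel."""
    # A failed passive declare closes the channel, so never reuse a
    # channel that carries other work for this check.
    channel = connection.channel()
    try:
        channel.queue_declare(queue=name, passive=True)
    except Exception as exc:
        # A missing queue closes the channel with reply code 404 (NOT_FOUND).
        if getattr(exc, "reply_code", None) == 404:
            return False
        raise
    channel.close()
    return True
```

A dedicated channel is used because the channel-level exception makes the channel unusable afterwards.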
Queue Properties
- Name
- Durable (the queue will survive a broker restart)
- Exclusive (used by only one connection and the queue will be deleted when that connection closes)
- Auto-delete (queue that has had at least one consumer is deleted when last consumer unsubscribes)
- Arguments (optional; used by plugins and broker-specific features such as message TTL, queue length limit, etc)
Queue Types
- classic
- quorum
- stream
Messages are enqueued and dequeued (delivered to consumers) in FIFO order.
Queue Length
https://www.rabbitmq.com/docs/maxlength
You can set a maximum length on a queue. The maximum length limit can be set to a number of messages, or set to a number of bytes (the total of all message body lengths, ignoring message properties and any overheads), or both.
Queue Overflow Behaviour
The default behaviour for RabbitMQ when a maximum queue length or size is set and the maximum is reached is to drop or dead-letter messages from the front of the queue (i.e. the oldest messages in the queue). To modify this behaviour, use the overflow setting described below.
Use the overflow setting to configure queue overflow behaviour. If overflow is set to reject-publish or reject-publish-dlx, the most recently published messages will be discarded. In addition, if publisher confirms are enabled, the publisher will be informed of the reject via a basic.nack message. If a message is routed to multiple queues and rejected by at least one of them, the channel will inform the publisher via basic.nack. The message will still be published to all other queues which can enqueue it. The difference between reject-publish and reject-publish-dlx is that reject-publish-dlx also dead-letters rejected messages.
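The difference between the overflow modes can be seen in a small in-memory model. This is only an illustration of the behaviour described above, counting messages rather than bytes; the `publish` function is hypothetical and does not talk to a broker.

```python
from collections import deque


def publish(queue: deque, message, max_length: int, overflow: str = "drop-head"):
    """Model a publish to a length-limited queue.

    Returns (accepted, evicted): whether the new message was enqueued, and
    the messages that were dropped or dead-lettered as a result.
    """
    if overflow == "drop-head":
        queue.append(message)
        evicted = []
        while len(queue) > max_length:
            evicted.append(queue.popleft())  # the oldest messages go first
        return True, evicted
    if overflow in ("reject-publish", "reject-publish-dlx"):
        if len(queue) >= max_length:
            # With publisher confirms on, the publisher would get a basic.nack.
            return False, [message]
        queue.append(message)
        return True, []
    raise ValueError(f"unknown overflow mode: {overflow}")
```

With a limit of two and messages a, b, c published in order, drop-head leaves b and c in the queue, while reject-publish leaves a and b.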
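Configuring with Queue Arguments
x-max-length: maximum number of messages in the queue
x-max-length-bytes: maximum total size of the queue in bytes
x-overflow: overflow behaviour, one of
drop-head (default), reject-publish or reject-publish-dlx
If both the message limit and the byte limit are set, both apply, and whichever limit is reached first is enforced.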
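The arguments above are passed as a map at declaration time. A minimal sketch of building that map follows; `length_limit_arguments` is a hypothetical helper, and only the three argument names come from the notes above.

```python
OVERFLOW_MODES = ("drop-head", "reject-publish", "reject-publish-dlx")


def length_limit_arguments(max_length=None, max_length_bytes=None, overflow=None):
    """Build the optional queue arguments for a length-limited queue."""
    args = {}
    if max_length is not None:
        args["x-max-length"] = int(max_length)
    if max_length_bytes is not None:
        args["x-max-length-bytes"] = int(max_length_bytes)
    if overflow is not None:
        if overflow not in OVERFLOW_MODES:
            raise ValueError(f"unknown overflow mode: {overflow}")
        args["x-overflow"] = overflow
    return args
```

Assuming a pika-style channel, the result would be passed as `channel.queue_declare(queue="two-messages", arguments=length_limit_arguments(max_length=2, overflow="reject-publish"))`.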
Configuring with a Policy
rabbitmqctl set_policy my-pol "^one-meg$" '{"max-length-bytes":1048576}' --apply-to queues
rabbitmqctl set_policy my-pol "^two-messages$" '{"max-length":2,"overflow":"reject-publish"}' --apply-to queues
Durability
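Queues can be durable or transient.
Metadata of a durable queue is stored on disk, and the queue is recovered when the node starts. Persistent messages in a durable queue are recovered as well, while transient messages are discarded.
Metadata of a transient queue is kept in memory when possible, and the queue is deleted when the node starts. Both persistent and transient messages in a transient queue are deleted.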
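The recovery rules above can be summarised in a small model. This is a sketch of the stated semantics only, with a made-up data layout (a dict of queues, each holding `(body, persistent)` pairs); it does not reflect how the broker stores anything.

```python
def recover_after_restart(queues):
    """Return what survives a node restart.

    queues maps a queue name to
    {"durable": bool, "messages": [(body, persistent), ...]}.
    """
    recovered = {}
    for name, queue in queues.items():
        if not queue["durable"]:
            continue  # transient queue: deleted together with all its messages
        # Durable queue: only persistent messages are recovered.
        kept = [(body, persistent) for body, persistent in queue["messages"] if persistent]
        recovered[name] = {"durable": True, "messages": kept}
    return recovered
```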
How to choose
In most cases, durable queues are the recommended option. For replicated queues, the only reasonable option is to use durable queues.
Throughput and latency of a queue is not affected by whether a queue is durable or not in most cases. Only environments with very high queue or binding churn — that is, where queues are deleted and re-declared hundreds or more times a second — will see latency improvements for some operations, namely on bindings. The choice between durable and transient queues therefore comes down to the semantics of the use case.
Temporary queues can be a reasonable choice for workloads with transient clients, for example, temporary WebSocket connections in user interfaces, mobile applications and devices that are expected to go offline or switch identities. Such clients usually have inherently transient state that should be replaced when the client reconnects.
Some queue types do not support transient queues. Quorum queues must be durable due to the assumptions and requirements of the underlying replication protocol, for example.
Exclusive and Auto-delete
Exclusive (used by only one connection and the queue will be deleted when that connection closes)
Auto-delete (queue that has had at least one consumer is deleted when last consumer unsubscribes)
Accessing an exclusive queue from another connection results in the channel error RESOURCE_LOCKED.
An exclusive queue can only be used (consumed from, purged, deleted, etc) by its declaring connection. An attempt to use an exclusive queue from a different connection will result in a channel-level exception RESOURCE_LOCKED with an error message that says "cannot obtain exclusive access to locked queue".
Exclusive queues are deleted when their declaring connection is closed or gone (e.g. due to underlying TCP connection loss). They therefore are only suitable for client-specific transient state.
It is common to make exclusive queues server-named.
Exclusive queues are declared on the “client-local” node (the node that the client declaring the queue is connected to), regardless of the queue_leader_locator value.
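result = channel.queue_declare(queue='', exclusive=True)
- An empty string as the queue name requests a server-generated (random) name.
- The exclusive parameter controls whether the queue is exclusive. An exclusive queue belongs to the connection that declared it: every channel on that connection can access it, while channels on other connections get a resource-locked error. The queue is deleted when the connection closes (not when the channel closes), regardless of the durable setting.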
Priority
https://www.rabbitmq.com/docs/priority
RabbitMQ supports adding “priorities” to classic queues. Classic queues with the “priority” feature turned on are commonly referred to as “priority queues”. Priorities between 1 and 255 are supported, however, values between 1 and 5 are highly recommended. It is important to know that higher priority values require more CPU and memory resources, since RabbitMQ needs to internally maintain a sub-queue for each priority from 1, up to the maximum value configured for a given queue.
Priorities can only be configured through queue arguments.
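// Declare a durable queue with a maximum priority of 10.
Channel ch = ...;
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);  // highest priority this queue supports
// Arguments: name, durable, exclusive, autoDelete, optional arguments.
ch.queueDeclare("my-priority-queue", true, false, false, args);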
- There is some in-memory and on-disk cost per priority level per queue. There is also an additional CPU cost, especially when consuming, so you may not wish to create huge numbers of levels.
- The message priority field is defined as an unsigned byte, so in practice priorities should be between 0 and 255.
- Messages without a priority property are treated as if their priority were 0. Messages with a priority which is higher than the queue’s maximum are treated as if they were published with the maximum priority.
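The priority rules in the list above (a missing priority counts as 0, and values above the queue maximum are clamped) can be sketched with a heap. `PriorityQueueModel` is a hypothetical in-memory model for illustration. RabbitMQ itself maintains a sub-queue per priority rather than a heap.

```python
import heapq
from itertools import count


class PriorityQueueModel:
    """In-memory model: higher priority first, FIFO within one priority."""

    def __init__(self, max_priority: int):
        self.max_priority = max_priority
        self._heap = []
        self._seq = count()  # publish order, used as a tie-breaker

    def effective_priority(self, priority=None) -> int:
        if priority is None:
            return 0  # no priority property: treated as 0
        return min(priority, self.max_priority)  # clamp to the queue maximum

    def publish(self, body, priority=None):
        p = self.effective_priority(priority)
        # heapq is a min-heap, so the priority is negated.
        heapq.heappush(self._heap, (-p, next(self._seq), body))

    def deliver(self):
        return heapq.heappop(self._heap)[2]
```

A message published with priority 200 to a queue whose maximum is 10 is ordered as if it had priority 10, so it does not overtake an earlier message published with priority 10.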
If you do use priority queues, values between 1 and 5 are highly recommended, as stated above. If you must go higher than 5, values up to 10 are sufficient, because each additional priority currently consumes more CPU resources by using more Erlang processes. Runtime scheduling would also be affected.
Caveats
Messages which should expire still only expire from the head of the queue. This means that unlike with normal queues, even per-queue TTL can lead to expired lower-priority messages getting stuck behind non-expired higher priority ones. These messages will never be delivered, but they will appear in queue statistics.
Queues which have a max-length set drop messages as usual from the head of the queue to enforce the limit. This means that higher priority messages might be dropped to make way for lower priority ones, which might not be what you would expect.
Why Policy Definition is not Supported for Priority Queues
The most convenient way to define optional arguments for a queue is using policies. Policies are the recommended way to configure TTL, queue length limits, and other optional queue arguments.
However, policies cannot be used to configure priorities because policies are dynamic and can be changed after a queue has been declared. Priority queues can never change the number of priorities they support after queue declaration, so policies would not be a safe option to use.
Queue Parallelism Considerations
A single RabbitMQ queue is bound to a single core. Use more than one queue to improve CPU utilisation on the nodes. Plugins such as sharding and consistent hash exchange can be helpful in increasing parallelism.
Currently a single queue replica (whether leader or follower) is limited to a single CPU core on its hot code path. This design therefore assumes that most systems use multiple queues in practice. A single queue is generally considered to be an anti-pattern (and not just for resource utilisation reasons).
When it is desirable to trade off message ordering for parallelism (better CPU core utilisation), rabbitmq-sharding provides an opinionated way of doing so transparently to the clients.
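The idea of spreading one logical queue over several real queues can be illustrated on the client side. This sketch is an assumption for illustration only: it is not how rabbitmq-sharding or the consistent hash exchange assigns messages, and the `base_name.index` naming scheme is made up.

```python
import zlib


def pick_shard(routing_key: str, base_name: str, shards: int) -> str:
    """Map a routing key to one of `shards` queue names, deterministically."""
    if shards < 1:
        raise ValueError("shards must be at least 1")
    # CRC32 is stable across processes, unlike Python's built-in hash().
    index = zlib.crc32(routing_key.encode("utf-8")) % shards
    return f"{base_name}.{index}"
```

Messages with the same routing key always land in the same shard, so ordering holds per key but no longer across the whole logical queue.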
Guaranteeing a Single Consumer on the Queue Side
Single Active Consumer
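Setting x-single-active-consumer to true ensures that when multiple consumers are registered on the queue, only one of them receives messages from it. The other consumers act as failover standbys for the currently active consumer.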
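The argument is supplied when the queue is declared. A minimal sketch, assuming a pika-style channel whose `queue_declare` accepts `queue`, `durable` and `arguments` keywords; the helper name is hypothetical.

```python
def declare_single_active_consumer_queue(channel, name: str, durable: bool = True):
    """Declare a queue on which only one registered consumer is active at a time."""
    return channel.queue_declare(
        queue=name,
        durable=durable,
        arguments={"x-single-active-consumer": True},
    )
```

Consumers then subscribe as usual; the choice of the active consumer and the failover happen on the broker side.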
Message Ordering in RabbitMQ
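Queues in RabbitMQ follow FIFO order. FIFO ordering is not guaranteed for priority and sharded queues.
Applications can assume that messages published on a single channel are enqueued in the order they were published. Messages published on multiple connections or multiple channels are enqueued concurrently.
The following factors affect the order in which messages are consumed: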
- the presence of multiple competing consumers
- consumer priorities
- message redeliveries
In case of multiple consumers, messages will be dequeued for delivery in the FIFO order but actual delivery will happen to multiple consumers. If all of the consumers have equal priorities, they will be picked on a round-robin basis. Only consumers on channels that have not exceeded their prefetch value (the number of outstanding unacknowledged deliveries) will be considered.
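The dispatch rule above (FIFO dequeue, round-robin among consumers of equal priority, skipping those at their prefetch limit) can be modelled in a few lines. This is a simplified sketch: `dispatch` is hypothetical, all consumers have equal priority, and no acknowledgements arrive while it runs.

```python
from collections import deque


def dispatch(messages, consumers, prefetch):
    """Assign messages to consumers round-robin, honouring prefetch limits.

    prefetch maps a consumer name to its limit on unacknowledged deliveries.
    Returns (deliveries, remaining): (message, consumer) pairs in delivery
    order, and the messages left in the queue.
    """
    pending = deque(messages)
    unacked = {c: 0 for c in consumers}
    ring = deque(consumers)
    deliveries = []
    while pending and any(unacked[c] < prefetch[c] for c in ring):
        # Skip consumers whose channel has reached its prefetch value.
        while unacked[ring[0]] >= prefetch[ring[0]]:
            ring.rotate(-1)
        consumer = ring[0]
        ring.rotate(-1)  # the next delivery starts with the following consumer
        deliveries.append((pending.popleft(), consumer))
        unacked[consumer] += 1
    return deliveries, list(pending)
```

Messages leave the queue in FIFO order, but once they are spread over several consumers the order in which they are processed is no longer guaranteed.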