Introduction
A RabbitMQ stream is a persistent, replicated data structure that can complete the same tasks as a queue.
Streams model an append-only log of messages that can be repeatedly read until they expire. Streams are always persistent and replicated. A more technical description of this stream behavior is “non-destructive consumer semantics”.
- Streams are always persistent.
- Streams are always replicated.
- Messages in a stream can be read repeatedly until they expire.
Stream vs Classic
https://www.rabbitmq.com/docs/streams#feature-comparison
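Use Cases for Streams
Streams were developed to cover four messaging use cases that the existing queue types either could not provide or provided only with downsides.
1. Large fan-outs
Currently, when users want to deliver the same message to multiple subscribers, they have to bind a dedicated queue for each consumer. With a large number of consumers this can become inefficient, especially when persistence and/or replication is required. Streams allow any number of consumers to consume the same messages from the same queue in a non-destructive manner, which removes the need to bind multiple queues. Stream consumers can also read from replicas, spreading the read load across the cluster.
2. Replay (time-travelling)
All current RabbitMQ queue types have destructive consume behavior: messages are deleted from the queue once a consumer is finished with them, so messages that have already been consumed cannot be read again. Streams allow consumers to attach at any point in the log and read from there.
3. Throughput performance
No persistent queue type can match the throughput of existing log-based messaging systems. Streams were designed with performance as a major goal.
4. Large backlogs
Most RabbitMQ queues are designed to converge towards the empty state and are optimized accordingly, so a queue can perform worse when it holds millions of messages. Streams are designed to store large amounts of data efficiently with minimal in-memory overhead.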
Declaring a Stream
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
  "my-stream",
  true,         // durable
  false, false, // not exclusive, not auto-delete
  Collections.singletonMap("x-queue-type", "stream")
);
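Queue Arguments
x-max-length-bytes: the maximum size of the stream in bytes. Default: not set.
x-max-age: the maximum age of the stream. Default: not set.
x-stream-max-segment-size-bytes: a stream is divided into fixed-size segment files on disk, and this setting controls the size of those files. Default: 500000000 bytes.
x-stream-filter-size-bytes: the size of the Bloom filter used for filtering. The value must be between 16 and 255. Default: 16.
- Streams are implemented as an immutable, append-only disk log. This means the log grows indefinitely until the disk runs out. Set x-max-length-bytes and/or x-max-age to avoid this.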
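The retention arguments above can be collected into the map passed to channel.queueDeclare. The following is a minimal sketch that uses only the JDK; the class name StreamArguments, the method withRetention, and the example values are illustrative assumptions rather than part of any RabbitMQ API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of the RabbitMQ client): builds the
// argument map for declaring a stream with a retention policy.
final class StreamArguments {

    static Map<String, Object> withRetention(long maxLengthBytes,
                                             String maxAge,
                                             long segmentSizeBytes) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-queue-type", "stream");                  // declare a stream, not a classic queue
        args.put("x-max-length-bytes", maxLengthBytes);      // size-based retention
        args.put("x-max-age", maxAge);                       // age-based retention, e.g. "7D"
        args.put("x-stream-max-segment-size-bytes", segmentSizeBytes);
        return args;
    }

    public static void main(String[] argv) {
        // Example values are assumptions: 20 GB cap, one week, 100 MB segments.
        Map<String, Object> args = withRetention(20_000_000_000L, "7D", 100_000_000L);
        System.out.println(args);
    }
}
```

The returned map would take the place of the Collections.singletonMap(...) argument in the queueDeclare call shown earlier.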
Controlling the Initial Replication Factor
The x-initial-cluster-size queue argument controls how many RabbitMQ nodes the initial stream cluster should span.
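Consumer Arguments
Because streams never delete messages on consumption, any consumer can start reading from any point in the log. This is controlled by the x-stream-offset consumer argument. If it is not specified, the consumer starts reading from the next offset written to the log after the consumer starts. The following values are supported:
- first - start reading from the first available message in the log
- last - start reading from the last written "chunk" of messages (a chunk is the storage and transport unit used in streams; put simply, it is a batch of messages ranging from a few to a few thousand messages, depending on the ingress)
- next - same as not specifying any offset
- Offset - a numerical value specifying the exact offset at which to attach to the log. If this offset does not exist, it is clamped to either the start or the end of the log respectively.
- Timestamp - a timestamp value specifying the point in time at which to attach to the log. It is clamped to the closest offset; if the timestamp is out of range for the stream, it is clamped to either the start or the end of the log respectively. With AMQP 0.9.1, the timestamp used is POSIX time with an accuracy of one second, that is, the number of seconds since 1970-01-01 00:00:00 UTC. Note that consumers can receive messages published slightly before the specified timestamp.
- Interval - a string value specifying the time interval, relative to the current time, at which to attach to the log. It uses the same specification as x-max-age (see "Retention").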
Note: for streams, the consumer prefetch setting changes the offset.
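The offset specifications listed above map onto Java values for the x-stream-offset consumer argument. The sketch below assumes the RabbitMQ Java client conventions of a String for first/last/next and intervals, a number for an absolute offset, and a java.util.Date for a timestamp; the class StreamOffsets and its method names are hypothetical.

```java
import java.util.Collections;
import java.util.Date;
import java.util.Map;

// Hypothetical helper: builds the consumer-argument map for each kind of
// x-stream-offset value. Only JDK types are used.
final class StreamOffsets {

    private static Map<String, Object> offsetArg(Object value) {
        return Collections.singletonMap("x-stream-offset", value);
    }

    static Map<String, Object> fromFirst() {
        return offsetArg("first");            // first available message
    }

    static Map<String, Object> fromOffset(long offset) {
        return offsetArg(offset);             // absolute offset, clamped by the broker
    }

    static Map<String, Object> fromTimestamp(long epochSeconds) {
        // AMQP 0.9.1 timestamps have one-second accuracy.
        return offsetArg(new Date(epochSeconds * 1000L));
    }

    static Map<String, Object> fromInterval(String interval) {
        return offsetArg(interval);           // same format as x-max-age, e.g. "1h"
    }

    public static void main(String[] argv) {
        long oneHourAgo = System.currentTimeMillis() / 1000L - 3600L;
        System.out.println(fromTimestamp(oneHourAgo));
    }
}
```

Any of these maps can be passed as the arguments parameter of channel.basicConsume, together with a prefetch set through channel.basicQos and manual acknowledgements.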
How Streams Use Resources
Streams usually have a lower CPU and memory footprint than quorum queues.
All data is stored on disk with only unwritten data stored in memory.
Single Active Consumer for Stream
When several consumer instances sharing the same stream and name enable single active consumer, only one of these instances will be active at a time and so will receive messages. The other instances will be idle.
- Messages are processed in order: there is only one consumer at a time.
- Consumption continuity is maintained: a consumer from the group will take over if the active one stops or crashes.
Super Stream
https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams
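A super stream is a logical stream made of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting up the storage and the traffic across several cluster nodes.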
Super streams add complexity compared to individual streams, so they should not be considered the default solution for all use cases involving streams. Consider using super streams only if you are sure you reached the limits of individual streams.
Create a super stream named invoices with three partitions:
rabbitmq-streams add_super_stream invoices --partitions 3
Filtering
https://blog.rabbitmq.com/posts/2023/10/stream-filtering/
https://blog.rabbitmq.com/posts/2023/10/stream-filtering-internals/
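RabbitMQ Stream provides server-side filtering, which avoids reading every message of a stream and filtering only on the client side. Its main purpose is to save network bandwidth when a consuming application needs only a subset of the messages.
- Messages with and without a filter value can be published to the same stream.
- When a consumer sets a filter, messages without a filter value are not delivered. Setting the x-stream-match-unfiltered argument to true changes this behavior so that unfiltered messages are received as well.
- The x-stream-filter consumer argument accepts a string, and also an array of strings to receive messages for several filter values.
- Server-side filtering uses a Bloom filter and cannot fully exclude non-matching messages, so the consumer must keep its own filtering logic as a fallback to make sure it does not process unwanted messages.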
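The filtering rules above can be sketched as two small helpers: one that builds consumer arguments for several filter values, and one that applies the client-side fallback check. The class StreamFilters and its methods are hypothetical, and passing the array of filter values as a java.util.List is an assumption about how the Java client encodes AMQP arrays.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for stream filtering; uses only JDK types.
final class StreamFilters {

    // Consumer arguments for one or more filter values.
    static Map<String, Object> consumerArgs(List<String> filterValues,
                                            boolean matchUnfiltered) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-stream-filter", new ArrayList<>(filterValues));
        args.put("x-stream-match-unfiltered", matchUnfiltered);
        return args;
    }

    // Client-side fallback: the Bloom filter may let non-matching messages
    // through, so every delivery is checked again before processing.
    static boolean accept(Map<String, Object> headers,
                          Set<String> wanted,
                          boolean matchUnfiltered) {
        Object value = headers == null ? null : headers.get("x-stream-filter-value");
        if (value == null) {
            return matchUnfiltered;           // message was published without a filter value
        }
        return wanted.contains(value.toString());
    }

    public static void main(String[] argv) {
        System.out.println(consumerArgs(List.of("california", "texas"), true));
    }
}
```

Comparing value.toString() rather than the raw header object is a defensive choice, since header strings are not guaranteed to arrive as java.lang.String.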
The publisher sets the x-stream-filter-value message header:
channel.basicPublish(
  "", // default exchange
  "my-stream",
  new AMQP.BasicProperties.Builder()
    .headers(Collections.singletonMap(
      "x-stream-filter-value", "california" // set filter value
    ))
    .build(),
  body
);
The consumer sets the x-stream-filter argument:
channel.basicQos(100); // QoS must be specified
channel.basicConsume(
  "my-stream",
  false,
  Collections.singletonMap("x-stream-filter", "california"), // set filter
  (consumerTag, message) -> {
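    Map<String, Object> headers = message.getProperties().getHeaders();
    // there must be some client-side filter logic;
    // header strings may arrive as LongString, so compare their string form
    Object filterValue = headers == null ? null : headers.get("x-stream-filter-value");
    if (filterValue != null && "california".equals(filterValue.toString())) {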
      // message processing
      // ...
    }
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack is required
  },
  consumerTag -> { });
Replica Management
Two CLI commands are provided to add and remove stream replicas, rabbitmq-streams add_replica and rabbitmq-streams delete_replica:
rabbitmq-streams add_replica [-p <vhost>] <stream-name> <node>
rabbitmq-streams delete_replica [-p <vhost>] <stream-name> <node>
The replication status of a stream can be queried using the following command:
rabbitmq-streams stream_status [-p <vhost>] <stream-name>
In addition streams can be restarted using:
rabbitmq-streams restart_stream [-p <vhost>] <stream-name>
Data Safety
https://www.rabbitmq.com/docs/streams#data-safety
Limitations
Message Encoding
Streams internally store their messages as AMQP 1.0 encoded data. This means when publishing using AMQP 0.9.1 a conversion takes place. Although the AMQP 1.0 data model is mostly capable of containing all of AMQP 0.9.1’s data model there are some limitations. If an AMQP 0.9.1 message contains header entries with complex values such as arrays or tables these headers will not be converted. That is because headers are stored as application properties inside the AMQP 1.0 message and these can only contain values of simple types, such as strings and numbers.
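Because headers with array or table values are not converted, it can help to detect them before publishing to a stream over AMQP 0.9.1. The following is a rough sketch of such a pre-publish check; StreamHeaderCheck and complexHeaderKeys are hypothetical names, and treating Map, Collection, and Object[] values as "complex" is an assumption about how tables and arrays are represented on the Java side.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical pre-publish check: reports header keys whose values are
// tables or arrays and would therefore be lost in a stream.
final class StreamHeaderCheck {

    static List<String> complexHeaderKeys(Map<String, Object> headers) {
        TreeSet<String> keys = new TreeSet<>();   // sorted for stable output
        if (headers == null) {
            return new ArrayList<>(keys);
        }
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            Object value = entry.getValue();
            boolean complex = value instanceof Map
                    || value instanceof Collection
                    || value instanceof Object[];
            if (complex) {
                keys.add(entry.getKey());
            }
        }
        return new ArrayList<>(keys);
    }

    public static void main(String[] argv) {
        Map<String, Object> headers = Map.of(
                "region", "california",
                "tags", List.of("a", "b"));
        System.out.println(complexHeaderKeys(headers));
    }
}
```

A publisher could log or flatten the reported headers, for example by joining a list into a single string, before calling basicPublish.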
UI Metric Accuracy
Management UI can show a message count that slightly exceeds the actual count in the stream. Due to the way stream storage is implemented, offset tracking information is also counted as messages, making the message count artificially larger than it is. This should make no practical difference in most systems.
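Data in streams can be consumed through a RabbitMQ client library or through the dedicated binary protocol plugin and its associated clients. The latter option is highly recommended, as it gives access to all stream-specific features and offers the best possible throughput (performance).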