6.3. Stream Queue

予早 2025-02-21 01:08:23

Introduction

RabbitMQ Streams is a persistent replicated data structure that can complete the same tasks as queues.

Streams model an append-only log of messages that can be repeatedly read until they expire. Streams are always persistent and replicated. A more technical description of this stream behavior is “non-destructive consumer semantics”.

Stream vs Classic

https://www.rabbitmq.com/docs/streams#feature-comparison

Stream Use Cases

Streams were developed to cover four messaging use cases that existing queue types either could not address or addressed poorly.

1. Large fan-outs

Currently, when a user wants to deliver the same message to multiple subscribers, a dedicated queue must be bound for each consumer. With many consumers this can become inefficient, especially when durability and/or replication are required. Streams allow any number of consumers to consume the same messages from the same queue non-destructively, removing the need to bind multiple queues. Stream consumers can also read from replicas, spreading the read load across the cluster.

2. Replay (Time-travelling)

Because all current RabbitMQ queue types have destructive consume semantics, i.e. messages are deleted from the queue once a consumer is done with them, it is not possible to re-read messages that have already been consumed. Streams allow consumers to attach at any point in the log and read from there.

3. Throughput Performance

No persistent queue type can match the throughput of existing log-based messaging systems. Streams are designed with performance as a primary goal.

4. Large backlogs

Most RabbitMQ queues are designed to converge toward an empty state and are optimized accordingly; their performance can degrade when a given queue holds millions of messages. Streams are designed to store large amounts of data efficiently, with minimal in-memory overhead.

Declaring a Stream Queue

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Collections;

ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
  "my-stream",
  true,         // durable: streams are always durable
  false, false, // not exclusive, not auto-delete (streams support neither)
  Collections.singletonMap("x-queue-type", "stream") // declare as a stream
);

Queue Arguments

x-max-length-bytes: the maximum total size of the stream in bytes. Not set by default.

x-max-age: the maximum age of messages to retain. Not set by default.

x-stream-max-segment-size-bytes: a stream is split into fixed-size segment files on disk, and this setting controls the size of those files. Default: 500000000 bytes (500 MB).

x-stream-filter-size-bytes: the size, in bytes, of the Bloom filter used for filtering. The value must be between 16 and 255. Default: 16.
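As a sketch, the retention arguments above can be combined when declaring the stream; the helper below only builds the arguments map, and all values are illustrative, not recommendations:

```java
import java.util.HashMap;
import java.util.Map;

public class StreamRetentionArgs {
    // Queue arguments for a stream bounded both by total size and by message age.
    static Map<String, Object> retentionArgs() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-queue-type", "stream");
        args.put("x-max-length-bytes", 20_000_000_000L);          // keep at most ~20 GB
        args.put("x-max-age", "7D");                              // keep at most 7 days
        args.put("x-stream-max-segment-size-bytes", 100_000_000); // 100 MB segment files
        return args;
    }

    public static void main(String[] argv) {
        // Pass the map as the last parameter of queueDeclare, e.g.:
        // channel.queueDeclare("my-stream", true, false, false, retentionArgs());
        System.out.println(retentionArgs());
    }
}
```

Note that retention is evaluated per segment: a whole segment is deleted once it falls outside the limits, which is why the segment size also influences how precisely the limits are enforced.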

Controlling the Initial Replication Factor

The x-initial-cluster-size queue argument controls how many rabbit nodes the initial stream cluster should span.
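For example, to limit a freshly declared stream to three replicas regardless of the cluster size, the argument can be set at declaration time (a sketch; the value 3 is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class InitialClusterSizeArgs {
    // Queue arguments for a stream whose initial member set spans 3 nodes.
    static Map<String, Object> args() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-queue-type", "stream");
        args.put("x-initial-cluster-size", 3); // span at most 3 nodes initially
        return args;
    }

    public static void main(String[] argv) {
        // channel.queueDeclare("my-stream", true, false, false, args());
        System.out.println(args());
    }
}
```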

Consumer Arguments

Since streams never delete messages, any consumer can start reading/consuming from any point in the log. This is controlled by the x-stream-offset consumer argument. If it is not specified, the consumer starts reading from the next offset written to the log after the consumer starts. The following values are supported: first, last, next, a numeric offset, and a timestamp.

Note: the prefetch (basicQos) setting of a stream consumer affects how offsets are delivered.
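As a sketch, the different x-stream-offset attachment points can be expressed as consumer-argument maps; the helpers below only build the maps, and the usage lines in main show where they would be passed (the variable names there are placeholders):

```java
import java.util.Collections;
import java.util.Date;
import java.util.Map;

public class StreamOffsetArgs {
    // x-stream-offset accepts "first", "last", "next",
    // a numeric offset, or a timestamp (java.util.Date in the Java client).
    static Map<String, Object> fromFirst() {
        return Collections.singletonMap("x-stream-offset", "first");
    }

    static Map<String, Object> fromOffset(long offset) {
        return Collections.singletonMap("x-stream-offset", offset);
    }

    static Map<String, Object> fromTimestamp(Date since) {
        return Collections.singletonMap("x-stream-offset", since);
    }

    public static void main(String[] argv) {
        // Usage sketch (stream consumers require a prefetch via basicQos):
        // channel.basicQos(100);
        // channel.basicConsume("my-stream", false, fromFirst(), deliverCallback, cancelCallback);
        System.out.println(fromOffset(5000L));
    }
}
```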

How Streams Use Resources

Streams usually have a lower CPU and memory footprint than quorum queues.

All data is stored on disk with only unwritten data stored in memory.

Single Active Consumer for Stream

When several consumer instances that share the same stream and consumer name enable single active consumer, only one of these instances is active at a time and therefore receives messages. The other instances remain idle.

Super Stream

https://blog.rabbitmq.com/posts/2022/07/rabbitmq-3-11-feature-preview-super-streams

A super stream is a logical stream made up of individual, regular streams. It is a way to scale out publishing and consuming with RabbitMQ Streams: a large logical stream is divided into partition streams, splitting storage and traffic across several cluster nodes.

Super streams add complexity compared to individual streams, so they should not be considered the default solution for all use cases involving streams. Consider using super streams only if you are sure you reached the limits of individual streams.

Create a super stream named invoices with three partitions:

rabbitmq-streams add_super_stream invoices --partitions 3
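Conceptually, a producer routes each message to exactly one partition, typically by hashing a routing key; the stream client libraries use a hash-based routing strategy by default. The standalone sketch below only illustrates the idea (Math.floorMod of hashCode() stands in for the real hash; the partition naming follows the invoices-0, invoices-1, ... scheme that add_super_stream creates):

```java
public class SuperStreamRouting {
    // Map a routing key (e.g. a customer id) onto one of N partition streams.
    static String partitionFor(String superStream, String routingKey, int partitions) {
        int index = Math.floorMod(routingKey.hashCode(), partitions);
        return superStream + "-" + index; // e.g. invoices-0, invoices-1, invoices-2
    }

    public static void main(String[] argv) {
        // The same key always lands on the same partition, preserving per-key ordering.
        System.out.println(partitionFor("invoices", "customer-42", 3));
    }
}
```

Because the mapping is deterministic, all messages for a given key end up in the same partition, which is what preserves per-key ordering across the super stream.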

Filtering

https://blog.rabbitmq.com/posts/2023/10/stream-filtering/

https://blog.rabbitmq.com/posts/2023/10/stream-filtering-internals/

RabbitMQ Stream provides server-side filtering, which avoids reading all messages of a stream and filtering only on the client side. Its main purpose is to save network bandwidth when a consuming application needs only a subset of the messages.

The publisher sets the x-stream-filter-value message header:

byte[] body = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
channel.basicPublish(
  "", // default exchange
  "my-stream",
  new AMQP.BasicProperties.Builder()
    .headers(Collections.singletonMap(
      "x-stream-filter-value", "california" // filter value for this message
    ))
    .build(),
  body
);

The consumer sets the x-stream-filter consumer argument:

channel.basicQos(100); // QoS must be specified for stream consumers
channel.basicConsume(
  "my-stream",
  false, // manual acknowledgements
  Collections.singletonMap("x-stream-filter", "california"), // server-side filter
  (consumerTag, message) -> {
    Map<String, Object> headers = message.getProperties().getHeaders();
    // Bloom-filter matching is probabilistic, so client-side filtering is still
    // required; header values arrive as LongString, hence String.valueOf
    if ("california".equals(String.valueOf(headers.get("x-stream-filter-value")))) {
      // message processing
      // ...
    }
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // ack every message
  },
  consumerTag -> { });

Replica Management

Two CLI commands are provided to add and remove stream replicas, rabbitmq-streams add_replica and rabbitmq-streams delete_replica:

rabbitmq-streams add_replica [-p <vhost>] <stream-name> <node>
rabbitmq-streams delete_replica [-p <vhost>] <stream-name> <node>

The replication status of a stream can be queried using the following command:

rabbitmq-streams stream_status [-p <vhost>] <stream-name>

In addition, streams can be restarted using:

rabbitmq-streams restart_stream [-p <vhost>] <stream-name>

Data Safety

https://www.rabbitmq.com/docs/streams#data-safety

Limitations

Message Encoding

Streams internally store their messages as AMQP 1.0 encoded data. This means when publishing using AMQP 0.9.1 a conversion takes place. Although the AMQP 1.0 data model is mostly capable of containing all of AMQP 0.9.1’s data model there are some limitations. If an AMQP 0.9.1 message contains header entries with complex values such as arrays or tables these headers will not be converted. That is because headers are stored as application properties inside the AMQP 1.0 message and these can only contain values of simple types, such as strings and numbers.

UI Metric Accuracy

Management UI can show a message count that slightly exceeds the actual count in the stream. Due to the way stream storage is implemented, offset tracking information is also counted as messages, making the message count artificially larger than it is. This should make no practical difference in most systems.

Data in a stream can be consumed through RabbitMQ client libraries or through a dedicated binary-protocol plugin and its associated clients. The latter option is strongly recommended, as it provides access to all stream-specific features and offers the best throughput (performance).

https://www.rabbitmq.com/docs/stream

https://www.rabbitmq.com/docs/stream-core-plugin-comparison