client

予早 2026-04-30 23:53:28
Categories: Tags:
维度 aiokafka pykafka kafka-python confluent-kafka-python
协程支持 ✅ 原生 asyncio
await producer.send_and_wait()
❌ 无(仅线程/queue) ❌ 无(仅 Future+回调) ✅ 官方 asyncio 封装
confluent_kafka.aio
同步 API ❌ 仅协程 ✅ 有 ✅ 有 ✅ 有
实现语言/依赖 纯 Python;
可选 C 扩展加速 CRC
纯 Python;
可选 librdkafka 扩展
纯 Python;
可选 crc32c 加速
C 扩展(librdkafka)
自动 binary wheel
Kafka 协议版本 0.9–3.6+(CI 测到 3.6) 0.8.2+(文档自述) 0.8.0–3.6+(master 测到 3.6) 0.8–3.6+(librdkafka 覆盖)
Producer 吞吐量
单节点 1 kB msg
≈70 k msg/s
(asyncio 单线程)
≈46 k msg/s
(含 C 扩展)
≈28 k msg/s
(纯 Python)
≈120 k msg/s
(librdkafka)
Consumer 吞吐量 ≈50 k msg/s ≈14 k msg/s ≈37 k msg/s ≈110 k msg/s
Exactly-Once / 事务 ✅ 支持(0.10+) ❌ 不支持 ✅ 支持(0.11+) ✅ 支持(librdkafka 1.x+)
GZIP/Snappy/LZ4/Zstd 全部支持 全部支持 全部支持 全部支持
当前主分支 0.12.x(PyPI 同步) 2.8.2(2018-12 后无实质更新) 2.1.x(2024-12 发版) 2.6.2(2025-09 发版,2.7 在途)
GitHub Star/贡献 2.4 k / 190+ 贡献者 1.3 k / 50 贡献者 6.2 k / 260+ 贡献者 359 / 180+ 贡献者
最近 1 年 commit ~70(main 分支) ≈0(已事实停滞) ~70(main 分支) ~120(main 分支)
Issue 关闭速度 中位 10 天 基本不处理 中位 20 天 中位 5 天(Confluent 商业支持)
官方归属 aio-libs 社区 Parse.ly 后弃坑 社区(dpkp) Confluent(非 Apache)
安装难易 pip install aiokafka[snappy] 需手动装 librdkafka 头文件 纯 pip pip install confluent-kafka(wheel 自带 librdkafka)
典型场景 FastAPI / 异步栈首选 老集群 0.8.x 过渡 脚本/同步代码/无编译依赖 生产级、极限吞吐、EOS

https://github.com/dpkp/kafka-python

https://github.com/aio-libs/aiokafka

https://github.com/confluentinc/confluent-kafka-python

https://github.com/Parsely/pykafka

https://github.com/ag2ai/faststream

aiokafka

pip install aiokafka
import asyncio
import logging

from aiokafka import AIOKafkaProducer

logging.basicConfig(level=logging.INFO)


async def producer_example():
    producer = AIOKafkaProducer(bootstrap_servers="110.42.60.147:9092")
    try:
        await producer.start()
        logging.info("Producer connected to Kafka.")

        while True:
            topic, msg = "topic-1", "message-1"
            await producer.send(topic, msg.encode())
            logging.info(f"Message sent to topic '{topic}'.")
            topic, msg = "topic-2", "message-2"
            await producer.send(topic, msg.encode())
            logging.info(f"Message sent to topic '{topic}'.")
    except Exception as e:
        logging.error(f"Error occurred: {e}")
    finally:
        await producer.stop()
        logging.info("Producer disconnected from Kafka.")


asyncio.run(producer_example())
import asyncio
import logging

from aiokafka import AIOKafkaConsumer, ConsumerRecord

logging.basicConfig(level=logging.INFO)


async def consumer_example():
    consumer = AIOKafkaConsumer(
        "topic-1", "topic-2",
        bootstrap_servers="110.42.60.147:9092",
        group_id="test-group"
    )
    try:
        await consumer.start()
        logging.info("Consumer connected to Kafka.")
        async for msg in consumer:
            msg: ConsumerRecord
            print(msg.topic)
            logging.info(f"Consumed message: {msg.value.decode()}")
    except Exception as e:
        logging.error(f"Error occurred: {e}")
    finally:
        await consumer.stop()
        logging.info("Consumer disconnected from Kafka.")


asyncio.run(consumer_example())