| 维度 | aiokafka | pykafka | kafka-python | confluent-kafka-python |
|---|---|---|---|---|
| 协程支持 | ✅ 原生 asyncioawait producer.send_and_wait() |
❌ 无(仅线程/queue) | ❌ 无(仅 Future+回调) | ✅ 官方 asyncio 封装confluent_kafka.aio |
| 同步 API | ❌ 仅协程 | ✅ 有 | ✅ 有 | ✅ 有 |
| 实现语言/依赖 | 纯 Python; 可选 C 扩展加速 CRC |
纯 Python; 可选 librdkafka 扩展 |
纯 Python; 可选 crc32c 加速 |
C 扩展(librdkafka) 自动 binary wheel |
| Kafka 协议版本 | 0.9–3.6+(CI 测到 3.6) | 0.8.2+(文档自述) | 0.8.0–3.6+(master 测到 3.6) | 0.8–3.6+(librdkafka 覆盖) |
| Producer 吞吐量 单节点 1 kB msg |
≈70 k msg/s (asyncio 单线程) |
≈46 k msg/s (含 C 扩展) |
≈28 k msg/s (纯 Python) |
≈120 k msg/s (librdkafka) |
| Consumer 吞吐量 | ≈50 k msg/s | ≈14 k msg/s | ≈37 k msg/s | ≈110 k msg/s |
| Exactly-Once / 事务 | ✅ 支持(0.10+) | ❌ 不支持 | ✅ 支持(0.11+) | ✅ 支持(librdkafka 1.x+) |
| GZIP/Snappy/LZ4/Zstd | 全部支持 | 全部支持 | 全部支持 | 全部支持 |
| 当前主分支 | 0.12.x(PyPI 同步) | 2.8.2(2018-12 后无实质更新) | 2.1.x(2024-12 发版) | 2.6.2(2025-09 发版,2.7 在途) |
| GitHub Star/贡献 | 2.4 k / 190+ 贡献者 | 1.3 k / 50 贡献者 | 6.2 k / 260+ 贡献者 | 359 / 180+ 贡献者 |
| 最近 1 年 commit | ~70(main 分支) | ≈0(已事实停滞) | ~70(main 分支) | ~120(main 分支) |
| Issue 关闭速度 | 中位 10 天 | 基本不处理 | 中位 20 天 | 中位 5 天(Confluent 商业支持) |
| 官方归属 | aio-libs 社区 | Parse.ly 后弃坑 | 社区(dpkp) | Confluent(非 Apache) |
| 安装难易 | pip install aiokafka[snappy] |
需手动装 librdkafka 头文件 | 纯 pip | pip install confluent-kafka(wheel 自带 librdkafka) |
| 典型场景 | FastAPI / 异步栈首选 | 老集群 0.8.x 过渡 | 脚本/同步代码/无编译依赖 | 生产级、极限吞吐、EOS |
https://github.com/dpkp/kafka-python
https://github.com/aio-libs/aiokafka
https://github.com/confluentinc/confluent-kafka-python
https://github.com/Parsely/pykafka
https://github.com/ag2ai/faststream
aiokafka
pip install aiokafka
import asyncio
import logging
from aiokafka import AIOKafkaProducer
logging.basicConfig(level=logging.INFO)
async def producer_example():
producer = AIOKafkaProducer(bootstrap_servers="110.42.60.147:9092")
try:
await producer.start()
logging.info("Producer connected to Kafka.")
while True:
topic, msg = "topic-1", "message-1"
await producer.send(topic, msg.encode())
logging.info(f"Message sent to topic '{topic}'.")
topic, msg = "topic-2", "message-2"
await producer.send(topic, msg.encode())
logging.info(f"Message sent to topic '{topic}'.")
except Exception as e:
logging.error(f"Error occurred: {e}")
finally:
await producer.stop()
logging.info("Producer disconnected from Kafka.")
asyncio.run(producer_example())
import asyncio
import logging
from aiokafka import AIOKafkaConsumer, ConsumerRecord
logging.basicConfig(level=logging.INFO)
async def consumer_example():
consumer = AIOKafkaConsumer(
"topic-1", "topic-2",
bootstrap_servers="110.42.60.147:9092",
group_id="test-group"
)
try:
await consumer.start()
logging.info("Consumer connected to Kafka.")
async for msg in consumer:
msg: ConsumerRecord
print(msg.topic)
logging.info(f"Consumed message: {msg.value.decode()}")
except Exception as e:
logging.error(f"Error occurred: {e}")
finally:
await consumer.stop()
logging.info("Consumer disconnected from Kafka.")
asyncio.run(consumer_example())