Kafka no longer needs ZooKeeper: in KRaft mode you can bring up a single-node broker straight from a container.
version: "3.8"
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.1
    container_name: kafka
    ports:
      - "9094:9094" # EXTERNAL (SASL_PLAINTEXT)
    environment:
      # Basic KRaft configuration
      CLUSTER_ID: "MkU3OEVBNTcwNTJFNTM2Qk"
      KAFKA_NODE_ID: "1"
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,EXTERNAL://0.0.0.0:9094,CONTROLLER://0.0.0.0:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
      # Enable SASL/PLAIN on the EXTERNAL listener only
      KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN"
      KAFKA_LISTENER_NAME_EXTERNAL_SASL_ENABLED_MECHANISMS: "PLAIN"
      KAFKA_LISTENER_NAME_EXTERNAL_PLAIN_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_local="local-secret";'
      # Sensible single-node defaults
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
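Three listeners are declared: PLAINTEXT on 9092 for clients inside the Docker network, CONTROLLER on 9093 for KRaft, and EXTERNAL on 9094, the only one published to the host and the only one requiring SASL/PLAIN. A quick way to verify the EXTERNAL listener from the host is a minimal sketch with kafka-python (the library choice is an assumption; any SASL-capable client works):

from kafka import KafkaAdminClient

# Connect from the host through the published SASL_PLAINTEXT listener
admin = KafkaAdminClient(
    bootstrap_servers='localhost:9094',
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='admin',        # credentials from the JAAS config above
    sasl_plain_password='admin-secret',
)
print(admin.list_topics())  # should print the topic list with no auth errors
admin.close()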
List topics. The CLI examples below use the internal PLAINTEXT listener on 9092, which is not published to the host, so run them inside the container (e.g. docker exec -it kafka bash):
kafka-topics --list --bootstrap-server localhost:9092
Create a topic:
kafka-topics --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
Describe a topic:
kafka-topics --describe --topic test-topic --bootstrap-server localhost:9092
Delete a topic:
kafka-topics --delete --topic test-topic --bootstrap-server localhost:9092
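The same topic CRUD can be done programmatically over the SASL listener; a sketch with kafka-python's admin client:

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(
    bootstrap_servers='localhost:9094',
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='admin',
    sasl_plain_password='admin-secret',
)
# Mirrors: kafka-topics --create --topic test-topic --partitions 1 --replication-factor 1
admin.create_topics([NewTopic(name='test-topic', num_partitions=1, replication_factor=1)])
# Mirrors: kafka-topics --delete --topic test-topic
admin.delete_topics(['test-topic'])
admin.close()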
Produce messages with keys:
kafka-console-producer --topic test-topic --bootstrap-server localhost:9092 --property parse.key=true --property key.separator=:
>user_id: 1, name: aaa
>user_id: 2, name: bbb
>user_id: 3, name: ccc
Press Ctrl+D to exit the producer.
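With parse.key=true and key.separator=:, each input line is split at the first colon, so the key above is "user_id" and everything after the colon is the value. The programmatic equivalent looks roughly like this (a sketch; messages with the same key always land on the same partition, which is the main reason to key them):

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9094',
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='admin',
    sasl_plain_password='admin-secret',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: v.encode('utf-8'),
)
# Equivalent of typing "user_id: 1, name: aaa" into the console producer
producer.send('test-topic', key='user_id', value=' 1, name: aaa')
producer.flush()
producer.close()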
List consumer groups:
kafka-consumer-groups --bootstrap-server localhost:9092 --list
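Programmatic equivalent (sketch, same kafka-python admin client as above):

from kafka import KafkaAdminClient

SASL = dict(security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN',
            sasl_plain_username='admin', sasl_plain_password='admin-secret')
admin = KafkaAdminClient(bootstrap_servers='localhost:9094', **SASL)
print(admin.list_consumer_groups())  # [(group_id, protocol_type), ...]
admin.close()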
Consume messages:
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --group test-consumer-group --from-beginning
1, name: aaa
1, name: aaa
2, name: bbb
3, name: ccc
^CProcessed a total of 4 messages
Check the consumer group's offsets:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group test-consumer-group
GROUP                TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                            HOST            CLIENT-ID
test-consumer-group  test-topic  0          4               4               0    console-consumer-d8b07694-3c2b-4b35-8d91-26a773675c3a  /192.168.163.2  console-consumer
After pushing in another 3 messages:
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group test-consumer-group
Consumer group 'test-consumer-group' has no active members.
GROUP                TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test-consumer-group  test-topic  0          7               10              3    -            -     -
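The CURRENT-OFFSET / LOG-END-OFFSET / LAG columns can also be read programmatically; a sketch with kafka-python, where committed() returns the group's committed offset (None if nothing was ever committed) and end_offsets() the log end:

from kafka import KafkaConsumer, TopicPartition

SASL = dict(security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN',
            sasl_plain_username='admin', sasl_plain_password='admin-secret')
consumer = KafkaConsumer(bootstrap_servers='localhost:9094',
                         group_id='test-consumer-group', **SASL)
tp = TopicPartition('test-topic', 0)
committed = consumer.committed(tp) or 0   # CURRENT-OFFSET
log_end = consumer.end_offsets([tp])[tp]  # LOG-END-OFFSET
print(f"committed={committed} log_end={log_end} lag={log_end - committed}")
consumer.close()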
Here is a script that simulates committing mid-stream: committing after the Nth message advances the group's committed offset past every earlier message, so only the uncommitted tail is redelivered.
from kafka import KafkaProducer, KafkaConsumer
import json
import time

# Kafka connection settings (the EXTERNAL SASL listener published by the compose file)
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9094'
TOPIC_NAME = 'example-topic'
GROUP_ID = 'example-group'

# SASL/PLAIN credentials
SASL_MECHANISM = 'PLAIN'
SASL_USERNAME = 'admin'
SASL_PASSWORD = 'admin-secret'

def produce_messages():
    """Produce 9 messages to Kafka."""
    # Create the producer
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism=SASL_MECHANISM,
        sasl_plain_username=SASL_USERNAME,
        sasl_plain_password=SASL_PASSWORD
    )
    # Send 9 messages (offsets 0-8 on a fresh topic)
    for i in range(1, 10):
        message = {'id': i, 'message': f'Message {i}'}
        producer.send(TOPIC_NAME, message)
        print(f"Produced: {message}")
    # Make sure every message has actually been sent
    producer.flush()
    producer.close()

def consume_messages():
    """Consume messages and demonstrate manual commits."""
    # Create a consumer with auto-commit disabled
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=GROUP_ID,
        auto_offset_reset='earliest',
        enable_auto_commit=False,  # disable auto-commit
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism=SASL_MECHANISM,
        sasl_plain_username=SASL_USERNAME,
        sasl_plain_password=SASL_PASSWORD
    )
    messages_processed = 0
    # Pull messages and process them one by one
    for message in consumer:
        messages_processed += 1
        print(f"\nProcessing message {messages_processed}:")
        print(f"Topic: {message.topic}")
        print(f"Partition: {message.partition}")
        print(f"Offset: {message.offset}")
        print(f"Key: {message.key}")
        print(f"Value: {message.value}")
        # Simulate doing some work on the message
        print(f"Processing message: {message.value}")
        time.sleep(1)  # simulated processing time
        # Commit the offset only after the third message
        if messages_processed == 3:
            print("Committing offset after processing the third message")
            consumer.commit()
            print("Offset committed successfully")
        else:
            print(f"Not committing offset for message {messages_processed}")
        # Stop after three messages
        if messages_processed >= 3:
            break
    consumer.close()

def demonstrate_offset_behavior():
    """Show where a restarted consumer resumes."""
    print("\n--- Demonstrating consumer restart behavior ---")
    print("Creating a new consumer with the same group ID...")
    # A new consumer with the same group ID resumes from the committed offset
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=GROUP_ID,
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism=SASL_MECHANISM,
        sasl_plain_username=SASL_USERNAME,
        sasl_plain_password=SASL_PASSWORD
    )
    print("New consumer created. Polling for messages...")
    # Poll to see which offset consumption resumes from
    messages = consumer.poll(timeout_ms=5000, max_records=10)
    if not messages:
        print("No messages received. All messages were committed or topic is empty.")
    else:
        print("\nMessages received after restart:")
        for tp, msgs in messages.items():
            for message in msgs:
                print(f"Topic: {message.topic}")
                print(f"Partition: {message.partition}")
                print(f"Offset: {message.offset}")
                print(f"Value: {message.value}")
    consumer.close()

if __name__ == "__main__":
    # 1. Produce 9 messages
    print("=== Producing 9 messages ===")
    produce_messages()
    time.sleep(1)  # give the broker a moment to persist them
    # 2. Consume, committing only after the third message
    print("\n=== Consuming messages ===")
    consume_messages()
    # 3. Restart the consumer and see where it resumes
    time.sleep(1)
    demonstrate_offset_behavior()
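A note on the arithmetic: produce_messages() writes 9 messages (offsets 0-8 on a fresh topic). The first consumer processes offsets 0-2 and commits once; in kafka-python, commit() with no arguments commits the consumer's current position, i.e. last consumed offset + 1 = 3. The restarted consumer therefore resumes at offset 3 and receives the remaining six messages. Had the first consumer crashed before committing, offsets 0-2 would be redelivered as well: this is at-least-once delivery, which is why handlers should be idempotent.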
MQ or Kafka?
Kafka is, at heart, a stream: consuming from it should be fast and stable, with no complex business logic in the handler. If each message needs heavy, failure-prone processing, a traditional MQ is the better fit.
If you insist on Kafka, you can bolt on retry topics and a dead-letter queue, but to me that usually means an MQ problem is being forced onto Kafka, followed by years of patching around it.
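If you do stay on Kafka, the usual retry/dead-letter shape looks roughly like the sketch below: a failed message is republished to a retry topic with an attempt counter in its headers, and moved to a DLQ once the counter exceeds a limit. The topic names, the handle() stub, and the header name are all hypothetical:

from kafka import KafkaConsumer, KafkaProducer

MAIN, RETRY, DLQ = 'orders', 'orders.retry', 'orders.dlq'  # hypothetical topic names
MAX_ATTEMPTS = 3
SASL = dict(security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN',
            sasl_plain_username='admin', sasl_plain_password='admin-secret')

consumer = KafkaConsumer(MAIN, RETRY, bootstrap_servers='localhost:9094',
                         group_id='orders-worker', enable_auto_commit=False, **SASL)
producer = KafkaProducer(bootstrap_servers='localhost:9094', **SASL)

def handle(value):
    """Hypothetical business logic; raise to signal a failure."""
    ...

def attempts_of(msg):
    # Headers are (key, bytes) tuples; a missing header means first attempt
    for k, v in (msg.headers or []):
        if k == 'attempts':
            return int(v)
    return 0

for msg in consumer:
    try:
        handle(msg.value)
    except Exception:
        n = attempts_of(msg) + 1
        target = RETRY if n < MAX_ATTEMPTS else DLQ
        producer.send(target, msg.value, headers=[('attempts', str(n).encode())])
    consumer.commit()  # commit either way; the failure now lives in retry/DLQ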