Kafka 101

Kafka no longer needs ZooKeeper: in KRaft mode a single container is enough to bring up a broker.

version: "3.8"

services:
  kafka:
    image: confluentinc/cp-kafka:7.6.1
    container_name: kafka
    ports:
      - "9094:9094" # EXTERNAL (SASL_PLAINTEXT)
    environment:
      # Basic KRaft configuration
      CLUSTER_ID: "MkU3OEVBNTcwNTJFNTM2Qk"
      KAFKA_NODE_ID: "1"
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_LISTENERS: "PLAINTEXT://0.0.0.0:9092,EXTERNAL://0.0.0.0:9094,CONTROLLER://0.0.0.0:9093"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:9093"
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"

      # Enable SASL/PLAIN on the EXTERNAL listener
      KAFKA_SASL_ENABLED_MECHANISMS: "PLAIN"
      KAFKA_LISTENER_NAME_EXTERNAL_SASL_ENABLED_MECHANISMS: "PLAIN"
      KAFKA_LISTENER_NAME_EXTERNAL_PLAIN_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_local="local-secret";'

      # Sensible single-node defaults
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: "1"
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"

List topics. These kafka-topics / kafka-console-* commands talk to the internal PLAINTEXT listener on 9092, which is only reachable inside the Docker network, so run them inside the container (e.g. via docker exec -it kafka ...).

kafka-topics --list --bootstrap-server localhost:9092
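
To do the same from the host, which only exposes the SASL EXTERNAL listener on 9094, here is a minimal kafka-python sketch (assuming kafka-python is installed; the credentials come from the JAAS config above):

from kafka import KafkaConsumer

# Connect from the host through the EXTERNAL SASL/PLAIN listener on 9094
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9094',
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='admin',        # user_admin in the JAAS config
    sasl_plain_password='admin-secret',
)
print(consumer.topics())  # set of topic names visible to this client
consumer.close()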

Create a topic

kafka-topics --create --topic test-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

Describe a topic

kafka-topics --describe --topic test-topic --bootstrap-server localhost:9092

Delete a topic

kafka-topics --delete --topic test-topic --bootstrap-server localhost:9092

Produce messages. With parse.key=true and key.separator=:, everything before the first colon becomes the record key and the rest becomes the value.

kafka-console-producer --topic test-topic --bootstrap-server localhost:9092 --property parse.key=true --property key.separator=:
>user_id: 1, name: aaa
>user_id: 2, name: bbb
>user_id: 3, name: ccc
Ctrl+D to exit
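
The same keyed write from Python, as a rough sketch (assuming the kafka-python client and the SASL credentials from the compose file; the topic and key are just the ones used above):

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9094',   # EXTERNAL SASL listener, reachable from the host
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='admin',
    sasl_plain_password='admin-secret',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: v.encode('utf-8'),
)

# Equivalent of typing "user_id: 1, name: aaa" with key.separator=":"
producer.send('test-topic', key='user_id', value=' 1, name: aaa')
producer.flush()
producer.close()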

List consumer groups

kafka-consumer-groups --bootstrap-server localhost:9092 --list

Consume messages. The console consumer prints only the values by default, so the keys do not appear in the output.

kafka-console-consumer --bootstrap-server localhost:9092 --topic test-topic --group test-consumer-group --from-beginning
1, name: aaa
1, name: aaa
2, name: bbb
3, name: ccc
^CProcessed a total of 4 messages

Check the consumer group's offsets

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group test-consumer-group

GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
test-consumer-group test-topic      0          4               4               0               console-consumer-d8b07694-3c2b-4b35-8d91-26a773675c3a /192.168.163.2  console-consumer

After producing another three messages (with the console consumer stopped):

kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group test-consumer-group

Consumer group 'test-consumer-group' has no active members.

GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test-consumer-group test-topic      0          7               10              3               -               -               -
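
The lag is simply the gap between what has been written and what the group has committed: LAG = LOG-END-OFFSET - CURRENT-OFFSET = 10 - 7 = 3.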

The script below simulates how committing an offset partway through a batch of messages moves the consumer group's committed offset. It uses the kafka-python client and connects from the host through the EXTERNAL SASL/PLAIN listener on port 9094.

from kafka import KafkaProducer, KafkaConsumer
import json
import time

# Kafka connection settings
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9094'
TOPIC_NAME = 'example-topic'
GROUP_ID = 'example-group'

# SASL/PLAIN authentication settings
SASL_MECHANISM = 'PLAIN'
SASL_USERNAME = 'admin'
SASL_PASSWORD = 'admin-secret'

def produce_messages():
    """Produce 9 messages to Kafka."""
    # Create the producer
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism=SASL_MECHANISM,
        sasl_plain_username=SASL_USERNAME,
        sasl_plain_password=SASL_PASSWORD
    )

    # Send 9 messages
    for i in range(1, 10):
        message = {'id': i, 'message': f'Message {i}'}
        producer.send(TOPIC_NAME, message)
        print(f"Produced: {message}")

    # Make sure every message has actually been sent before exiting
    producer.flush()
    producer.close()

def consume_messages():
    """Consume messages and demonstrate manual offset commits."""
    # Create a consumer with auto-commit disabled
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=GROUP_ID,
        auto_offset_reset='earliest',
        enable_auto_commit=False,  # disable auto-commit
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism=SASL_MECHANISM,
        sasl_plain_username=SASL_USERNAME,
        sasl_plain_password=SASL_PASSWORD
    )

    # Count how many messages have been processed
    messages_processed = 0

    # Pull messages and process them one by one
    for message in consumer:
        messages_processed += 1
        print(f"\nProcessing message {messages_processed}:")
        print(f"Topic: {message.topic}")
        print(f"Partition: {message.partition}")
        print(f"Offset: {message.offset}")
        print(f"Key: {message.key}")
        print(f"Value: {message.value}")

        # Simulate doing some work on the message
        print(f"Processing message: {message.value}")
        time.sleep(1)  # simulate processing time

        # Commit the offset only after processing the third message
        if messages_processed == 3:
            print("Committing offset after processing the third message")
            consumer.commit()
            print("Offset committed successfully")
        else:
            print(f"Not committing offset for message {messages_processed}")

        # Stop after processing 3 messages
        if messages_processed >= 3:
            break

    consumer.close()

def demonstrate_offset_behavior():
    """Show where a restarted consumer in the same group resumes from."""
    print("\n--- Demonstrating consumer restart behavior ---")
    print("Creating a new consumer with the same group ID...")

    # Create a new consumer with the same group ID
    consumer = KafkaConsumer(
        TOPIC_NAME,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=GROUP_ID,
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism=SASL_MECHANISM,
        sasl_plain_username=SASL_USERNAME,
        sasl_plain_password=SASL_PASSWORD
    )

    print("New consumer created. Polling for messages...")

    # Poll to see which offset consumption resumes from
    messages = consumer.poll(timeout_ms=5000, max_records=10)

    if not messages:
        print("No messages received. All messages were committed or topic is empty.")
    else:
        print("\nMessages received after restart:")
        for tp, msgs in messages.items():
            for message in msgs:
                print(f"Topic: {message.topic}")
                print(f"Partition: {message.partition}")
                print(f"Offset: {message.offset}")
                print(f"Value: {message.value}")

    consumer.close()

if __name__ == "__main__":
    # 1. Produce 9 messages
    print("=== Producing 9 messages ===")
    produce_messages()
    time.sleep(1)  # give the broker a moment to persist the messages

    # 2. Consume messages, committing the offset only after the third one
    print("\n=== Consuming messages ===")
    consume_messages()

    # 3. Demonstrate what a restarted consumer in the same group sees
    time.sleep(1)
    demonstrate_offset_behavior()

MQ or Kafka?

At its core Kafka is still a stream: consumers should process messages quickly and reliably, without heavy business logic in the consumption path. If each message needs complex business handling, a traditional message queue is usually the better choice.

If you insist on Kafka anyway, you can bolt on retries and a dead-letter topic, but that often amounts to forcing Kafka into a job a message queue should do, and then patching around it for years. A rough sketch of that pattern follows.
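
A minimal retry/dead-letter sketch with kafka-python, reusing the broker and credentials above; the topic names example-topic.retry and example-topic.dlq, the handle() function, and MAX_ATTEMPTS are all hypothetical:

from kafka import KafkaConsumer, KafkaProducer
import json

MAX_ATTEMPTS = 3  # hypothetical retry budget

def handle(record):
    """Placeholder for business logic; raises on failure."""
    ...

consumer = KafkaConsumer(
    'example-topic',
    bootstrap_servers='localhost:9094',
    group_id='example-group',
    enable_auto_commit=False,
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='admin',
    sasl_plain_password='admin-secret',
)

producer = KafkaProducer(
    bootstrap_servers='localhost:9094',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='PLAIN',
    sasl_plain_username='admin',
    sasl_plain_password='admin-secret',
)

for message in consumer:
    record = message.value
    attempts = record.get('attempts', 0)
    try:
        handle(record)
    except Exception:
        if attempts + 1 < MAX_ATTEMPTS:
            # Re-publish with an incremented attempt counter for a later retry
            producer.send('example-topic.retry', {**record, 'attempts': attempts + 1})
        else:
            # Give up and park the message on a dead-letter topic
            producer.send('example-topic.dlq', record)
    # Commit either way: failures have already been re-routed
    consumer.commit()

In practice the retry topic needs its own consumer (usually with a delay), which is exactly the kind of plumbing a message queue gives you out of the box.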