boxmoe_header_banner_img

Hello! 欢迎来到悠悠畅享网!

文章导读

解决 Docker Compose 中 Kafka 消息发送失败问题


avatar
作者 2025年9月8日 11

解决 Docker Compose 中 Kafka 消息发送失败问题

本文旨在解决在使用 docker Compose 部署 kafka 集群时,应用程序无法向 Kafka 主题发送消息的问题。我们将分析常见的配置错误,并提供修改建议,确保生产者能够正确连接到 Kafka Broker,从而成功发送消息。通过调整 Kafka 的监听器配置以及生产者端的 Broker 地址,可以有效解决此类连接问题。

问题分析

在使用 Docker Compose 部署 Kafka 集群时,生产者无法发送消息到 Kafka 主题,并出现类似 Topic general-events not present in metadata after 60000 ms 的错误,通常是由于以下原因导致:

  1. Kafka Broker 地址配置错误: 生产者配置的 Broker 地址与 Kafka 实际监听的地址不匹配。
  2. 网络配置问题: 容器之间的网络连接存在问题,导致生产者无法访问 Kafka Broker。
  3. Kafka 监听器配置不正确: Kafka 的监听器配置可能不允许从容器外部或特定的网络访问。

解决方案

针对上述问题,可以采取以下步骤进行排查和解决:

1. 检查 Kafka 监听器配置

Kafka 的 KAFKA_ADVERTISED_LISTENERS 环境变量至关重要,它决定了 Kafka Broker 如何向客户端公布自己的地址。确保该配置正确反映了客户端访问 Kafka 的方式。

docker-compose.yml 文件中,检查 Kafka 服务的 environment 部分:

  kafka:     image: confluentinc/cp-kafka:6.1.1     container_name: kafka     depends_on:       - zookeeper     ports:       - '9092:9092'     expose:       - '29092'     environment:       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'       KAFKA_MIN_INSYNC_REPLICAS: '1'
  • PLAINTEXT://kafka:29092:允许容器内部的其他服务通过 kafka 这个 hostname 和 29092 端口访问 Kafka Broker。
  • PLAINTEXT_HOST://localhost:9092:允许宿主机通过 localhost 和 9092 端口访问 Kafka Broker。

注意事项:

  • 如果你的生产者运行在 Docker 容器之外,确保 PLAINTEXT_HOST 配置正确,并且宿主机能够访问 Kafka Broker。
  • 如果你的生产者运行在 Docker 容器内部,但与 Kafka Broker 不在同一个 Docker 网络中,你需要配置一个可以从生产者容器访问的地址。

2. 修改生产者 Broker 地址

生产者需要使用正确的 Broker 地址才能成功连接到 Kafka 集群。如果 Kafka Broker 的地址发生变化,或者生产者使用了错误的地址,就会导致连接失败。

在生产者代码中,检查 bootstrap.servers 配置项。确保它指向正确的 Kafka Broker 地址。

解决 Docker Compose 中 Kafka 消息发送失败问题

MGX

MetaGPT推出的自然语言编程工具

解决 Docker Compose 中 Kafka 消息发送失败问题64

查看详情 解决 Docker Compose 中 Kafka 消息发送失败问题

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import Java.util.Properties; import java.util.logging.Level; import java.util.logging.Logger;  public class Producer implements Runnable {      private static final Logger LOGGER = Logger.getLogger(Producer.class.getName());     private static final String TOPIC_NAME = "general-events";     private KafkaProducer<Long, String> kafkaProducer = null;     private final String KAFKA_CLUSTER_ENV_VAR_NAME = "KAFKA_CLUSTER";      public Producer() {         LOGGER.log(Level.INFO, "Kafka Producer running in thread {0}", Thread.currentThread().getName());         Properties kafkaProps = new Properties();          // 使用环境变量或默认值配置 Kafka Broker 地址         String defaultClusterValue = "kafka:29092"; // 修改为 kafka:29092,容器内部访问         String kafkaCluster = System.getProperty(KAFKA_CLUSTER_ENV_VAR_NAME, defaultClusterValue);         LOGGER.log(Level.INFO, "Kafka cluster {0}", kafkaCluster);          kafkaProps.put(ProducerConfig.bootstrap_SERVERS_CONFIG, kafkaCluster);         kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer");         kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");         kafkaProps.put(ProducerConfig.ACKS_CONFIG, "0");          this.kafkaProducer = new KafkaProducer<>(kafkaProps);      } }

注意事项:

  • 如果生产者运行在 Docker 容器内部,应该使用 Kafka 容器的 hostname (kafka) 和内部端口 (29092)。
  • 如果生产者运行在 Docker 容器外部,应该使用 localhost 和映射到宿主机的端口 (9092)。

3. 检查容器网络

确保生产者容器和 Kafka 容器在同一个 Docker 网络中。可以使用 docker network inspect <network_name> 命令来检查容器的网络配置。

如果生产者容器和 Kafka 容器不在同一个网络中,可以使用 docker network connect <network_name> <container_name> 命令将生产者容器连接到 Kafka 容器所在的网络。

4. 验证 Topic 创建

虽然 init-kafka 服务创建了 topic,但仍建议再次确认 topic 是否成功创建。可以在 Kafka 容器内部执行以下命令:

docker exec -it kafka bash kafka-topics --bootstrap-server kafka:29092 --list

确认列表中包含 general-events topic。

总结

解决 Docker Compose 中 Kafka 消息发送失败的问题,关键在于确保 Kafka 的监听器配置正确,生产者使用正确的 Broker 地址,以及容器之间的网络连接正常。通过仔细检查这些配置,可以有效地解决此类连接问题,确保应用程序能够成功地向 Kafka 主题发送消息。



评论(已关闭)

评论已关闭