本文旨在解决在使用 docker Compose 部署 kafka 集群时,应用程序无法向 Kafka 主题发送消息的问题。我们将分析常见的配置错误,并提供修改建议,确保生产者能够正确连接到 Kafka Broker,从而成功发送消息。通过调整 Kafka 的监听器配置以及生产者端的 Broker 地址,可以有效解决此类连接问题。
问题分析
在使用 Docker Compose 部署 Kafka 集群时,生产者无法发送消息到 Kafka 主题,并出现类似 Topic general-events not present in metadata after 60000 ms 的错误,通常是由于以下原因导致:
- Kafka Broker 地址配置错误: 生产者配置的 Broker 地址与 Kafka 实际监听的地址不匹配。
- 网络配置问题: 容器之间的网络连接存在问题,导致生产者无法访问 Kafka Broker。
- Kafka 监听器配置不正确: Kafka 的监听器配置可能不允许从容器外部或特定的网络访问。
解决方案
针对上述问题,可以采取以下步骤进行排查和解决:
1. 检查 Kafka 监听器配置
Kafka 的 KAFKA_ADVERTISED_LISTENERS 环境变量至关重要,它决定了 Kafka Broker 如何向客户端公布自己的地址。确保该配置正确反映了客户端访问 Kafka 的方式。
在 docker-compose.yml 文件中,检查 Kafka 服务的 environment 部分:
kafka: image: confluentinc/cp-kafka:6.1.1 container_name: kafka depends_on: - zookeeper ports: - '9092:9092' expose: - '29092' environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1' KAFKA_MIN_INSYNC_REPLICAS: '1'
- PLAINTEXT://kafka:29092:允许容器内部的其他服务通过 kafka 这个 hostname 和 29092 端口访问 Kafka Broker。
- PLAINTEXT_HOST://localhost:9092:允许宿主机通过 localhost 和 9092 端口访问 Kafka Broker。
注意事项:
- 如果你的生产者运行在 Docker 容器之外,确保 PLAINTEXT_HOST 配置正确,并且宿主机能够访问 Kafka Broker。
- 如果你的生产者运行在 Docker 容器内部,但与 Kafka Broker 不在同一个 Docker 网络中,你需要配置一个可以从生产者容器访问的地址。
2. 修改生产者 Broker 地址
生产者需要使用正确的 Broker 地址才能成功连接到 Kafka 集群。如果 Kafka Broker 的地址发生变化,或者生产者使用了错误的地址,就会导致连接失败。
在生产者代码中,检查 bootstrap.servers 配置项。确保它指向正确的 Kafka Broker 地址。
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import Java.util.Properties; import java.util.logging.Level; import java.util.logging.Logger; public class Producer implements Runnable { private static final Logger LOGGER = Logger.getLogger(Producer.class.getName()); private static final String TOPIC_NAME = "general-events"; private KafkaProducer<Long, String> kafkaProducer = null; private final String KAFKA_CLUSTER_ENV_VAR_NAME = "KAFKA_CLUSTER"; public Producer() { LOGGER.log(Level.INFO, "Kafka Producer running in thread {0}", Thread.currentThread().getName()); Properties kafkaProps = new Properties(); // 使用环境变量或默认值配置 Kafka Broker 地址 String defaultClusterValue = "kafka:29092"; // 修改为 kafka:29092,容器内部访问 String kafkaCluster = System.getProperty(KAFKA_CLUSTER_ENV_VAR_NAME, defaultClusterValue); LOGGER.log(Level.INFO, "Kafka cluster {0}", kafkaCluster); kafkaProps.put(ProducerConfig.bootstrap_SERVERS_CONFIG, kafkaCluster); kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.LongSerializer"); kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); kafkaProps.put(ProducerConfig.ACKS_CONFIG, "0"); this.kafkaProducer = new KafkaProducer<>(kafkaProps); } }
注意事项:
- 如果生产者运行在 Docker 容器内部,应该使用 Kafka 容器的 hostname (kafka) 和内部端口 (29092)。
- 如果生产者运行在 Docker 容器外部,应该使用 localhost 和映射到宿主机的端口 (9092)。
3. 检查容器网络
确保生产者容器和 Kafka 容器在同一个 Docker 网络中。可以使用 docker network inspect <network_name> 命令来检查容器的网络配置。
如果生产者容器和 Kafka 容器不在同一个网络中,可以使用 docker network connect <network_name> <container_name> 命令将生产者容器连接到 Kafka 容器所在的网络。
4. 验证 Topic 创建
虽然 init-kafka 服务创建了 topic,但仍建议再次确认 topic 是否成功创建。可以在 Kafka 容器内部执行以下命令:
docker exec -it kafka bash kafka-topics --bootstrap-server kafka:29092 --list
确认列表中包含 general-events topic。
总结
解决 Docker Compose 中 Kafka 消息发送失败的问题,关键在于确保 Kafka 的监听器配置正确,生产者使用正确的 Broker 地址,以及容器之间的网络连接正常。通过仔细检查这些配置,可以有效地解决此类连接问题,确保应用程序能够成功地向 Kafka 主题发送消息。
评论(已关闭)
评论已关闭