Kafka 代理

本示例演示了通过 Envoy 代理的 Kafka 代理的一些基本操作。

为了方便起见,composition 提供了一个 dockerized Kafka 客户端。

如果您在主机系统上安装了 kafka-console-* 二进制文件,则可以使用主机二进制文件和 --bootstrap-server localhost:10000 按照示例操作。

还演示了 Envoy 为 Kafka 代理扩展和相关集群指标收集的统计信息。

步骤 1:启动所有容器

切换到 examples/kafka 目录。

$ pwd
envoy/examples/kafka
$ docker compose pull
$ docker compose up --build -d
$ docker compose ps

         Name                      Command                State                            Ports
-----------------------------------------------------------------------------------------------------------------------
kafka_kafka-server_1   /etc/confluent/docker/run      Up             9092/tcp
kafka_proxy_1          /docker-entrypoint.sh /usr ... Up             0.0.0.0:10000->10000/tcp, 0.0.0.0:8001->8001/tcp
kafka_zookeeper_1      /etc/confluent/docker/run      Up (healthy)   2181/tcp, 2888/tcp, 3888/tcp

步骤 2:创建 Kafka 主题

首先使用名称 envoy-kafka-broker 创建一个 Kafka 主题

$ export TOPIC="envoy-kafka-broker"
$ docker compose run --rm kafka-client kafka-topics --bootstrap-server proxy:10000 --create --topic $TOPIC

步骤 3:检查 Kafka 主题

您可以使用 kafka-topics --list 参数查看 Kafka 知道的主题。

检查您创建的主题是否存在

$ docker compose run --rm kafka-client kafka-topics --bootstrap-server proxy:10000 --list | grep $TOPIC

步骤 4:使用 Kafka 生产者发送消息

接下来,使用 kafka-console-producer 为您创建的主题发送一条消息

$ export MESSAGE="Welcome to Envoy and Kafka broker filter!"
$ docker compose run --rm kafka-client /bin/bash -c " \
    echo $MESSAGE \
    | kafka-console-producer --request-required-acks 1 --broker-list proxy:10000 --topic $TOPIC"

步骤 5:使用 Kafka 消费者接收消息

现在您可以使用 kafka-console-consumer 接收消息

$ docker compose run --rm kafka-client kafka-console-consumer --bootstrap-server proxy:10000 --topic $TOPIC --from-beginning --max-messages 1 | grep "$MESSAGE"

步骤 6:检查管理员 kafka_broker 统计信息

当您代理到 Kafka 代理时,Envoy 会记录各种统计信息。

您可以通过查询 Envoy 管理员界面来检查代理统计信息(数字可能略有不同,因为 kafka-client 不会公开对其网络流量的精确控制)

$ curl -s "https://127.0.0.1:8001/stats?filter=kafka.kafka_broker" | grep -v ": 0" | grep "_request:"
kafka.kafka_broker.request.api_versions_request: 9
kafka.kafka_broker.request.create_topics_request: 1
kafka.kafka_broker.request.fetch_request: 2
kafka.kafka_broker.request.find_coordinator_request: 8
kafka.kafka_broker.request.join_group_request: 2
kafka.kafka_broker.request.leave_group_request: 1
kafka.kafka_broker.request.list_offsets_request: 1
kafka.kafka_broker.request.metadata_request: 12
kafka.kafka_broker.request.offset_fetch_request: 1
kafka.kafka_broker.request.produce_request: 1
kafka.kafka_broker.request.sync_group_request: 1

步骤 7:检查管理员 kafka_service 集群统计信息

Envoy 还会记录 Kafka 服务的集群统计信息

$ curl -s "https://127.0.0.1:8001/stats?filter=cluster.kafka_service" | grep -v ": 0"
cluster.kafka_service.max_host_weight: 1
cluster.kafka_service.membership_healthy: 1
cluster.kafka_service.membership_total: 1

另请参阅

Envoy Kafka 代理过滤器

了解有关 Kafka 代理过滤器的更多信息。

Kafka

Apache Kafka。