Kafka 网格过滤器
Apache Kafka 网格过滤器为 Apache Kafka 集群提供了一个外观。
它允许处理下游客户端发送的生产 (生产者) 和获取 (消费者) 请求。
此过滤器实例接收的请求可以转发到多个集群中的一个,具体取决于配置的转发规则。
支持 Kafka 3.8.0 中的相应消息版本。
此过滤器应使用类型 URL
type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh配置。
注意
Kafka 网格过滤器仅包含在 contrib 映像 中。
注意
kafka_mesh 过滤器是实验性的,目前处于积极开发阶段。随着时间的推移,功能将得到扩展,配置结构可能会发生变化。
注意
kafka_mesh 过滤器在 Windows 上不工作(阻止程序是编译 librdkafka)。
配置
以下示例展示了代理 3 个 Kafka 集群的典型过滤器配置。客户端将连接到“127.0.0.1:19092”,他们的消息将根据主题名称分发到集群。
listeners:
- address:
socket_address:
address: 127.0.0.1 # Host that Kafka clients should connect to.
port_value: 19092 # Port that Kafka clients should connect to.
filter_chains:
- filters:
- name: envoy.filters.network.kafka_mesh
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh
advertised_host: "127.0.0.1"
advertised_port: 19092
upstream_clusters:
- cluster_name: kafka_c1
bootstrap_servers: cluster1_node1:9092,cluster1_node2:9092,cluster1_node3:9092
partition_count: 1
- cluster_name: kafka_c2
bootstrap_servers: cluster2_node1:9092,cluster2_node2:9092,cluster2_node3:9092
partition_count: 1
- cluster_name: kafka_c3
bootstrap_servers: cluster3_node1:9092,cluster3_node2:9092
partition_count: 5
producer_config:
acks: "1"
linger.ms: "500"
consumer_config:
client.id: "my-envoy-consumer"
forwarding_rules:
- target_cluster: kafka_c1
topic_prefix: apples
- target_cluster: kafka_c2
topic_prefix: bananas
- target_cluster: kafka_c3
topic_prefix: cherries
需要注意的是,Kafka 代理过滤器可以在过滤器链中插入到 Kafka 网格过滤器之前,以捕获请求处理指标。
说明
记录使用嵌入式 librdkafka 生产者/消费者发送/接收。
librdkafka 在没有 ssl、lz4、gssapi 的情况下编译,因此不支持相关的自定义配置选项。
无效的自定义配置在启动时不会被发现(只有在相应的生产者或消费者被初始化时才会被发现)。将引用这些集群的请求将关闭连接并失败。
引用与任何规则不匹配的主题的请求将关闭连接并失败。这通常不应该发生(客户端首先请求元数据,然后他们应该首先以“无可用代理”失败),但如果有人在连接上定制二进制有效负载,则可能会发生这种情况。
生产者代理
指向上游 Kafka 集群的嵌入式 librdkafka 生产者是为每个 Envoy 工作线程创建的(因此可以通过 –concurrency 选项增加吞吐量,允许更多数量的生产者处理请求)。
仅支持版本 2 的 ProduceRequests(这意味着 0.8 等非常旧的生产者将不受支持)。
Python 生产者需要将 API 版本设置为至少 1.0.0,以便他们发送的生产请求将具有值为 2 的记录。
Kafka 生产者“acks”属性的下游处理委托给上游客户端。例如,如果上游客户端配置为使用 acks=0,那么响应将尽快发送到下游客户端(即使它们具有非零 acks!)。
由于过滤器将单个生产者请求拆分为单独的记录,因此可能只有其中一些记录的传递失败。在这种情况下,返回给上游客户端的响应是失败的,但是一些记录可能已附加到目标集群中。
由于上述拆分,记录不一定一个接一个地追加(因为它们没有作为单个请求发送到上游)。想要避免这种情况的用户可能需要查看下游生产者配置:“linger.ms”和“batch.size”。
消费者代理
目前,消费者代理仅支持有状态代理 - Envoy 使用指向上游的 librdkafka 消费者来接收记录,并且只有在请求更多数据时才会这样做。
用户可能需要查看消费者的配置属性 *group.id* 以管理消费者的偏移量提交行为(在 Envoy 重启时具有意义)。
请求消费者位置时,响应始终包含偏移量 = 0(参见 *list_offsets.cc*)。
提供记录偏移量信息,但记录批次偏移量增量没有提供 - 已经观察到 Apache Kafka Java 客户端即使在接收到记录后也不会更新其位置(参见 *fetch_record_converter.cc*)。
如果 Fetch 响应至少收集了 3 条记录,则将发送到下游(参见 *fetch.cc*)。当前实现忽略请求中有关请求字节等的数据。
Fetch 响应在被认为已完成(见上文)或经过 5 秒的硬编码超时后发送(参见 *fetch.cc*)。请求中指定的超时被忽略。
只要这些主题有传入请求,消费者就会从主题中轮询记录,而不会考虑分区。鼓励用户确保从所有分区中消费记录,以避免例如我们仅从分区 0 中获取记录,但代理接收到分区 0 的记录(发送到下游)和分区 1 的记录(保留在内存中,直到有人对它们感兴趣)的情况。