Kafka 网格过滤器

Apache Kafka 网格过滤器为 Apache Kafka 集群提供了一个外观。

它允许处理下游客户端发送的生产 (生产者) 和获取 (消费者) 请求。

此过滤器实例接收的请求可以转发到多个集群中的一个,具体取决于配置的转发规则。

支持 Kafka 3.8.0 中的相应消息版本。

  • 此过滤器应使用类型 URL type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh 配置。

  • v3 API 参考

注意

Kafka 网格过滤器仅包含在 contrib 映像 中。

注意

kafka_mesh 过滤器是实验性的,目前处于积极开发阶段。随着时间的推移,功能将得到扩展,配置结构可能会发生变化。

注意

kafka_mesh 过滤器在 Windows 上不工作(阻止程序是编译 librdkafka)。

配置

以下示例展示了代理 3 个 Kafka 集群的典型过滤器配置。客户端将连接到“127.0.0.1:19092”,他们的消息将根据主题名称分发到集群。

listeners:
- address:
    socket_address:
      address: 127.0.0.1 # Host that Kafka clients should connect to.
      port_value: 19092  # Port that Kafka clients should connect to.
  filter_chains:
  - filters:
    - name: envoy.filters.network.kafka_mesh
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_mesh.v3alpha.KafkaMesh
        advertised_host: "127.0.0.1"
        advertised_port: 19092
        upstream_clusters:
        - cluster_name: kafka_c1
          bootstrap_servers: cluster1_node1:9092,cluster1_node2:9092,cluster1_node3:9092
          partition_count: 1
        - cluster_name: kafka_c2
          bootstrap_servers: cluster2_node1:9092,cluster2_node2:9092,cluster2_node3:9092
          partition_count: 1
        - cluster_name: kafka_c3
          bootstrap_servers: cluster3_node1:9092,cluster3_node2:9092
          partition_count: 5
          producer_config:
            acks: "1"
            linger.ms: "500"
          consumer_config:
            client.id: "my-envoy-consumer"
        forwarding_rules:
        - target_cluster: kafka_c1
          topic_prefix: apples
        - target_cluster: kafka_c2
          topic_prefix: bananas
        - target_cluster: kafka_c3
          topic_prefix: cherries

需要注意的是,Kafka 代理过滤器可以在过滤器链中插入到 Kafka 网格过滤器之前,以捕获请求处理指标。

说明

  1. 记录使用嵌入式 librdkafka 生产者/消费者发送/接收。

  2. librdkafka 在没有 ssl、lz4、gssapi 的情况下编译,因此不支持相关的自定义配置选项。

  3. 无效的自定义配置在启动时不会被发现(只有在相应的生产者或消费者被初始化时才会被发现)。将引用这些集群的请求将关闭连接并失败。

  4. 引用与任何规则不匹配的主题的请求将关闭连接并失败。这通常不应该发生(客户端首先请求元数据,然后他们应该首先以“无可用代理”失败),但如果有人在连接上定制二进制有效负载,则可能会发生这种情况。

生产者代理

  1. 指向上游 Kafka 集群的嵌入式 librdkafka 生产者是为每个 Envoy 工作线程创建的(因此可以通过 –concurrency 选项增加吞吐量,允许更多数量的生产者处理请求)。

  2. 仅支持版本 2 的 ProduceRequests(这意味着 0.8 等非常旧的生产者将不受支持)。

  3. Python 生产者需要将 API 版本设置为至少 1.0.0,以便他们发送的生产请求将具有值为 2 的记录。

  4. Kafka 生产者“acks”属性的下游处理委托给上游客户端。例如,如果上游客户端配置为使用 acks=0,那么响应将尽快发送到下游客户端(即使它们具有非零 acks!)。

  5. 由于过滤器将单个生产者请求拆分为单独的记录,因此可能只有其中一些记录的传递失败。在这种情况下,返回给上游客户端的响应是失败的,但是一些记录可能已附加到目标集群中。

  6. 由于上述拆分,记录不一定一个接一个地追加(因为它们没有作为单个请求发送到上游)。想要避免这种情况的用户可能需要查看下游生产者配置:“linger.ms”和“batch.size”。

消费者代理

  1. 目前,消费者代理仅支持有状态代理 - Envoy 使用指向上游的 librdkafka 消费者来接收记录,并且只有在请求更多数据时才会这样做。

  2. 用户可能需要查看消费者的配置属性 *group.id* 以管理消费者的偏移量提交行为(在 Envoy 重启时具有意义)。

  3. 请求消费者位置时,响应始终包含偏移量 = 0(参见 *list_offsets.cc*)。

  4. 提供记录偏移量信息,但记录批次偏移量增量没有提供 - 已经观察到 Apache Kafka Java 客户端即使在接收到记录后也不会更新其位置(参见 *fetch_record_converter.cc*)。

  5. 如果 Fetch 响应至少收集了 3 条记录,则将发送到下游(参见 *fetch.cc*)。当前实现忽略请求中有关请求字节等的数据。

  6. Fetch 响应在被认为已完成(见上文)或经过 5 秒的硬编码超时后发送(参见 *fetch.cc*)。请求中指定的超时被忽略。

  7. 只要这些主题有传入请求,消费者就会从主题中轮询记录,而不会考虑分区。鼓励用户确保从所有分区中消费记录,以避免例如我们仅从分区 0 中获取记录,但代理接收到分区 0 的记录(发送到下游)和分区 1 的记录(保留在内存中,直到有人对它们感兴趣)的情况。