Kafka Broker filter

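The Apache Kafka broker filter decodes the client protocol for Apache Kafka, covering both the requests and the responses in the payload. The message versions in Kafka 3.8.0 are supported.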

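By default, the filter tries not to influence the communication between clients and brokers, so messages that cannot be decoded (because the Kafka client or broker runs a newer version than this filter supports) are forwarded as-is. However, this requires the upstream Kafka cluster to be configured in a proxy-aware fashion (see Configuration (without traffic mutation)).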
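To illustrate the pass-through behaviour, the sketch below parses the fixed part of a Kafka request (the int32 size prefix followed by a request header v1: api_key, api_version, correlation_id, client_id) and decides whether a message would be decoded or forwarded unchanged. This is not Envoy's implementation: the function names and the MAX_SUPPORTED_VERSION table are hypothetical, and the version limits in it are placeholders rather than the filter's real limits.

```python
import struct
from typing import Optional, Tuple

# Hypothetical: highest api_version this sketch "understands", keyed by api_key.
# Placeholder numbers only; they are NOT the real filter's limits.
MAX_SUPPORTED_VERSION = {0: 11, 3: 12}  # 0 = Produce, 3 = Metadata

# size, api_key, api_version, correlation_id, client_id length (big-endian, no padding)
HEADER = struct.Struct(">ihhih")


def build_frame(api_key: int, api_version: int, correlation_id: int, client_id: str) -> bytes:
    """Encode a size-prefixed request that contains only a request header v1 (no body)."""
    cid = client_id.encode("utf-8")
    body = struct.pack(">hhih", api_key, api_version, correlation_id, len(cid)) + cid
    return struct.pack(">i", len(body)) + body


def parse_request_header(frame: bytes) -> Tuple[int, int, int, Optional[str]]:
    """Return (api_key, api_version, correlation_id, client_id) from a size-prefixed frame."""
    size, api_key, api_version, correlation_id, cid_len = HEADER.unpack_from(frame, 0)
    if size != len(frame) - 4:
        raise ValueError("size prefix does not match frame length")
    if cid_len < 0:  # a length of -1 encodes a null client_id
        client_id = None
    else:
        client_id = frame[HEADER.size:HEADER.size + cid_len].decode("utf-8")
    return api_key, api_version, correlation_id, client_id


def classify(frame: bytes) -> str:
    """'decode' when the (api_key, api_version) pair is known, otherwise 'forward-as-is'."""
    api_key, api_version, _, _ = parse_request_header(frame)
    max_version = MAX_SUPPORTED_VERSION.get(api_key)
    if max_version is None or api_version > max_version:
        return "forward-as-is"
    return "decode"
```

In this default mode the bytes are passed on unchanged either way; in the sketch, being able to decode a message only changes what the filter can observe about it.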

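If configured to mutate the received traffic, the Envoy broker filter can be used to proxy Kafka brokers without any change to the broker configuration. This requires providing the filter with rewrite rules, so that the addresses advertised by the Kafka brokers can be changed to Envoy listener addresses (see Configuration (with traffic mutation)).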

  • This filter should be configured with the type URL type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker.

  • v3 API reference

Note

The Kafka broker filter is only included in contrib images.

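Note

The kafka_broker filter is experimental and is currently under active development. Capabilities will be expanded over time and the configuration structures are likely to change.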

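Configuration (without traffic mutation)

The Kafka broker filter can operate without rewriting any requests or responses.

The filter should be chained with the TCP proxy filter, as shown in the snippet below: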

listeners:
- address:
    socket_address:
      address: 127.0.0.1 # Host that Kafka clients should connect to (i.e. bootstrap.servers).
      port_value: 19092  # Port that Kafka clients should connect to (i.e. bootstrap.servers).
  filter_chains:
  - filters:
    - name: envoy.filters.network.kafka_broker
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
        stat_prefix: exampleprefix
    - name: envoy.filters.network.tcp_proxy
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
        stat_prefix: tcp
        cluster: localkafka
clusters:
- name: localkafka
  connect_timeout: 0.25s
  type: strict_dns
  lb_policy: round_robin
  load_assignment:
    cluster_name: some_service
    endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 127.0.0.1 # Kafka broker's host.
                port_value: 9092   # Kafka broker's port.

The Kafka broker then needs to advertise the Envoy listener port instead of its own, so that downstream clients make any new connections to Envoy only:

# Listener value needs to be equal to cluster value in Envoy config
# (will receive payloads from Envoy).
listeners=PLAINTEXT://127.0.0.1:9092

# Advertised listener value needs to be equal to Envoy's listener
# (will make clients discovering this broker talk to it through Envoy).
advertised.listeners=PLAINTEXT://127.0.0.1:19092
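A quick consistency check of the broker properties against the Envoy configuration above can be sketched as follows: listeners must match the Envoy cluster endpoint, and advertised.listeners must match the Envoy listener. The helper names are hypothetical, and the sketch assumes each property holds a single PLAINTEXT://host:port entry; real brokers may list several comma-separated listeners.

```python
from typing import Dict, List, Tuple

Endpoint = Tuple[str, int]


def load_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def parse_listener(value: str) -> Endpoint:
    """'PLAINTEXT://127.0.0.1:9092' -> ('127.0.0.1', 9092); assumes a single entry."""
    _, _, address = value.partition("://")
    host, _, port = address.rpartition(":")
    return host, int(port)


def check_proxy_aware(props: Dict[str, str], envoy_listener: Endpoint,
                      envoy_upstream: Endpoint) -> List[str]:
    """Return human-readable problems; an empty list means the layout matches."""
    problems = []
    if parse_listener(props["listeners"]) != envoy_upstream:
        problems.append("listeners must match the Envoy cluster endpoint")
    if parse_listener(props["advertised.listeners"]) != envoy_listener:
        problems.append("advertised.listeners must match the Envoy listener")
    return problems
```

With the properties shown above, check_proxy_aware(props, ("127.0.0.1", 19092), ("127.0.0.1", 9092)) returns an empty list.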

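Configuration (with traffic mutation)

The Kafka broker filter can mutate the contents of received responses to make proxying a Kafka cluster easier.

The following example shows the configuration of an Envoy instance that proxies the brokers of a 2-node cluster: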

listeners:
- address: # This listener proxies broker 1.
    socket_address:
      address: envoy.example.org # Host that Kafka clients should connect to (i.e. bootstrap.servers).
      port_value: 19092          # Port that Kafka clients should connect to (i.e. bootstrap.servers).
  filter_chains:
  - filters:
    - name: envoy.filters.network.kafka_broker
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
        stat_prefix: exampleprefix1
        id_based_broker_address_rewrite_spec: &kafka_rewrite_spec
          rules:
          - id: 1
            host: envoy.example.org
            port: 19092
          - id: 2
            host: envoy.example.org
            port: 19093
    - name: envoy.filters.network.tcp_proxy
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
        stat_prefix: tcp
        cluster: broker1cluster
- address: # This listener proxies broker 2.
    socket_address:
      address: envoy.example.org # Host that Kafka clients should connect to (i.e. bootstrap.servers).
      port_value: 19093          # Port that Kafka clients should connect to (i.e. bootstrap.servers).
  filter_chains:
  - filters:
    - name: envoy.filters.network.kafka_broker
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.kafka_broker.v3.KafkaBroker
        stat_prefix: exampleprefix2
        id_based_broker_address_rewrite_spec: *kafka_rewrite_spec
    - name: envoy.filters.network.tcp_proxy
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
        stat_prefix: tcp
        cluster: broker2cluster

clusters:
- name: broker1cluster
  connect_timeout: 0.25s
  type: strict_dns
  lb_policy: round_robin
  load_assignment:
    cluster_name: some_service
    endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: broker1.example.org # Kafka broker's host for broker 1.
                port_value: 9092             # Kafka broker's port for broker 1.
- name: broker2cluster
  connect_timeout: 0.25s
  type: strict_dns
  lb_policy: round_robin
  load_assignment:
    cluster_name: some_service
    endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: broker2.example.org # Kafka broker's host for broker 2.
                port_value: 9092             # Kafka broker's port for broker 2.

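The address rewrite rules should cover every broker present in the cluster; a YAML anchor (&kafka_rewrite_spec, referenced as *kafka_rewrite_spec) can be used to avoid duplicating them.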
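If the brokers are exposed on consecutive Envoy ports, as in the two-broker example above, the rule list can be generated and checked for coverage instead of being written by hand. The helpers below are hypothetical and not part of Envoy; the consecutive-port layout is an assumption of the sketch, as is the idea that a broker without a rule would reach clients with its original address.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class Rule:
    """One entry of id_based_broker_address_rewrite_spec.rules."""
    id: int
    host: str
    port: int


def make_rules(broker_ids: Iterable[int], envoy_host: str, first_port: int) -> List[Rule]:
    """Assign consecutive Envoy ports to brokers in ascending id order (assumed layout)."""
    return [Rule(broker_id, envoy_host, first_port + i)
            for i, broker_id in enumerate(sorted(broker_ids))]


def uncovered(broker_ids: Iterable[int], rules: List[Rule]) -> List[int]:
    """Broker ids with no rule; the sketch assumes their original addresses reach clients."""
    covered = {rule.id for rule in rules}
    return sorted(set(broker_ids) - covered)


def to_yaml(rules: List[Rule]) -> str:
    """Render the rules in the same shape as the YAML configuration above."""
    lines = ["rules:"]
    for rule in rules:
        lines += [f"- id: {rule.id}", f"  host: {rule.host}", f"  port: {rule.port}"]
    return "\n".join(lines)
```

For example, make_rules([1, 2], "envoy.example.org", 19092) reproduces the two rules of the example configuration, and uncovered([1, 2, 3], ...) on that result reports broker 3 as missing.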

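The responses that can be mutated are:

  • Metadata (all partition discovery operations),

  • FindCoordinator (used by consumer groups and transactions),

  • DescribeCluster.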
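The effect of an id-based rewrite on these responses can be modelled as below. The api_key values are Kafka's protocol numbers for Metadata (3), FindCoordinator (10) and DescribeCluster (60); everything else (BrokerAddress, rewrite_brokers, the rules mapping) is a hypothetical simplification that treats each response as a list of (node_id, host, port) entries and assumes that brokers without a matching rule are left unchanged.

```python
from dataclasses import dataclass, replace
from typing import Dict, List, Tuple

METADATA, FIND_COORDINATOR, DESCRIBE_CLUSTER = 3, 10, 60
REWRITABLE_API_KEYS = {METADATA, FIND_COORDINATOR, DESCRIBE_CLUSTER}


@dataclass(frozen=True)
class BrokerAddress:
    """Simplified broker entry as carried in the responses listed above (hypothetical)."""
    node_id: int
    host: str
    port: int


def rewrite_brokers(api_key: int, brokers: List[BrokerAddress],
                    rules: Dict[int, Tuple[str, int]]) -> List[BrokerAddress]:
    """Replace host/port of every broker whose node_id has a rule; others pass through."""
    if api_key not in REWRITABLE_API_KEYS:
        return list(brokers)
    result = []
    for broker in brokers:
        target = rules.get(broker.node_id)
        if target is None:
            result.append(broker)  # assumption: no rule means no change
        else:
            host, port = target
            result.append(replace(broker, host=host, port=port))
    return result
```

Coordinators returned by FindCoordinator fit the same model as a short list of entries.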

Debugging

Java clients show which hosts they use if the log level of org.apache.kafka.clients.NetworkClient is set to debug; only Envoy's listeners should appear in the logs.

[DEBUG] [NetworkClient] Initiating connection to node localhost:19092 (id: -1 rack: null) using address localhost/127.0.0.1
[DEBUG] [NetworkClient] Completed connection to node -1. Fetching API versions.
[DEBUG] [NetworkClient] Initiating connection to node localhost:19092 (id: 1 rack: null) using address localhost/127.0.0.1
[DEBUG] [NetworkClient] Completed connection to node 1. Fetching API versions.
[DEBUG] [NetworkClient] Initiating connection to node localhost:19094 (id: 3 rack: null) using address localhost/127.0.0.1
[DEBUG] [NetworkClient] Initiating connection to node localhost:19093 (id: 2 rack: null) using address localhost/127.0.0.1
[DEBUG] [NetworkClient] Completed connection to node 2. Fetching API versions.
[DEBUG] [NetworkClient] Completed connection to node 3. Fetching API versions.
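Checking such a log can be automated by extracting the host:port that each node connection was initiated to and comparing it with the Envoy listeners. The helper names are hypothetical, the regular expression is written against the sample lines above only (other client versions or log layouts may differ), and IPv6 literals are not handled.

```python
import re
from typing import Dict, Set, Tuple

CONNECT_RE = re.compile(
    r"Initiating connection to node (?P<host>[^:\s]+):(?P<port>\d+) \(id: (?P<id>-?\d+)"
)


def connection_targets(log: str) -> Dict[int, Tuple[str, int]]:
    """Map node id -> (host, port) for every 'Initiating connection' line."""
    return {
        int(m["id"]): (m["host"], int(m["port"]))
        for m in CONNECT_RE.finditer(log)
    }


def non_envoy_targets(targets: Dict[int, Tuple[str, int]],
                      envoy_listeners: Set[Tuple[str, int]]) -> Dict[int, Tuple[str, int]]:
    """Connections that bypass Envoy; the expected result is an empty dict."""
    return {node: ep for node, ep in targets.items() if ep not in envoy_listeners}
```

For the log above, every target is one of localhost:19092, localhost:19093 or localhost:19094, so non_envoy_targets returns an empty dict when those three listeners are passed in.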

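Statistics

Every configured Kafka broker filter has statistics rooted at kafka.<stat_prefix>., with multiple statistics per message type (TYPE in the names below stands for the message type).

Name                     Type       Description
request.TYPE             Counter    Number of times a request of a particular type was received from a Kafka client
request.unknown          Counter    Number of times a request in a format not recognized by this filter was received
request.failure          Counter    Number of times a request with an invalid format was received or another processing exception occurred
response.TYPE            Counter    Number of times a response of a particular type was received from a Kafka broker
response.TYPE_duration   Histogram  Response generation time in milliseconds
response.unknown         Counter    Number of times a response in a format not recognized by this filter was received
response.failure         Counter    Number of times a response with an invalid format was received or another processing exception occurred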
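As a rough sketch of reading these counters, the helper below scans the plain-text output of Envoy's admin /stats endpoint (lines of the form name: value) and groups the counters under one stat_prefix and direction. The per-type names in the test sample (metadata_request, metadata_response) are assumed for illustration only; check a running instance for the exact TYPE spellings. Histogram lines such as response.TYPE_duration are skipped because their values are not plain integers.

```python
from typing import Dict, Union


def parse_counters(text: str) -> Dict[str, int]:
    """Keep only 'name: <integer>' lines; histogram summaries are ignored."""
    counters = {}
    for line in text.splitlines():
        name, sep, value = line.partition(": ")
        if sep and value.strip().isdigit():
            counters[name.strip()] = int(value)
    return counters


def summarize(counters: Dict[str, int], stat_prefix: str,
              direction: str) -> Dict[str, Union[int, Dict[str, int]]]:
    """Group kafka.<stat_prefix>.<direction>.* counters into per-type, unknown and failure."""
    root = f"kafka.{stat_prefix}.{direction}."
    per_type = {}
    for name, value in counters.items():
        if name.startswith(root):
            suffix = name[len(root):]
            if suffix not in ("unknown", "failure"):
                per_type[suffix] = value
    return {
        "per_type": per_type,
        "unknown": counters.get(root + "unknown", 0),
        "failure": counters.get(root + "failure", 0),
    }
```

A steadily growing unknown count suggests that clients or brokers use message versions newer than the filter supports, in line with the pass-through behaviour described at the top of this page.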