Beam Kafka Produce
组件介绍
使用 Beam 执行引擎将记录发布到 Kafka 集群。
- **输入:**需要发布的记录。
- **输出:**无。
**注意要点:**目前仅支持 String 作为键以及 String 或 Avro Record 作为值。
页面介绍
双击打开“Beam Kafka Produce”组件,如下图所示:
参数选项
界面中的各个参数选项解释如下:
列名 | 说明 | 样例值 |
---|---|---|
Bootstrap servers | 逗号分隔的主机列表,这些主机是“bootstrap”集群中的 Kafka 代理。 | |
Topics | 发布的 topic。 | |
键字段 | record key。 | |
消息字段 | record message。 |
表格中可使用的参数如下:
选项 | 样例值 |
---|---|
schema.registry.url | https://abcd-12345x.europe-west3.gcp.confluent。cloud |
value.converter.schema.registry.url | https://abcd-12345x.europe-west3.gcp.confluent.cloud |
auto.register.schemas | true |
security.protocol | SASL_SSL |
sasl.jaas.config | org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER_API_KEY" password="CLUSTER_API_SECRET"; |
username | CLUSTER_API_KEY |
password | CLUSTER_API_SECRET |
sasl.mechanism | PLAIN |
client.dns.lookup | use_all_dns_ips |
acks | ALL |
basic.auth.credentials.source | USER_INFO |
basic.auth.user.info | CLUSTER_API_KEY:CLUSTER_API_SECRET |
schema.registry.basic.auth.user.info | SCHEMA_REGISTRY_API_KEY:SCHEMA_REGISTRY_API_SECRET |
使用案例
将“生成记录”组件生成的数据发布到 kafka 中,pipeline 构建如图所示:
使用“生成记 录”组件生成测试数据:
配置 Beam Kafka Produce 组件:
点击运行后可对发布的数据进行消费: