Kafka Connect Practical Guide
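Introduction

Kafka Connect is the data integration framework of the Kafka ecosystem. It moves data between Kafka and other data systems: source connectors import data from external systems into Kafka, and sink connectors export data from Kafka topics to external systems. This guide covers the architecture, configuration, and day-to-day use of Kafka Connect.

Kafka Connect architecture

1.1 Core concepts

Kafka Connect is built on the following core concepts:

- Connector: defines a data flow from a source to a target.
- Task: the connector's actual unit of work; it performs the data transfer.
- Worker: the process that runs connectors and tasks.
- Source: a component that reads data from an external system.
- Sink: a component that writes data to an external system.

    ┌───────────────────────────────────────────────────────┐
    │                 Kafka Connect Cluster                 │
    │  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐  │
    │  │  Worker 1   │   │  Worker 2   │   │  Worker 3   │  │
    │  │ ┌─────────┐ │   │ ┌─────────┐ │   │ ┌─────────┐ │  │
    │  │ │ Source  │ │   │ │ Source  │ │   │ │  Sink   │ │  │
    │  │ │Connector│ │   │ │Connector│ │   │ │Connector│ │  │
    │  │ └─────────┘ │   │ └─────────┘ │   │ └─────────┘ │  │
    │  │ ┌─────────┐ │   │ ┌─────────┐ │   │ ┌─────────┐ │  │
    │  │ │ Task 1  │ │   │ │ Task 2  │ │   │ │ Task 3  │ │  │
    │  │ └─────────┘ │   │ └─────────┘ │   │ └─────────┘ │  │
    │  └─────────────┘   └─────────────┘   └─────────────┘  │
    └───────────────────────────────────────────────────────┘
              │                 │                 │
              ▼                 ▼                 ▼
       ┌─────────────┐   ┌─────────────┐  ┌────────────────┐
       │  Database   │   │    Files    │  │ Data Warehouse │
       │  (Source)   │   │  (Source)   │  │     (Sink)     │
       └─────────────┘   └─────────────┘  └────────────────┘

1.2 Standalone mode configuration

    # connect-standalone.properties

    # Kafka connection
    bootstrap.servers=localhost:9092

    # Plugin path
    plugin.path=/usr/local/share/kafka/plugins

    # Converters
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

    # Offset storage: standalone mode keeps source offsets in a local file
    offset.storage.file.filename=/tmp/connect.offsets

Standalone mode runs a single worker and stores offsets in a local file, so it does not use the offset.storage.topic, config.storage.topic, or status.storage.topic settings; those belong to distributed mode. The internal.key.converter and internal.value.converter settings found in older guides have been deprecated since Kafka 2.0 and should be omitted.

1.3 Distributed mode configuration

    # connect-distributed.properties
    bootstrap.servers=localhost:9092,localhost:9093,localhost:9094
    plugin.path=/usr/local/share/kafka/plugins

    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false

    # All workers with the same group.id form one Connect cluster
    group.id=connect-cluster

    # Offset storage
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=3
    offset.storage.partitions=5

    # Connector configuration storage (this topic always has a single partition)
    config.storage.topic=connect-configs
    config.storage.replication.factor=3

    # Status storage
    status.storage.topic=connect-status
    status.storage.replication.factor=3
    status.storage.partitions=5

    # Heartbeat
    heartbeat.interval.ms=3000
    session.timeout.ms=30000

    # REST API (listeners supersedes the deprecated rest.host.name and rest.port)
    listeners=http://localhost:8083

Source Connector configuration

2.1 JDBC Source Connector

    # jdbc-source.properties
    name=jdbc-source-connector
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=3

    # Database connection
    connection.url=jdbc:mysql://localhost:3306/mydb
    connection.user=connect_user
    connection.password=connect_password

    # Tables and incremental query mode
    table.whitelist=users,orders,products
    mode=timestamp+incrementing
    incrementing.column.name=id
    timestamp.column.name=updated_at

    # Polling
    poll.interval.ms=5000
    batch.max.rows=100

    # Topics are named <prefix><table>, e.g. db-users
    topic.prefix=db-

    # Schema filter; only meaningful for databases that use schemas
    schema.pattern=PUBLIC

The original listing also set a custom query together with table.whitelist. The two options cannot be combined, and in the incremental modes the connector appends its own WHERE clause on the timestamp and incrementing columns, so the query line is dropped here and mode is set explicitly.

The same connector as a JSON payload for the REST API:

    {
      "name": "jdbc-source-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "3",
        "connection.url": "jdbc:mysql://localhost:3306/mydb?user=connect_user&password=connect_password",
        "table.whitelist": "users,orders,products",
        "mode": "timestamp+incrementing",
        "incrementing.column.name": "id",
        "timestamp.column.name": "updated_at",
        "poll.interval.ms": "5000",
        "batch.max.rows": "100",
        "topic.prefix": "db-"
      }
    }

2.2 File Source Connector

    # file-source.properties
    name=file-source-connector
    connector.class=FileStreamSourceConnector
    tasks.max=1

    # File and target topic
    file=/data/input.log
    topic=file-input

    # Maximum number of records read per poll
    batch.size=1000

2.3 HTTP Source Connector

    # http-source.properties
    name=http-source-connector
    connector.class=io.confluent.connect.http.HttpSourceConnector
    tasks.max=5

    # HTTP endpoint
    http.url=https://api.example.com/data
    http.poll.interval.ms=30000
    http.timeout.ms=10000

    # Authentication
    http.auth.type=BASIC
    http.auth.user=testuser
    http.auth.password=testpass

    # Target topic
    topic=http-data

    # Polling
    poll.interval.ms=10000

Property names for HTTP source connectors differ between implementations and versions, so check the keys above against the documentation of the connector you actually install.

Sink Connector configuration

3.1 JDBC Sink Connector

    # jdbc-sink.properties
    name=jdbc-sink-connector
    connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
    tasks.max=3

    # Database connection
    connection.url=jdbc:mysql://localhost:3306/mydb
    connection.user=sink_user
    connection.password=sink_password

    # Topics and target tables (one table per topic, named after the topic)
    topics=orders,customers
    table.name.format=${topic}

    # Insert mode
    insert.mode=upsert
    pk.mode=record_value
    pk.fields=id

    # Error handling
    errors.tolerance=all
    errors.deadletterqueue.topic.name=dlq-orders
    errors.deadletterqueue.topic.replication.factor=1

    # Fields to write
    fields.whitelist=id,name,email,created_at

The JDBC sink has no tables setting; the target table is derived from table.name.format. The dead letter queue properties are spelled errors.deadletterqueue.*, not errors.dead.letter.queue.*.

    {
      "name": "jdbc-sink-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "3",
        "connection.url": "jdbc:mysql://localhost:3306/mydb",
        "connection.user": "sink_user",
        "connection.password": "sink_password",
        "topics": "orders",
        "table.name.format": "orders_sink",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "order_id",
        "auto.create": "true",
        "errors.tolerance": "all"
      }
    }

3.2 Elasticsearch Sink Connector

    # elasticsearch-sink.properties
    name=elasticsearch-sink-connector
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=3

    # Elasticsearch connection
    connection.url=http://localhost:9200
    connection.username=elastic
    connection.password=changeme

    # Each topic is written to an index named after the topic
    topics=orders,customers

    # Batching
    batch.size=2000
    flush.timeout.ms=10000

    # Error handling
    behavior.on.malformed.documents=ignore
    behavior.on.null.values=ignore
    errors.tolerance=all
    errors.deadletterqueue.topic.name=dlq-elasticsearch

The index.translate and index.name.pattern keys from the original listing are not connector options and are omitted. By default the index name is the topic name; to write to a different index, rename the topic with a routing transform such as RegexRouter.

3.3 S3 Sink Connector

    # s3-sink.properties
    name=s3-sink-connector
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=3

    # S3
    storage.class=io.confluent.connect.s3.storage.S3Storage
    s3.region=us-east-1
    s3.bucket.name=my-kafka-data
    s3.part.size=5242880

    # Output format and partitioner
    format.class=io.confluent.connect.s3.format.parquet.ParquetFormat
    partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner

    # Time-based partitioning (dd is day of month; DD would be day of year)
    path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
    partition.duration.ms=3600000
    locale=en-US
    timezone=UTC

    # Topics
    topics=orders,events

    # Flushing
    flush.size=10000
    rotate.interval.ms=300000

Parquet output needs records that carry a schema, so pair this connector with a schema-aware converter such as the Avro converter from section 4.2 rather than schemaless JSON.

Converter configuration

4.1 JSON converter

    # JSON converter
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter

    # Embed the schema in every message
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true

4.2 Avro converter

    # Avro converter
    key.converter=io.confluent.connect.avro.AvroConverter
    value.converter=io.confluent.connect.avro.AvroConverter

    # Schema Registry
    key.converter.schema.registry.url=http://localhost:8081
    value.converter.schema.registry.url=http://localhost:8081

    # Schema Registry authentication
    key.converter.basic.auth.credentials.source=USER_INFO
    key.converter.basic.auth.user.info=user:password
    value.converter.basic.auth.credentials.source=USER_INFO
    value.converter.basic.auth.user.info=user:password

4.3 String converter

    # String converter
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter

Single Message Transform (SMT) configuration

5.1 InsertField transform

    # Add fields to the record value
    transforms=insertField
    transforms.insertField.type=org.apache.kafka.connect.transforms.InsertField$Value

    # Add the record timestamp
    transforms.insertField.timestamp.field=insert_time

    # Add the record offset (available on sink connectors only)
    transforms.insertField.offset.field=offset

5.2 ReplaceField transform

    # Drop, keep, or rename fields
    transforms=replaceField
    transforms.replaceField.type=org.apache.kafka.connect.transforms.ReplaceField$Value

    # Remove these fields
    transforms.replaceField.exclude=password,secret

    # Keep only these fields
    transforms.replaceField.include=id,name,email

    # Rename fields
    transforms.replaceField.renames=old_name:new_name,old_id:new_id

Releases before Kafka 2.7 use the names blacklist and whitelist instead of exclude and include. In practice, set only one of the two.

5.3 Filter transform

    # Keep only records that match a condition
    transforms=filter
    transforms.filter.type=io.confluent.connect.transforms.Filter$Value

    # Filter condition (JSONPath predicate on the record value)
    transforms.filter.filter.condition=$[?(@.status == 'active')]

    # Filter logic: include matching records, drop everything else
    transforms.filter.filter.type=include

The filter.condition and filter.type options belong to Confluent's Filter transform, so its class name is used here. Apache Kafka's built-in org.apache.kafka.connect.transforms.Filter has no $Value variant and takes no condition; it drops every record it is applied to and is meant to be combined with a predicate. The condition above is a reconstruction: the operators were lost in the original listing, and the intent is assumed to be "status equals active".

5.4 Chained transforms

    # Transforms run in the order listed
    transforms=insertTimestamp,replaceField,hoistField

    transforms.insertTimestamp.type=org.apache.kafka.connect.transforms.InsertField$Value
    transforms.insertTimestamp.timestamp.field=event_time

    transforms.replaceField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
    transforms.replaceField.include=id,name,value

    # Wrap the whole value in a single field named metadata
    transforms.hoistField.type=org.apache.kafka.connect.transforms.HoistField$Value
    transforms.hoistField.field=metadata

Because the list is ordered, replaceField here also removes the event_time field that insertTimestamp has just added. Add event_time to the include list, or move insertTimestamp after replaceField, if the timestamp should be kept.

REST API management

6.1 Connector management

    # Start a worker in distributed mode
    connect-distributed.sh connect-distributed.properties

    # Create a connector
    curl -X POST http://localhost:8083/connectors \
      -H "Content-Type: application/json" \
      -d '{
        "name": "jdbc-source-connector",
        "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "tasks.max": "3",
          "connection.url": "jdbc:mysql://localhost:3306/mydb",
          "table.whitelist": "users",
          "topic.prefix": "db-"
        }
      }'

    # List all connectors
    curl http://localhost:8083/connectors

    # Show connector status
    curl http://localhost:8083/connectors/jdbc-source-connector/status

    # Get connector configuration
    curl http://localhost:8083/connectors/jdbc-source-connector/config

    # Update connector configuration (creates the connector if it does not exist)
    curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/config \
      -H "Content-Type: application/json" \
      -d '{
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "5",
        "connection.url": "jdbc:mysql://localhost:3306/mydb",
        "table.whitelist": "users,orders",
        "topic.prefix": "db-"
      }'

    # Pause the connector
    curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/pause

    # Resume the connector
    curl -X PUT http://localhost:8083/connectors/jdbc-source-connector/resume

    # Delete the connector
    curl -X DELETE http://localhost:8083/connectors/jdbc-source-connector

6.2 Task management

    # Restart the connector
    curl -X POST http://localhost:8083/connectors/jdbc-source-connector/restart

    # Restart a failed task
    curl -X POST http://localhost:8083/connectors/jdbc-source-connector/tasks/0/restart

    # List all tasks
    curl http://localhost:8083/connectors/jdbc-source-connector/tasks

Monitoring and debugging

7.1 Monitoring endpoints

    # Get worker information (version, commit, Kafka cluster id)
    curl http://localhost:8083/

    # Show the connector's committed offsets (Kafka 3.5 and later)
    curl http://localhost:8083/connectors/jdbc-source-connector/offsets

    # Get the connector's task configurations
    curl http://localhost:8083/connectors/jdbc-source-connector/tasks

The REST API has no /clusters endpoint, and offsets are exposed per connector rather than per task, so the original /clusters and /tasks/0/offsets calls are replaced or removed above. Connector and task health is available from the /status endpoint shown in section 6.1.

7.2 Logging configuration

    <!-- Log4j 2 configuration -->
    <Configuration>
      <Appenders>
        <Console name="Console">
          <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n"/>
        </Console>
        <RollingFile name="RollingFile" fileName="logs/connect.log"
                     filePattern="logs/connect-%d{yyyy-MM-dd}-%i.log">
          <PatternLayout>
            <Pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</Pattern>
          </PatternLayout>
          <Policies>
            <TimeBasedTriggeringPolicy interval="1"/>
            <SizeBasedTriggeringPolicy size="100MB"/>
          </Policies>
        </RollingFile>
      </Appenders>
      <Loggers>
        <Logger name="org.apache.kafka.connect" level="INFO"/>
        <Logger name="org.apache.kafka.connect.runtime" level="INFO"/>
        <Logger name="io.confluent.connect" level="DEBUG"/>
        <Root level="INFO">
          <AppenderRef ref="Console"/>
          <AppenderRef ref="RollingFile"/>
        </Root>
      </Loggers>
    </Configuration>

This file uses Log4j 2 XML syntax. Many Kafka releases ship Connect with a Log4j 1.x properties file (connect-log4j.properties) instead, so confirm which logging backend your distribution uses before adopting it. The appender names and the filePattern value are added here because Log4j 2 requires them; adjust them to your environment.

Best practices

8.1 Performance tuning

    # Connector configuration: raise task parallelism
    tasks.max=10

    # Worker configuration: producer settings used by source tasks
    producer.batch.size=8192
    producer.linger.ms=10
    producer.buffer.memory=67108864

    # Timeouts
    producer.request.timeout.ms=30000
    producer.retry.backoff.ms=1000

Kafka client settings in the worker file must carry the producer. or consumer. prefix; without it they are not passed to the clients. Note that the producer's default batch.size is 16384 bytes, so 8192 lowers it; raise the value if throughput matters more than latency. Connect has no buffer.max.messages, worker.background.threads, producer.threads, or consumer.threads settings. Parallelism comes from tasks.max and from the number of workers in the cluster.

8.2 Error handling

    # Tolerate bad records instead of failing the task
    errors.tolerance=all

    # Dead letter queue (sink connectors only)
    errors.deadletterqueue.topic.name=dlq-topic
    errors.deadletterqueue.topic.replication.factor=3

    # Retries
    errors.retry.timeout=300000
    errors.retry.delay.max.ms=60000

With errors.tolerance=all, records that fail conversion or transformation are skipped, and sink connectors route them to the dead letter queue when one is configured. There is no separate errors.skip.invalid.messages option and no setting for the partition count of the dead letter queue topic.

Summary

Kafka Connect is an important tool for building data pipelines. Its rich connector ecosystem and flexible configuration make it possible to integrate Kafka efficiently with a wide range of data systems. This guide covered the architecture, configuration, and usage of Kafka Connect, including source connectors, sink connectors, converters, transforms, and REST API management, to help developers build reliable data integration solutions quickly.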
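
The REST calls in section 6 can also be scripted. The following is a minimal Python sketch, assuming a worker at http://localhost:8083 as used in this guide. The helper names (build_upsert_request, failed_task_ids, restart_failed_tasks) are hypothetical and not part of any Kafka library; only the endpoints themselves (PUT /connectors/{name}/config, GET /connectors/{name}/status, POST /connectors/{name}/tasks/{id}/restart) come from the Connect REST API. The demo at the bottom runs offline and only prints what would be sent.

```python
import json
import urllib.request

# Assumption: REST address of a Connect worker, as used throughout this guide.
CONNECT_URL = "http://localhost:8083"


def build_upsert_request(name, config, base_url=CONNECT_URL):
    """Build a PUT /connectors/{name}/config request.

    PUT on this endpoint creates the connector if it is missing and
    updates it otherwise, so the same call can be repeated safely.
    """
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


def failed_task_ids(status):
    """Return the ids of FAILED tasks in a /connectors/{name}/status payload."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def restart_failed_tasks(name, base_url=CONNECT_URL):
    """Fetch the connector status and restart every failed task (needs a live worker)."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        status = json.load(resp)
    restarted = failed_task_ids(status)
    for task_id in restarted:
        req = urllib.request.Request(
            url=f"{base_url}/connectors/{name}/tasks/{task_id}/restart",
            data=b"",
            method="POST",
            headers={"Content-Type": "application/json"},
        )
        urllib.request.urlopen(req).close()
    return restarted


if __name__ == "__main__":
    # Offline demo: build the request and inspect a sample status payload.
    request = build_upsert_request(
        "jdbc-source-connector",
        {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "tasks.max": "3",
            "connection.url": "jdbc:mysql://localhost:3306/mydb",
            "table.whitelist": "users",
            "topic.prefix": "db-",
        },
    )
    print(request.get_method(), request.full_url)

    sample_status = {
        "name": "jdbc-source-connector",
        "connector": {"state": "RUNNING"},
        "tasks": [{"id": 0, "state": "RUNNING"}, {"id": 1, "state": "FAILED"}],
    }
    print("failed tasks:", failed_task_ids(sample_status))
```

To apply the configuration against a running worker, pass the request to urllib.request.urlopen. Using PUT rather than POST /connectors is a design choice for deployment scripts: it avoids the conflict error that POST returns when the connector already exists.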