kafka作用:Kafka 是一个分布式流处理平台,主要用于构建实时数据管道和流式应用程序。负责接收、缓冲和分发传感器数据,使您的系统能够可靠地处理海量的实时数据流。1. 消息队列/消息总线 在系统间可靠地传递消息 解耦生产者和消费者 缓冲数据,平衡系统负载 2. 数据管道 实时传输数据(如日志、指标、事件) 连接不同系统(数据库 → 数据仓库 → 分析系统) 3. 流处理 实时处理和分析数据流 支持复杂的流处理操作(过滤、转换、聚合)Kafka 的关键特性1. 高吞吐量 每秒处理数百万条消息 适合海量传感器数据 2. 持久化存储 // Kafka 将消息持久化到磁盘 // 可以配置保留策略(如保留7天) server.properties: log.retention.hours=168 3. 分布式和高可用 多个 Broker 组成集群 数据分区和副本机制 单点故障不影响整体服务 优势: 处理海量数据 - 传感器通常产生大量数据 保证数据不丢失 - 持久化存储,支持重放 支持回溯 - 可以重新消费历史数据 易于水平扩展 - 增加Broker即可扩展 生态完善 - 连接器丰富(连接各种数据源)kafka官网下载包地址: https://kafka.apache.org/downloads版本含义:kafka_2.12-3.8.0 2.12表示Scala版本。 3.8.0 表示Kafka版本Scala 2.12指的是编译Kafka源代码的Scala编译器版本号部署:一,上传kafka安装包,解压到目录tar xf xxxx.tar -C 指定目录二,修改kafka的文件 config/server.properties1,修改内容为:broker.id=0 listeners=PLAINTEXT://:9092 log.dirs=/tmp/kafka-logs zookeeper.connect=localhost:2181 #### broker.id: 每个 Kafka broker 的唯一标识。 listeners: 配置 Kafka 监听的地址(例如 PLAINTEXT://localhost:9092)。 log.dirs: 存储 Kafka 日志的目录 zookeeper.connect=localhost:2181: zookeeper,IP端口 Kafka 依赖 ZooKeeper 来管理集群。Kafka 自带 ZooKeeper,可以使用它自带的配置。 可以在 config/zookeeper.properties 中进行配置,但默认配置通常已经足够。broker.id=0 # 每个 Kafka Broker 必须拥有一个唯一的标识符 delete.topic.enable=true listeners=SASL_PLAINTEXT://10.238.22.126:9092 advertised.listeners=SASL_PLAINTEXT://10.238.22.126:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/app/eoptest/kafka_2.12-0.11.0.3/kafkaLogs num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 # The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168 #The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connection.timeout.ms=6000 zookeeper.connect=10.238.22.126:2181 group.initial.rebalance.delay.ms=0 security.inter.broker.protocol=SASL_PLAINTEXT #kafka安全协议 sasl.enabled.mechanisms=PLAIN #加密方式 sasl.mechanism.inter.broker.protocol=PLAIN #认证策略 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer super.users=User:admin;User:alice三,启动ZooKeeperZooKeeper 是 分布式协调服务,主要用于管理和协调分布式系统中的各个节点bin/zookeeper-server-start.sh -daemon config/zookeeper.properties -daemon #后台启动kafka