.NET 项目要不要上 Kafka?看完这篇再决定
在典型的 .NET 微服务架构中同步 HTTP 调用链会引入直接耦合下游服务超时即阻塞上游失败重试可能导致雪崩。更隐蔽的是消息一旦被消费默认行为下 RabbitMQ 等队列会将其删除即使开启持久化也缺乏 Kafka 的 Offset 回溯能力——这意味着你无法在 Bug 修复后重放历史事件也无法在消费者故障时无损恢复。Kafka 通过持久化的提交日志Commit Log模型彻底解决了这两个问题消息写入即落盘消费者只移动游标Offset而不删除数据同时利用分区Partition和批量传输在合理硬件如 SSD、万兆网下可实现单机数十万级 TPS生产者写入。本文从 .NET 开发者的视角讲清它的核心机制、接入成本和选型边界不做过度简化也不堆砌无关细节。一、Kafka 是什么一句话Kafka 是一个分布式、高吞吐、能持久化存储消息的消息队列。它和 RabbitMQ 的主要区别RabbitMQ默认消费即删除即使开启持久化队列也缺乏 Kafka 的 Offset 回溯能力。适合可靠路由、任务分发。Kafka像银行流水账单——每笔交易都永久记在账本里消费者只移动指针随时可以重放。适合海量日志、历史回溯、流处理。Kafka 的核心原理可以忽略很多细节你只需要记住三个词概念通俗解释Topic主题类似数据库里的“表”消息按主题分类Partition分区Topic 被切成若干份可以并行读写所以快Consumer Group消费组多个消费者分担读取任务互不重复官网https://kafka.apache.orghttps://kafka.apache.org/GitHubhttps://github.com/apache/kafkahttps://github.com/apache/kafka开源协议Apache 2.0可免费商用。收费版是 Confluent 公司提供的企业增强版普通团队用开源版足够。二、怎么引入安装 NuGet 包dotnet add package Confluent.Kafka当前稳定版请以 NuGet 官网为准如 2.6.x 或更高支持 .NET 6/7/8。Program.cs 最小配置必须项 正确资源释放// Program.cs using Confluent.Kafka; var builder WebApplication.CreateBuilder(args); // 注册生产者Singleton 生命周期 builder.Services.AddSingletonIProducerNull, string(provider { var config new ProducerConfig { BootstrapServers localhost:9092, Acks Acks.All, // 等待所有副本确认防丢消息 EnableIdempotence true // 开启幂等防重复 }; return new ProducerBuilderNull, string(config).Build(); }); var app builder.Build(); // 应用退出前 Flush 并释放 Producer防止缓冲区消息丢失 var lifetime app.Services.GetRequiredServiceIHostApplicationLifetime(); lifetime.ApplicationStopping.Register(() { var producer app.Services.GetRequiredServiceIProducerNull, string(); producer.Flush(TimeSpan.FromSeconds(10)); producer.Dispose(); }); app.Run();验证接入是否成功using Confluent.Kafka; var config new ProducerConfig { BootstrapServers localhost:9092 }; using var producer new ProducerBuilderNull, string(config).Build(); var result await producer.ProduceAsync(test-topic, new MessageNull, string { Value Hello Kafka }); Console.WriteLine($Delivered to {result.TopicPartitionOffset}); 若输出类似 Delivered to test-topic[0]1 则成功。三、如何部署 Kafka简单版开发环境推荐 Docker快速、隔离、与生产一致正确配置KRaft 模式让宿主机可访问docker run -it --rm --name kafka \ -p 9092:9092 \ -e KAFKA_NODE_ID1 \ -e KAFKA_PROCESS_ROLESbroker,controller \ -e KAFKA_LISTENERSPLAINTEXT://:9092,CONTROLLER://:9093 \ -e KAFKA_ADVERTISED_LISTENERSPLAINTEXT://localhost:9092 \ -e KAFKA_CONTROLLER_LISTENER_NAMESCONTROLLER \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAPCONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ -e KAFKA_CONTROLLER_QUORUM_VOTERS1localhost:9093 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR1 \ apache/kafka:4.0.0或者使用更省心的 bitnami/kafka推荐新手端口说明9092Kafka 通信端口客户端连接用# 单容器模式localhost 指容器内部自身勿改为宿主机 IP docker run -it --rm --name kafka \ -p 9092:9092 \ -e KAFKA_CFG_NODE_ID0 \ -e KAFKA_CFG_PROCESS_ROLEScontroller,broker \ -e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS0localhost:9093 \ -e KAFKA_CFG_LISTENERSPLAINTEXT://:9092,CONTROLLER://:9093 \ -e KAFKA_CFG_ADVERTISED_LISTENERSPLAINTEXT://localhost:9092 \ -e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAPCONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ bitnami/kafka:latestWindows 本地安装无 Docker 环境安装 Java必须Kafka 依赖 JVM下载地址Latest Releases | AdoptiumEclipse Temurin offers high-performance, cross-platform, open-source Java runtime binaries that are enterprise-ready and Java SE TCK-tested for general use in the Java ecosystem.https://adoptium.net/temurin/releases安装后命令行执行java -version验证。下载 Kafka下载地址https://kafka.apache.org/community/downloads/https://kafka.apache.org/downloads启动 KafkaKRaft 模式无需 ZooKeeper进入 Kafka 解压目录执行# 生成集群 ID .\bin\windows\kafka-storage.bat random-uuid # 格式化存储目录使用上面生成的 UUID .\bin\windows\kafka-storage.bat format -t UUID -c .\config\kraft\server.properties # 启动 Kafka .\bin\windows\kafka-server-start.bat .\config\kraft\server.properties验证新建命令行窗口进入 Kafka 目录执行.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092不报错即成功。生产环境提醒Windows 生产环境不推荐请使用 Linux 或 Docker/K8s。四、快速上手3 个示例10 分钟跑通前提本地已启动 Kafka参考上面的 Docker 命令示例1最小可用生产者消费者生产者发送消息using Confluent.Kafka; var config new ProducerConfig { BootstrapServers localhost:9092 }; using var producer new ProducerBuilderNull, string(config).Build(); for (int i 0; i 10; i) { var result await producer.ProduceAsync(demo-topic, new MessageNull, string { Value $Message {i} }); Console.WriteLine($Sent: {result.Value}); }消费者接收消息示例2ASP.NET Core 中集成生产者带 DI// Program.cs 注册见第二章 builder.Services.AddSingletonIProducerNull, string(provider { ... }); // 业务服务中使用 public class OrderService { private readonly IProducerNull, string _producer; public OrderService(IProducerNull, string producer) { _producer producer; } public async Task CreateOrderAsync(OrderDto order) { var json JsonSerializer.Serialize(order); await _producer.ProduceAsync(orders, new MessageNull, string { Value json }); } }示例3后台消费者服务IHostedService 重试死信处理public class KafkaConsumerService : BackgroundService { private readonly ILoggerKafkaConsumerService _logger; private IConsumerNull, string _consumer; private readonly int _maxRetries 3; public KafkaConsumerService(ILoggerKafkaConsumerService logger) { _logger logger; } public override Task StartAsync(CancellationToken cancellationToken) { var config new ConsumerConfig { BootstrapServers localhost:9092, GroupId order-group, AutoOffsetReset AutoOffsetReset.Latest, EnableAutoCommit false // 手动提交精确控制 }; _consumer new ConsumerBuilderNull, string(config).Build(); _consumer.Subscribe(orders); return base.StartAsync(cancellationToken); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { try { // Consume 是同步阻塞调用BackgroundService 的 ExecuteAsync // 运行在独立线程此处阻塞是预期行为不会影响主线程 var result _consumer.Consume(stoppingToken); bool success await ProcessWithRetry(result.Message.Value, stoppingToken); if (success) { _consumer.Commit(result); _logger.LogInformation(Processed and committed: {Value}, result.Message.Value); } else { // 超过重试次数记录死信手动提交跳过该消息或发送到死信 topic _logger.LogError(Failed after retries, skipping: {Value}, result.Message.Value); _consumer.Commit(result); } } catch (OperationCanceledException) { // 应用正常关闭停止消费循环不记录错误 break; } catch (ConsumeException ex) { _logger.LogError(ex, Consume error, will retry after delay); await Task.Delay(1000, stoppingToken); } } } private async Taskbool ProcessWithRetry(string message, CancellationToken token) { for (int i 0; i _maxRetries; i) { try { // 模拟业务处理 await Task.Delay(10, token); return true; } catch (Exception ex) { _logger.LogWarning(ex, Retry {Retry} for message, i 1); await Task.Delay(100 * (i 1), token); } } return false; } public override void Dispose() { _consumer?.Close(); _consumer?.Dispose(); base.Dispose(); } // 在 Program.cs 中注册 builder.Services.AddHostedServiceKafkaConsumerService(); }五、什么时候该用 Kafka✅ 适合用的场景日志 / 埋点 / 审计每天几 TB 数据需要存 7 天以上随时查。微服务事件驱动订单创建后需要通知库存、支付、物流等解耦且不怕下游故障。需要重放历史消息比如修复 Bug 后想重新处理昨天的一批消息。数据同步CDC数据库变化同步到缓存或数仓。❌ 不适合用的场景低延迟要求 10msKafka 正常生产环境的端到端延迟在 2–15ms 之间调优后可更低但仍高于 Redis 或纯内存队列。复杂消息路由比如根据不同头部分发到不同队列RabbitMQ 更擅长。小项目 / 小团队日均消息不到 10 万用 SQL Server 或 Redis 当队列更简单。六、实战里最常见的四个坑补充死信陷阱坑1分区设太多导致 rebalance 卡顿现象消费者频繁断开重连消费变慢甚至停止。解法普通场景 10–30 个分区就够了不要一上来就设 200。坑2生产者没配置acksall会丢消息现象Kafka 节点重启后少量消息消失。解法生产者加一行配置Acks Acks.All。坑3消费者处理太慢被踢出组现象日志出现LeaveGroup然后消费又重头开始。解法增加max.poll.interval.ms或把长任务异步化。坑4关闭自动提交后未处理失败消息导致无限重试★ 高频生产事故现象某条消息处理一直抛异常消费者不断重试同一条消息后续消息全部阻塞。原因EnableAutoCommit false且未 Commit异常后不提交 offset下次拉取仍是同一条。解法在示例3中实现了重试计数器 超过次数后手动 Commit 跳过或发送到死信 Topic。生产环境必须设计死信队列DLQ机制不可无限重试。七、选型结论直接抄作业一句话决策如果你的系统每天产生几百万条以上消息或者需要随时重放历史消息并且团队有人懂 Linux→ 上 Kafka。否则继续用 RabbitMQ 或数据库队列更省心。日均消息量需要历史回溯运维能力推荐 10 万否一般RabbitMQ / Redis 10 万是一般数据库存 JSON10 万 – 500 万是有 Linux 基础自建 3 节点 Kafka 500 万是有专职运维自建 Kafka 或云托管八、总结Kafka 没那么神秘——它就是一个能存能重放、特别能扛并发的消息队列。.NET 接入只需要装一个 NuGet 包、写几行代码。如果你正被高并发压垮、或者为丢消息和无法重放而头疼花半天用 Docker 跑个 Kafka 体验一下大概率会回不去。下一步行动使用上面正确的 Docker 命令启动 Kafka然后跑通生产者消费者示例特别注意消费者示例中的死信处理和优雅停机逻辑。评论区征集你在生产环境遇到过“消息无限重试”导致积压吗留言聊聊你的解决方案我会回复。如果觉得这篇修正版有用点赞、在看、转发给更多 .NET 朋友。少踩坑早下班。