Skip to content

纯托管轻量级RocketMQ客户端,支持发布消息、消费消息、负载均衡等核心功能!

License

Notifications You must be signed in to change notification settings

NewLifeX/NewLife.RocketMQ

Repository files navigation

NewLife.RocketMQ - 企业级纯托管 RocketMQ 客户端

GitHub top language GitHub License Nuget Downloads Nuget Nuget (with prereleases)

纯托管企业级 RocketMQ 客户端,支持 .NET Framework 4.5+ / .NET Standard 2.0+ / .NET Core / .NET 5+

完全使用 C# 实现,零外部依赖(无需 Java、gRPC、Protobuf 第三方库)


🎯 产品定位

NewLife.RocketMQ 是新生命团队开发的企业级纯托管 RocketMQ 客户端,专为 .NET 生态设计:

  • 双协议支持:同时支持 RocketMQ Remoting 协议(4.x)和 gRPC Proxy 协议(5.x)
  • 零外部依赖:纯 C# 实现,无需 Java 或 gRPC 运行时,内置轻量级 Protobuf 编解码器
  • 多云适配:统一接口适配阿里云、华为云、腾讯云及 Apache ACL 认证体系
  • 生产就绪:完整的消费重试、死信队列、事务回查、顺序消费等企业级特性
  • 高性能:VIP 通道、连接池、消息压缩、并发控制等性能优化手段

支持 Apache RocketMQ 4.0+ 和 5.x 版本


🚀 核心特性

生产者(Producer)

  • ✅ 同步/异步/单向发送
  • ✅ 批量消息发送
  • ✅ 延迟消息(18级定时 + 任意时间延迟)
  • ✅ 事务消息(半消息 + 事务回查)
  • ✅ 顺序消息
  • ✅ Request-Reply 模式
  • ✅ 消息压缩(ZLIB)
  • ✅ 消息轨迹追踪

消费者(Consumer)

  • ✅ Pull 模式消费
  • ✅ 集群消费 / 广播消费
  • ✅ 消费者负载均衡(Rebalance)
  • ✅ Tag 过滤 / SQL92 表达式过滤
  • ✅ 多 Topic 订阅
  • ✅ 消费重试 + 死信队列(DLQ)
  • ✅ 顺序消费(队列锁定)
  • ✅ Pop 消费模式
  • ✅ 消费限流(并发控制)
  • ✅ 按时间戳消费

管理功能

  • ✅ Topic 创建/更新/删除
  • ✅ 消费组创建/更新/删除
  • ✅ 消息查询(按 ID / 按 Key)
  • ✅ 消费统计查询
  • ✅ 集群信息查询
  • ✅ 偏移量管理与重置

云厂商适配

  • ✅ 阿里云消息队列 RocketMQ(实例 ID 路由 + HTTP NameServer 发现)
  • ✅ 华为云 DMS for RocketMQ(SSL/TLS + 实例 ID 路由)
  • ✅ 腾讯云 TDMQ RocketMQ(Namespace 前缀路由)
  • ✅ Apache RocketMQ ACL 认证(HMAC-SHA1 签名)

协议支持

  • Remoting 协议(RocketMQ 4.x/5.x Broker):成熟稳定,功能完整
  • gRPC Proxy 协议(RocketMQ 5.x Proxy):内置 Protobuf 编解码器,支持任意时间延迟消息

📦 安装

NuGet 包管理器

Install-Package NewLife.RocketMQ

.NET CLI

dotnet add package NewLife.RocketMQ

PackageReference

<PackageReference Include="NewLife.RocketMQ" Version="3.0.*" />

🔧 快速开始

1. 生产者发送消息

using NewLife.RocketMQ;

var producer = new Producer
{
    Topic = "test_topic",
    NameServerAddress = "127.0.0.1:9876",
    Group = "producer_group"
};

producer.Start();

// 同步发送
var result = producer.Publish("Hello RocketMQ!");
Console.WriteLine($"消息ID: {result.MsgId}");

// 异步发送
await producer.PublishAsync("异步消息");

// 批量发送
await producer.PublishBatch(new[] { "消息1", "消息2", "消息3" });

2. 消费者接收消息

using NewLife.RocketMQ;

var consumer = new Consumer
{
    Topic = "test_topic",
    Group = "consumer_group",
    NameServerAddress = "127.0.0.1:9876"
};

consumer.OnConsume = (q, messages) =>
{
    foreach (var msg in messages)
    {
        Console.WriteLine($"收到消息: {msg.BodyString}");
    }
    return true; // 返回 true 表示消费成功
};

consumer.Start();
Console.WriteLine("按任意键退出...");
Console.ReadKey();

3. 延迟消息

// 使用预设延迟等级(18级)
producer.PublishDelay("延迟消息", DelayTimeLevels.s30);

// gRPC 模式支持任意时间延迟
producer.GrpcProxyAddress = "http://127.0.0.1:8081";
await producer.PublishDelayViaGrpcAsync("任意延迟", DateTime.Now.AddMinutes(30));

4. 事务消息

var producer = new Producer
{
    Topic = "tx_topic",
    Group = "tx_group",
    NameServerAddress = "127.0.0.1:9876"
};

// 设置事务回查回调
producer.OnCheckTransaction = (msg, transactionId) =>
{
    // 根据 transactionId 查询本地事务状态
    var success = CheckLocalTransaction(transactionId);
    return success ? TransactionState.Commit : TransactionState.Rollback;
};

producer.Start();

// 发送半消息
var sendResult = producer.PublishTransaction("订单创建");

try
{
    // 执行本地事务
    ExecuteLocalTransaction(sendResult.TransactionId);
    
    // 提交事务
    producer.EndTransaction(sendResult, TransactionState.Commit);
}
catch
{
    // 回滚事务
    producer.EndTransaction(sendResult, TransactionState.Rollback);
}

5. Request-Reply 模式

生产者端:

var producer = new Producer
{
    Topic = "request_topic",
    NameServerAddress = "127.0.0.1:9876"
};
producer.Start();

// 同步请求(等待响应)
var response = producer.Request("请求消息", timeout: 5000);
Console.WriteLine($"收到响应: {response.BodyString}");

// 异步请求
var reply = await producer.RequestAsync("异步请求", timeout: 5000);

消费者端:

var consumer = new Consumer
{
    Topic = "request_topic",
    Group = "request_group",
    NameServerAddress = "127.0.0.1:9876"
};

consumer.OnConsume = (q, messages) =>
{
    foreach (var msg in messages)
    {
        if (!String.IsNullOrEmpty(msg.CorrelationId))
        {
            // 处理请求并发送响应
            var result = ProcessRequest(msg.BodyString);
            consumer.SendReply(msg, result);
        }
    }
    return true;
};

consumer.Start();

6. 顺序消息

// 发送顺序消息(相同 key 的消息进入同一队列)
var queue = producer.SelectQueue("order_123");
producer.Publish("顺序消息1", queue);
producer.Publish("顺序消息2", queue);

// 消费端启用顺序消费
var consumer = new Consumer
{
    Topic = "test_topic",
    Group = "consumer_group",
    NameServerAddress = "127.0.0.1:9876",
    OrderConsume = true  // 启用顺序消费
};

🌩️ 云厂商接入

阿里云消息队列 RocketMQ

using NewLife.RocketMQ;

var producer = new Producer
{
    Topic = "test_topic",
    NameServerAddress = "http://MQ_INST_xxx.aliyuncs.com:80",
    CloudProvider = new AliyunProvider
    {
        AccessKey = "你的AccessKey",
        SecretKey = "你的SecretKey",
        InstanceId = "MQ_INST_xxx"  // 可选,自动从地址解析
    }
};
producer.Start();

华为云 DMS for RocketMQ

var producer = new Producer
{
    Topic = "test_topic",
    NameServerAddress = "华为云实例地址:9876",
    CloudProvider = new HuaweiProvider
    {
        AccessKey = "你的AK",
        SecretKey = "你的SK",
        InstanceId = "实例ID",
        EnableSsl = true  // 启用SSL加密
    }
};
producer.Start();

腾讯云 TDMQ RocketMQ

var producer = new Producer
{
    Topic = "test_topic",
    NameServerAddress = "腾讯云实例地址:9876",
    CloudProvider = new TencentProvider
    {
        AccessKey = "腾讯云SecretId",
        SecretKey = "腾讯云SecretKey",
        Namespace = "命名空间"
    }
};
producer.Start();

Apache RocketMQ ACL 认证

var producer = new Producer
{
    Topic = "test_topic",
    NameServerAddress = "127.0.0.1:9876",
    CloudProvider = new AclProvider
    {
        AccessKey = "RocketMQ AccessKey",
        SecretKey = "RocketMQ SecretKey"
    }
};
producer.Start();

🎓 进阶使用

消费重试与死信队列

var consumer = new Consumer
{
    Topic = "test_topic",
    Group = "consumer_group",
    NameServerAddress = "127.0.0.1:9876",
    EnableRetry = true,           // 启用消费重试
    MaxReconsumeTimes = 3         // 最大重试次数
};

consumer.OnConsume = (q, messages) =>
{
    foreach (var msg in messages)
    {
        try
        {
            ProcessMessage(msg);
        }
        catch
        {
            // 返回 false 触发重试
            return false;
        }
    }
    return true;
};

// 超过最大重试次数后,消息自动进入死信队列:%DLQ%{ConsumerGroup}

Tag 过滤

// 生产者发送带 Tag 的消息
producer.Publish("消息内容", tag: "TagA");

// 消费者订阅指定 Tag
var consumer = new Consumer
{
    Topic = "test_topic",
    Group = "consumer_group",
    Tags = "TagA || TagB",  // 订阅 TagA 或 TagB
    NameServerAddress = "127.0.0.1:9876"
};

SQL92 表达式过滤

// 生产者发送带自定义属性的消息
var msg = new Message
{
    BodyString = "消息内容",
    ["age"] = "25",
    ["city"] = "Shanghai"
};
producer.Publish(msg);

// 消费者使用 SQL92 表达式过滤
var consumer = new Consumer
{
    Topic = "test_topic",
    Group = "consumer_group",
    ExpressionType = "SQL92",
    Subscription = "age > 18 AND city = 'Shanghai'",
    NameServerAddress = "127.0.0.1:9876"
};

多 Topic 订阅

var consumer = new Consumer
{
    Topics = "topic1;topic2;topic3",  // 订阅多个 Topic
    Group = "consumer_group",
    NameServerAddress = "127.0.0.1:9876"
};

consumer.OnConsume = (q, messages) =>
{
    foreach (var msg in messages)
    {
        Console.WriteLine($"Topic: {msg.Topic}, 消息: {msg.BodyString}");
    }
    return true;
};

Pop 消费模式

var consumer = new Consumer
{
    Topic = "test_topic",
    Group = "consumer_group",
    NameServerAddress = "127.0.0.1:9876"
};

consumer.Start();

// Pop 消费(不自动提交偏移)
var messages = await consumer.PopMessageAsync(timeout: 10000);
foreach (var msg in messages)
{
    try
    {
        ProcessMessage(msg);
        // 手动确认
        await consumer.AckMessageAsync(msg);
    }
    catch
    {
        // 修改不可见时间(延长处理时间)
        await consumer.ChangeInvisibleTimeAsync(msg, 30000);
    }
}

VIP 通道

var producer = new Producer
{
    Topic = "test_topic",
    NameServerAddress = "127.0.0.1:9876",
    VipChannelEnabled = true  // 启用 VIP 通道(使用 BrokerPort - 2)
};

消息压缩

var producer = new Producer
{
    Topic = "test_topic",
    NameServerAddress = "127.0.0.1:9876",
    CompressOverBytes = 4096  // 消息体超过 4KB 自动 ZLIB 压缩
};

消费限流

var consumer = new Consumer
{
    Topic = "test_topic",
    Group = "consumer_group",
    NameServerAddress = "127.0.0.1:9876",
    MaxConcurrentConsume = 10  // 最多同时处理 10 条消息
};

📚 文档


🏗️ 架构特点

双协议架构

MqBase (业务基类)
├── Producer (生产者)
└── Consumer (消费者)

通信层
├── Remoting 协议(4.x/5.x Broker)
│   ├── ClusterClient (TCP 长连接)
│   ├── NameClient (路由发现)
│   └── BrokerClient (心跳/注销)
│
└── gRPC 协议(5.x Proxy,netstandard2.1+)
    ├── GrpcClient (HTTP/2 客户端)
    ├── GrpcMessagingService (11 个 RPC 方法)
    └── ProtoWriter/ProtoReader (自研 Protobuf 编解码器)

云厂商适配层

ICloudProvider (统一云厂商接口)
├── AliyunProvider (阿里云)
├── HuaweiProvider (华为云)
├── TencentProvider (腾讯云)
└── AclProvider (Apache ACL)

🔬 测试覆盖

30+ 测试类覆盖核心功能:

  • 核心功能:ProducerTests、ConsumerTests、CommandTests、MessageTests
  • 高级特性:TransactionCheckTests、BatchMessageTests、RetryTests、OrderConsumeTests
  • 协议兼容:IPv6Tests、MessageId5xTests、MQVersionTests、ProtoTests
  • 云厂商:AliyunTests、CloudProviderTests
  • 性能优化:CompressionTests、ConcurrentConsumeTests、VipChannelTests

📊 性能优势

特性 NewLife.RocketMQ 官方 Java 客户端
部署复杂度 ✅ 单一 DLL,零依赖 ⚠️ 需要 JRE 环境
跨平台 ✅ .NET Framework 4.5+ ~ .NET 10 ✅ 需要对应平台 JRE
功能完整度 ✅ 4.x 核心功能 100%,5.x 90% ✅ 100%
性能 ✅ 高性能(.NET 原生优化) ✅ 高性能
gRPC 支持 ✅ 自研编解码器,零依赖 ⚠️ 依赖 gRPC 库

🤝 参与贡献

欢迎提交 Issue 和 Pull Request!

  1. Fork 本仓库
  2. 创建特性分支 (git checkout -b feature/AmazingFeature)
  3. 提交更改 (git commit -m 'Add some AmazingFeature')
  4. 推送到分支 (git push origin feature/AmazingFeature)
  5. 提交 Pull Request

📄 许可协议

本项目采用 MIT License 开源协议。


新生命项目矩阵

各项目默认支持 net10.0/net9.0/netstandard2.1/netstandard2.0/net4.62/net4.5

项目 年份 说明
基础组件 支撑其它中间件以及产品项目
NewLife.Core 2002 核心库,日志、配置、缓存、网络、序列化、APM性能追踪
NewLife.XCode 2005 大数据中间件,单表百亿级,MySql/SQLite/SqlServer/Oracle/PostgreSql/达梦,自动分表,读写分离
NewLife.Net 2005 网络库,单机千万级吞吐率(2266万tps),单机百万级连接(400万Tcp长连接)
NewLife.Remoting 2011 协议通信库,提供CS应用通信框架,支持Http/RPC通信框架,高吞吐,物联网设备低开销易接入
NewLife.Cube 2010 魔方快速开发平台,集成了用户权限、SSO登录、OAuth服务端等,单表100亿级项目验证
NewLife.Agent 2008 服务管理组件,把应用安装成为操作系统守护进程,Windows服务、Linux的Systemd
NewLife.Zero 2020 Zero零代脚手架,基于NewLife组件生态的项目模板NewLife.Templates,Web、WebApi、Service
中间件 对接知名中间件平台
NewLife.Redis 2017 Redis客户端,微秒级延迟,百万级吞吐,丰富的消息队列,百亿级数据量项目验证
NewLife.RocketMQ 2018 RocketMQ纯托管客户端,支持Apache RocketMQ和阿里云消息队列,十亿级项目验证
NewLife.MQTT 2019 物联网消息协议,MqttClient/MqttServer,客户端支持阿里云物联网
NewLife.IoT 2022 IoT标准库,定义物联网领域的各种通信协议标准规范
NewLife.Modbus 2022 ModbusTcp/ModbusRTU/ModbusASCII,基于IoT标准库实现,支持ZeroIoT平台和IoTEdge网关
NewLife.Siemens 2022 西门子PLC协议,基于IoT标准库实现,支持IoT平台和IoTEdge
NewLife.Map 2022 地图组件库,封装百度地图、高德地图、腾讯地图、天地图
NewLife.Audio 2023 音频编解码库,PCM/ADPCMA/G711A/G722U/WAV/AAC
产品平台 产品平台级,编译部署即用,个性化自定义
Stardust 2018 星尘,分布式服务平台,节点管理、APM监控中心、配置中心、注册中心、发布中心
AntJob 2019 蚂蚁调度,分布式大数据计算平台(实时/离线),蚂蚁搬家分片思想,万亿级数据量项目验证
NewLife.ERP 2021 企业ERP,产品管理、客户管理、销售管理、供应商管理
CrazyCoder 2006 码神工具,众多开发者工具,网络、串口、加解密、正则表达式、Modbus、MQTT
EasyIO 2023 简易文件存储,支持分布式系统中文件集中存储
XProxy 2005 产品级反向代理,NAT代理、Http代理
HttpMeter 2022 Http压力测试工具
GitCandy 2015 Git源代码管理系统
SmartOS 2014 嵌入式操作系统,完全独立自主,支持ARM Cortex-M芯片架构
SmartA2 2019 嵌入式工业计算机,物联网边缘网关,高性能.NET8主机,应用于工业、农业、交通、医疗
FIoT物联网平台 2020 物联网整体解决方案,建筑、环保、农业,软硬件及大数据分析一体化,单机十万级点位项目验证
UWB高精度室内定位 2020 厘米级(10~20cm)高精度室内定位,软硬件一体化,与其它系统联动,大型展厅项目验证

新生命开发团队

XCode

新生命团队(NewLife)成立于2002年,是新时代物联网行业解决方案提供者,致力于提供软硬件应用方案咨询、系统架构规划与开发服务。

团队主导的80多个开源项目已被广泛应用于各行业,Nuget累计下载量高达400余万次

团队开发的大数据中间件 NewLife.XCode、蚂蚁调度计算平台 AntJob、星尘分布式平台 Stardust、缓存队列组件 NewLife.Redis 以及物联网平台 FIoT,均成功应用于电力、高校、互联网、电信、交通、物流、工控、医疗、文博等行业,为客户提供了大量先进、可靠、安全、高质量、易扩展的产品和系统集成服务。

我们将不断通过服务的持续改进,成为客户长期信赖的合作伙伴,通过不断的创新和发展,成为国内优秀的 IoT 服务供应商。

新生命团队始于2002年,部分开源项目具有20年以上漫长历史,源码库保留有2010年以来所有修改记录

智能大石头

About

纯托管轻量级RocketMQ客户端,支持发布消息、消费消息、负载均衡等核心功能!

Topics

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors 12

Languages