OpenMessaging领域架构V0.3-中文版
OpenMessaging 领域架构 V0.3
原文链接: https://openmessaging.cloud/design/2018/03/28/openmessaging-domain-architecture-v0.3/
来源: OpenMessaging 官方网站 · 博客 · 设计 栏目
作者: Yukon(OpenMessaging 项目核心成员)
日期: 2018-03-28
许可: Linux 基金会协作项目
关联资源: Java API 文档 · 规范仓库
领域架构总览
以下是 OpenMessaging V0.3 版本的领域架构设计,详见 JavaDoc。
与第一版相比,Topic(主题)概念已从模型中移除。本版本引入了 Stream(流) 元素,它是分区(partition)、分片(shard)、消息组(message group)等概念的抽象表示。
以下章节逐一说明 OMS(开放消息规范,Open Messaging Specification)中引入的这些核心概念。
命名空间(Namespace)
命名空间类似于 cgroup 中的 namespace,用于创建具有安全保障的隔离空间。每个命名空间拥有自己独立的生产者、消费者、主题、队列等一整套资源。
OpenMessaging 通过 MessagingAccessPoint(消息访问入口) 来访问、读取和写入指定命名空间中的资源。
┌─────────────────────────────────────────┐
│ 命名空间 A │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 生产者 │ │ 消费者 │ │ 队列 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
│ 命名空间 B │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 生产者 │ │ 消费者 │ │ 队列 │ │
│ └─────────┘ └─────────┘ └─────────┘ │
└─────────────────────────────────────────┘
关键要点:
- 多租户隔离的基础设施
- 通过
OMS.getMessagingAccessPoint(url)获取入口 - URL 格式:
oms:<驱动类型>://[账号@]主机[:端口]/<区域>
生产者(Producer)
OpenMessaging 定义了两种生产者:Producer(标准生产者) 和 BatchMessageSender(批量消息发送器)。
标准生产者
提供多种 send 方法将消息发送到指定目标(在 OMS 中目标是 Queue):
| 发送方式 | 方法 | 行为描述 |
|---|---|---|
| 同步发送 | send(Message) |
调用线程阻塞等待,直到发送请求完成 |
| 异步发送 | sendAsync(Message) |
立即返回 Future<SendResult>,调用线程不阻塞 |
| 单向发送 | sendOneway(Message) |
不关心发送结果,不抛出异常 |
| 事务发送 | send(Message, LocalTransactionExecutor, KeyValue) |
仅当本地事务提交后,消息才对消费者可见 |
批量消息发送器
专注于吞吐速度,实现可采用批量方式——先累积多条消息,然后一次性提交(commit):
BatchMessageSender sender = producer.createBatchMessageSender();
sender.send(msg1).send(msg2).send(msg3);
sender.commit(); // 一次性提交所有消息
// 或者 sender.rollback(); // 回滚所有未提交消息
消费者(Consumer)
OpenMessaging 定义了三种消费者:PullConsumer(拉模式消费者)、PushConsumer(推模式消费者) 和 StreamingConsumer(流式消费者)。每种消费者仅支持从 Queue 消费消息。
拉模式消费者(PullConsumer)
- 从指定队列主动拉取消息
- 支持通过确认机制(acknowledgement)随时提交消费结果
- 一个拉模式消费者只能从一个固定队列拉取消息
PullConsumer consumer = messagingAccessPoint.createPullConsumer();
consumer.attachQueue("NS://HELLO_QUEUE");
Message msg = consumer.receive(); // 阻塞式拉取
consumer.ack(msg.sysHeaders().getString(Message.BuiltinKeys.RECEIPT_HANDLE));
推模式消费者(PushConsumer)
- 从多个队列接收消息
- 消息由 MOM(面向消息的中间件)服务端推送到客户端
- 可以关联(attach)到多个队列,每个队列使用独立的
MessageListener(消息监听器) - 通过
ReceivedMessageContext(已接收消息上下文)随时提交消费结果
PushConsumer consumer = messagingAccessPoint.createPushConsumer();
consumer.attachQueue("NS://QUEUE_A", new MessageListener() {
@Override
public void onReceived(Message message, Context context) {
// 处理接收到的消息
context.ack(); // 确认消费
}
});
流式消费者(StreamingConsumer)
- 全新的消费者类型——面向流的消费者
- 旨在将消息系统与流式计算、大数据平台轻松集成
- 以迭代器方式从指定队列的 Stream(流)中消费消息
StreamingConsumer consumer = messagingAccessPoint.createStreamingConsumer();
StreamingIterator iterator = consumer.seekToBeginning("stream-0");
while (iterator.hasNext()) {
Message msg = iterator.next();
// 逐条处理消息
}
队列(Queue)
Queue 是 OMS 中最基础、最核心的概念。 Queue 的消息来源可以是 Producer(生产者)或 Routing(路由)。

Stream 分区机制
需要特别注意的是,一个 Queue 可以被划分为多个 Stream(流),消息将通过 MessageHeader#STREAM_KEY(消息头中的流键)分发到指定的 Stream。
┌──────────────┐
│ 队列 │
├──────────────┤
│ 流 0 │ ← STREAM_KEY = "key_a"
│ 流 1 │ ← STREAM_KEY = "key_b"
│ 流 2 │ ← STREAM_KEY = "key_c"
│ ... │
└──────────────┘
Stream 是一个抽象概念,在不同消息系统中可以分别对应:
- Kafka: Partition(分区)
- RocketMQ: MessageQueue(消息队列)
- RabbitMQ: Shard(分片)
直接投递路径
Queue 也可以直接从 Producer 接收消息——在某些场景下,我们希望 Producer 和 Consumer 之间走最短路径,以获得最佳性能。
生产者 ──→ 队列 ──→ 消费者 (最短路径,无需经过路由)
路由(Routing)
路由负责处理源队列中的原始消息,并将其转发到另一个队列。
三元组模型
每个路由由三个要素组成:源队列 → 表达式管道 → 目标队列。
┌──────────┐ ┌────────────┐ ┌──────────┐
│ 源队列 │ ──→ │ 表达式管道 │ ──→ │ 目标队列 │
│ │ │ │ │ │
└──────────┘ └────────────┘ └──────────┘
表达式类型
表达式(Expression)用于处理路由中流动的消息。目前此 OMS 版本定义了以下表达式类型:
| 表达式类型 | 功能说明 | 状态 |
|---|---|---|
| 过滤器表达式(Filter) | 按条件过滤消息(最常用) | ✅ 已定义 |
| 去重器表达式(Deduplicator) | 去除重复消息 | 🔮 未来支持 |
| 连接器表达式(Joiner) | 连接/合并多条消息 | 🔮 未来支持 |
| RPC 表达式 | 远程过程调用 | 🔮 未来支持 |
跨网络路由
路由可以跨越网络边界——消息可以从一个网络分区路由到另一个网络分区,实现跨数据中心、跨地域的消息流转。
整体数据流
与 JMS 的核心差异
| 对比维度 | JMS(Java 消息服务) | OpenMessaging(OMS) |
|---|---|---|
| 编程语言 | 仅限 Java | 语言无关 |
| 消息模型 | Queue + Topic(双模型) | Queue(统一模型) |
| 流式处理 | ❌ 不支持 | ✅ StreamingConsumer |
| 多租户 | ❌ 无原生支持 | ✅ 命名空间隔离 |
| 路由机制 | ❌ 无 | ✅ Routing + Expression |
| 负载均衡 | 无明确规范 | 内置规范定义 |
| 云原生 | 不支持 | ☁️ 面向云设计 |
参考资源
| 资源名称 | 链接 |
|---|---|
| 🏠 OpenMessaging 官网 | openmessaging.cloud |
| 📋 正式规范仓库 | github.com/openmessaging/specification |
| 📚 Java API 完整文档 | openmessaging.cloud/openmessaging-java |
| 📊 基准测试框架 | Benchmark Framework |
| 💻 Java API 源码 | github.com/openmessaging/openmessaging-java |
| 💬 社区 Slack | openmessaging.herokuapp.com |
| 📧 邮件列表 | groups.google.com/forum/#!forum/openmessaging |
说明: 本文是 OpenMessaging 项目的官方领域架构设计文档。OpenMessaging 是 Linux 基金会协作项目,由阿里巴巴发起,联合 Yahoo、滴滴、微众银行等多家企业共同制定,旨在建立云原生、厂商无关的分布式消息开放标准。SOFAMQ 的 OMS API 正是对此规范的具体实现。
forum/openmessaging](https://groups.google.com/forum/#!forum/openmessaging) |
说明: 本文是 OpenMessaging 项目的官方领域架构设计文档。OpenMessaging 是 Linux 基金会协作项目,由阿里巴巴发起,联合 Yahoo、滴滴、微众银行等多家企业共同制定,旨在建立云原生、厂商无关的分布式消息开放标准。SOFAMQ 的 OMS API 正是对此规范的具体实现。
更多推荐
所有评论(0)