基础
本章覆盖 OnePath 最核心的四个能力点:自动发现、编码与序列化、拉取式订阅 与 一次性读写。它们是后续所有进阶示例的地基。运行模式与键表达式概念见 快速开始,公共 API 见 API 手册。
自动发现
通过组播 UDP 的自动发现机制,无需任何路由器即可发现邻近的 OnePath 节点(打印节点 ID、类型与链路列表),并可在 Peer 模式下直接发布或订阅。
关键 OnePath API
onepath_scout(cb, ctx, timeout_ms)— 扫描网络,每发现一个节点回调一次onepath_config_new/onepath_config_set_mode/onepath_config_destroy— 创建、配置(设为peer)、销毁会话配置onepath_open_with_config— 用配置打开会话onepath_declare_publisher/onepath_publisher_put_str— 声明发布者并发布字符串onepath_subscribe— 声明订阅者并注册回调;回调内onepath_sample_release释放样本
onepath_scout(on_hello, NULL, 3000); /* 扫描 3 秒,每发现一节点回调 */
onepath_config_t cfg;
onepath_config_new(&cfg);
onepath_config_set_mode(cfg, "peer");
onepath_session_t session;
onepath_open_with_config(&session, cfg);
onepath_config_destroy(cfg);
onepath_publisher_t pub;
onepath_declare_publisher(session, &pub, keyexpr, NULL);
onepath_publisher_put_str(pub, buf); /* pub 模式:发布字符串 */
/* sub 模式: */
onepath_subscribe(session, &sub, keyexpr, on_sample, NULL);./examples/build/release/full/onepath_discovery # 扫描网络
./examples/build/release/full/onepath_discovery sub # 订阅
./examples/build/release/full/onepath_discovery pub # 发布
./examples/build/release/full/onepath_discovery pub -k demo/test[2026-06-21-17-18-34:741] [INFO] scanning OnePath network (3s) ...
[2026-06-21-17-18-34:742] [INFO] discovered node ZID: 7dc578fcb8e285bb8f43b05c82759227, type: Peer
[2026-06-21-17-18-34:742] [INFO] locator[2]: tcp/100.64.0.1:46185
[2026-06-21-17-18-38:743] [INFO] scan complete.变体:双后端。无需路由器。多终端启动
pub与sub即可互发现。组播被禁用的虚拟机/容器需显式开启组播。
编码与序列化
两个相关示例:onepath_encoding_demo 演示编码类型系统(发布时标记数据格式,订阅方据此推断),onepath_bytes_demo 演示如何在 payload 缓冲区里手动布局自定义二进制数据。
编码类型(onepath_encoding_demo)
异构边缘设备无需猜测对端数据格式:发布时声明 encoding(JSON / 纯文本 / 二进制 / Protobuf),订阅方读取 sample->encoding 即可推断。同时演示预声明键表达式降低线上开销。
关键 OnePath API
ONEPATH_ENCODING_*编码常量(APPLICATION_JSON/TEXT_PLAIN/APPLICATION_OCTET_STREAM/APPLICATION_PROTOBUF)onepath_write_opts_t(ONEPATH_WRITE_OPTS_DEFAULT),其.encoding字段设置本次写入编码onepath_publisher_write(pub, data, len, &wopts)— 带写入选项发布二进制/文本数据- 订阅回调读取
sample->encoding后据此推断类型
onepath_write_opts_t wopts = ONEPATH_WRITE_OPTS_DEFAULT;
wopts.encoding = ONEPATH_ENCODING_APPLICATION_JSON;
onepath_publisher_write(pub, buf, strlen(buf), &wopts);
/* 订阅回调中按 encoding 推断类型: */
if (strcmp(sample->encoding, ONEPATH_ENCODING_APPLICATION_JSON) == 0)
type_hint = "JSON data";
onepath_sample_release(sample);./examples/build/release/full/onepath_encoding_demo local
./examples/build/release/full/onepath_encoding_demo sub &
./examples/build/release/full/onepath_encoding_demo pub[2026-06-21-17-19-03:490] [INFO] publish #0 JSON: {"seq": 0, "temp": 23.5, "unit": "celsius"}
[2026-06-21-17-19-03:490] [INFO] received key='demo/encoding/test' encoding='application/json' [JSON data]
[2026-06-21-17-19-05:491] [INFO] received key='demo/encoding/test' encoding='text/plain' [plain text]
[2026-06-21-17-19-07:491] [INFO] received key='demo/encoding/test' encoding='application/octet-stream' [binary data]变体:双后端。
手动序列化(onepath_bytes_demo)
纯本地示例(无需网络),演示如何把基本类型、长度前缀字符串、多字段结构化记录、变长数组编码进一段 payload 缓冲区——这正是承载在 OnePath payload 里的自定义二进制布局模式。
/* 长度前缀字符串:先写 4 字节长度,再写本体 */
uint32_t len32 = (uint32_t)strlen(original);
memcpy(buf + offset, &len32, 4); offset += 4;
memcpy(buf + offset, original, len32); offset += len32;
/* 反序列化:先读长度再读内容 */
uint32_t rlen; memcpy(&rlen, buf, 4);
memcpy(restored, buf + 4, rlen); restored[rlen] = '\0';./examples/build/release/full/onepath_bytes_demo[2026-06-21-17-18-17:211] [INFO] === demo 1: basic type serialization ===
serialize int32: 42 -> 4 bytes
[2026-06-21-17-18-17:211] [INFO] deserialize: 42 (match)
[2026-06-21-17-18-17:211] [INFO] === demo 2: structured data serialization (sensor data) ===
[1] Thermo-A: temp=23.5 C, hum=65.2%
[2026-06-21-17-18-17:211] [ OK ] all demos done.变体:双后端。纯本地逻辑,不依赖网络栈,tiny 输出与 full 一致。
拉取式订阅
基于环形缓冲(Ring Buffer)的非阻塞拉取式订阅:消费者按自身节奏 try_recv 拉取最新样本,旧数据被环形缓冲自动覆盖。适合数据突发、过时数据比无数据更危险的场景。
关键 OnePath API
onepath_subscribe_pull(session, &sub, &rx, keyexpr, ring_size)— 以环形缓冲通道声明拉取订阅者,返回接收句柄rxonepath_sample_try_recv(rx, &sample)— 非阻塞拉取一条样本(ONEPATH_OK表示命中)onepath_sample_release/onepath_sample_rx_destroy/onepath_subscriber_destroy— 资源释放
onepath_subscriber_t sub;
onepath_sample_rx_t rx;
onepath_subscribe_pull(session, &sub, &rx, keyexpr, (size_t)ring_size);
while (1) {
onepath_sleep_s(interval);
onepath_sample_t sample;
if (onepath_sample_try_recv(rx, &sample) == ONEPATH_OK) {
EP_OK("Pull #%d %s '%s'", pull_count, kind_to_str(sample.kind), sample.key);
onepath_sample_release(&sample);
while (onepath_sample_try_recv(rx, &sample) == ONEPATH_OK) /* 排空残留 */
onepath_sample_release(&sample);
}
}./examples/build/release/full/onepath_pull_sub --size 3 --interval 2
./examples/build/release/full/onepath_discovery pub # 另一终端配合发布[2026-06-21-17-19-57:110] [INFO] ring buffer size: 3, pull interval: 1 s
[2026-06-21-17-19-58:110] [INFO] Pull #1 no new data
[2026-06-21-17-20-04:111] [ OK ] Pull #7 PUT 'demo/discovery/hello': '[0] Hello from onepath_discovery pub!'
[2026-06-21-17-20-05:111] [ OK ] Pull #8 PUT 'demo/discovery/hello': '[1] Hello from onepath_discovery pub!'变体:双后端。
一次性读写 Put/Delete
无需声明 Publisher 的一次性写入/删除操作,适合命令行工具与低频写入场景。
关键 OnePath API
onepath_put_str(session, keyexpr, value)— 一次性写入字符串值(无需 publisher)onepath_delete(session, keyexpr)— 一次性删除某 keyonepath_config_*/onepath_open_with_config/onepath_close— 会话生命周期
onepath_config_new(&cfg);
onepath_config_set_mode(cfg, mode);
if (endpoint) onepath_config_add_endpoint(cfg, endpoint);
onepath_open_with_config(&session, cfg);
onepath_config_destroy(cfg);
if (strcmp(action, "put") == 0)
onepath_put_str(session, keyexpr, value); /* one-shot 写入 */
else
onepath_delete(session, keyexpr); /* one-shot 删除 */
onepath_close(session);./examples/build/release/full/onepath_put_delete put -k demo/example/hello -v "world"
./examples/build/release/full/onepath_put_delete delete -k demo/example/hello[2026-06-21-17-18-28:154] [INFO] opening OnePath session ...
[2026-06-21-17-18-28:157] [ OK ] session opened.
[2026-06-21-17-18-28:157] [INFO] PUT key: 'demo/example/hello', value: 'world'
[2026-06-21-17-18-28:157] [ OK ] put succeeded.变体:双后端。常配合 分布式存储 与 Get 查询 使用:用
put写入,再用onepath_get_demo查询验证。