Skip to content

基础

本章覆盖 OnePath 最核心的四个能力点:自动发现编码与序列化拉取式订阅一次性读写。它们是后续所有进阶示例的地基。运行模式与键表达式概念见 快速开始,公共 API 见 API 手册

自动发现

通过组播 UDP 的自动发现机制,无需任何路由器即可发现邻近的 OnePath 节点(打印节点 ID、类型与链路列表),并可在 Peer 模式下直接发布或订阅。

关键 OnePath API

  • onepath_scout(cb, ctx, timeout_ms) — 扫描网络,每发现一个节点回调一次
  • onepath_config_new / onepath_config_set_mode / onepath_config_destroy — 创建、配置(设为 peer)、销毁会话配置
  • onepath_open_with_config — 用配置打开会话
  • onepath_declare_publisher / onepath_publisher_put_str — 声明发布者并发布字符串
  • onepath_subscribe — 声明订阅者并注册回调;回调内 onepath_sample_release 释放样本
c
onepath_scout(on_hello, NULL, 3000);              /* 扫描 3 秒,每发现一节点回调 */

onepath_config_t cfg;
onepath_config_new(&cfg);
onepath_config_set_mode(cfg, "peer");
onepath_session_t session;
onepath_open_with_config(&session, cfg);
onepath_config_destroy(cfg);

onepath_publisher_t pub;
onepath_declare_publisher(session, &pub, keyexpr, NULL);
onepath_publisher_put_str(pub, buf);              /* pub 模式:发布字符串 */
/* sub 模式: */
onepath_subscribe(session, &sub, keyexpr, on_sample, NULL);
bash
./examples/build/release/full/onepath_discovery            # 扫描网络
./examples/build/release/full/onepath_discovery sub        # 订阅
./examples/build/release/full/onepath_discovery pub        # 发布
./examples/build/release/full/onepath_discovery pub -k demo/test
text
[2026-06-21-17-18-34:741] [INFO] scanning OnePath network (3s) ...
[2026-06-21-17-18-34:742] [INFO] discovered node ZID: 7dc578fcb8e285bb8f43b05c82759227, type: Peer
[2026-06-21-17-18-34:742] [INFO]   locator[2]: tcp/100.64.0.1:46185
[2026-06-21-17-18-38:743] [INFO] scan complete.

变体:双后端。无需路由器。多终端启动 pubsub 即可互发现。组播被禁用的虚拟机/容器需显式开启组播。

编码与序列化

两个相关示例:onepath_encoding_demo 演示编码类型系统(发布时标记数据格式,订阅方据此推断),onepath_bytes_demo 演示如何在 payload 缓冲区里手动布局自定义二进制数据

编码类型(onepath_encoding_demo

异构边缘设备无需猜测对端数据格式:发布时声明 encoding(JSON / 纯文本 / 二进制 / Protobuf),订阅方读取 sample->encoding 即可推断。同时演示预声明键表达式降低线上开销。

关键 OnePath API

  • ONEPATH_ENCODING_* 编码常量(APPLICATION_JSON / TEXT_PLAIN / APPLICATION_OCTET_STREAM / APPLICATION_PROTOBUF
  • onepath_write_opts_tONEPATH_WRITE_OPTS_DEFAULT),其 .encoding 字段设置本次写入编码
  • onepath_publisher_write(pub, data, len, &wopts) — 带写入选项发布二进制/文本数据
  • 订阅回调读取 sample->encoding 后据此推断类型
c
onepath_write_opts_t wopts = ONEPATH_WRITE_OPTS_DEFAULT;
wopts.encoding = ONEPATH_ENCODING_APPLICATION_JSON;
onepath_publisher_write(pub, buf, strlen(buf), &wopts);

/* 订阅回调中按 encoding 推断类型: */
if (strcmp(sample->encoding, ONEPATH_ENCODING_APPLICATION_JSON) == 0)
    type_hint = "JSON data";
onepath_sample_release(sample);
bash
./examples/build/release/full/onepath_encoding_demo local
./examples/build/release/full/onepath_encoding_demo sub &
./examples/build/release/full/onepath_encoding_demo pub
text
[2026-06-21-17-19-03:490] [INFO] publish #0 JSON: {"seq": 0, "temp": 23.5, "unit": "celsius"}
[2026-06-21-17-19-03:490] [INFO] received key='demo/encoding/test' encoding='application/json' [JSON data]
[2026-06-21-17-19-05:491] [INFO] received key='demo/encoding/test' encoding='text/plain' [plain text]
[2026-06-21-17-19-07:491] [INFO] received key='demo/encoding/test' encoding='application/octet-stream' [binary data]

变体:双后端

手动序列化(onepath_bytes_demo

纯本地示例(无需网络),演示如何把基本类型、长度前缀字符串、多字段结构化记录、变长数组编码进一段 payload 缓冲区——这正是承载在 OnePath payload 里的自定义二进制布局模式。

c
/* 长度前缀字符串:先写 4 字节长度,再写本体 */
uint32_t len32 = (uint32_t)strlen(original);
memcpy(buf + offset, &len32, 4); offset += 4;
memcpy(buf + offset, original, len32); offset += len32;
/* 反序列化:先读长度再读内容 */
uint32_t rlen; memcpy(&rlen, buf, 4);
memcpy(restored, buf + 4, rlen); restored[rlen] = '\0';
bash
./examples/build/release/full/onepath_bytes_demo
text
[2026-06-21-17-18-17:211] [INFO] === demo 1: basic type serialization ===
  serialize int32: 42 -> 4 bytes
[2026-06-21-17-18-17:211] [INFO]   deserialize:    42 (match)
[2026-06-21-17-18-17:211] [INFO] === demo 2: structured data serialization (sensor data) ===
    [1] Thermo-A: temp=23.5 C, hum=65.2%
[2026-06-21-17-18-17:211] [ OK ] all demos done.

变体:双后端。纯本地逻辑,不依赖网络栈,tiny 输出与 full 一致。

拉取式订阅

基于环形缓冲(Ring Buffer)的非阻塞拉取式订阅:消费者按自身节奏 try_recv 拉取最新样本,旧数据被环形缓冲自动覆盖。适合数据突发、过时数据比无数据更危险的场景。

关键 OnePath API

  • onepath_subscribe_pull(session, &sub, &rx, keyexpr, ring_size) — 以环形缓冲通道声明拉取订阅者,返回接收句柄 rx
  • onepath_sample_try_recv(rx, &sample) — 非阻塞拉取一条样本(ONEPATH_OK 表示命中)
  • onepath_sample_release / onepath_sample_rx_destroy / onepath_subscriber_destroy — 资源释放
c
onepath_subscriber_t sub;
onepath_sample_rx_t rx;
onepath_subscribe_pull(session, &sub, &rx, keyexpr, (size_t)ring_size);

while (1) {
    onepath_sleep_s(interval);
    onepath_sample_t sample;
    if (onepath_sample_try_recv(rx, &sample) == ONEPATH_OK) {
        EP_OK("Pull #%d %s '%s'", pull_count, kind_to_str(sample.kind), sample.key);
        onepath_sample_release(&sample);
        while (onepath_sample_try_recv(rx, &sample) == ONEPATH_OK)  /* 排空残留 */
            onepath_sample_release(&sample);
    }
}
bash
./examples/build/release/full/onepath_pull_sub --size 3 --interval 2
./examples/build/release/full/onepath_discovery pub         # 另一终端配合发布
text
[2026-06-21-17-19-57:110] [INFO] ring buffer size: 3, pull interval: 1 s
[2026-06-21-17-19-58:110] [INFO] Pull #1 no new data
[2026-06-21-17-20-04:111] [ OK ] Pull #7 PUT 'demo/discovery/hello': '[0] Hello from onepath_discovery pub!'
[2026-06-21-17-20-05:111] [ OK ] Pull #8 PUT 'demo/discovery/hello': '[1] Hello from onepath_discovery pub!'

变体:双后端

一次性读写 Put/Delete

无需声明 Publisher 的一次性写入/删除操作,适合命令行工具与低频写入场景。

关键 OnePath API

  • onepath_put_str(session, keyexpr, value) — 一次性写入字符串值(无需 publisher)
  • onepath_delete(session, keyexpr) — 一次性删除某 key
  • onepath_config_* / onepath_open_with_config / onepath_close — 会话生命周期
c
onepath_config_new(&cfg);
onepath_config_set_mode(cfg, mode);
if (endpoint) onepath_config_add_endpoint(cfg, endpoint);
onepath_open_with_config(&session, cfg);
onepath_config_destroy(cfg);

if (strcmp(action, "put") == 0)
    onepath_put_str(session, keyexpr, value);   /* one-shot 写入 */
else
    onepath_delete(session, keyexpr);           /* one-shot 删除 */
onepath_close(session);
bash
./examples/build/release/full/onepath_put_delete put    -k demo/example/hello -v "world"
./examples/build/release/full/onepath_put_delete delete -k demo/example/hello
text
[2026-06-21-17-18-28:154] [INFO] opening OnePath session ...
[2026-06-21-17-18-28:157] [ OK ] session opened.
[2026-06-21-17-18-28:157] [INFO] PUT key: 'demo/example/hello', value: 'world'
[2026-06-21-17-18-28:157] [ OK ] put succeeded.

变体:双后端。常配合 分布式存储Get 查询 使用:用 put 写入,再用 onepath_get_demo 查询验证。

OnePath™ 以预构建库形式交付,运行时零外部依赖。