便捷 API 与协同
本章演示 OnePath 把常见模式收敛成「一次调用」的便捷 API(三模存储 / 三模计算),以及若干协同型应用:纯本地 JSON 工具链、分布式状态同步、自组网 Mesh 与 会话信息查询。
「三模存储 / 三模计算」是把订阅、应答、发布三类基础动作收敛到单个对象的便捷封装,与 多模冗余 XMR(N 模冗余投票)是不同概念。
三模存储节点
仅 full。
用单个 store 对象把「订阅摄入 + 应答查询」收敛为一次调用(默认内置内存表),对照 分布式存储 的约 500 行手写哈希表。
关键 OnePath API
onepath_store_open(s, &st, keyexpr, NULL)— 一次调用同时接上「订阅摄入 + 应答查询」onepath_store_count(st)/onepath_store_close(st)— 条目计数 / 关闭onepath_put_str/onepath_delete/onepath_get— put / del / get 角色
onepath_store_t st;
if (onepath_store_open(s, &st, keyexpr, NULL) != ONEPATH_OK) { /* err */ }
EP_OK("store ready on '%s' (ingest + serve)", keyexpr);
for (;;) {
onepath_sleep_s(2);
EP_INFO("store entries: %zu", onepath_store_count(st));
}
onepath_store_close(st);./examples/build/release/full/onepath_store_node store -k 'demo/store/**' &
./examples/build/release/full/onepath_store_node put demo/store/a hello
./examples/build/release/full/onepath_store_node get 'demo/store/**'
./examples/build/release/full/onepath_store_node del demo/store/a[ OK ] put 'demo/store/a' = 'hello'
[ OK ] put 'demo/store/b' = 'world'
=== get demo/store/** ===
[ OK ] reply: 'demo/store/b' = 'world'
[ OK ] reply: 'demo/store/a' = 'hello'
[INFO] query 'demo/store/**' returned 2 result(s)
[ OK ] delete 'demo/store/a'变体:仅 full。便捷存储 API 是完整版能力,精简版不构建该示例。
三模计算节点
仅 full。
注册单个计算函数(本例转大写),一次调用即同时接上流式变换(订阅输入 → 计算 → 发布结果)与按需 RPC(应答查询)两路。
关键 OnePath API
onepath_compute_open(s, &c, &opts)— 一次调用同时接上「流式变换 + RPC 应答」ONEPATH_COMPUTE_OPTS_DEFAULT+opts.in_key/out_key/serve_key/fn— 配置输入/输出/RPC 键与计算函数onepath_compute_emit(sink, NULL, buf, n)— 计算函数内产出结果(key=NULL:流式用 out_key,RPC 用查询键)onepath_compute_close(c)— 关闭
static void uppercase_fn(void *ud, const char *key, const void *in, size_t n,
onepath_compute_sink_t *sink) {
char buf[256];
for (size_t i = 0; i < n; i++) buf[i] = toupper(((const char*)in)[i]);
onepath_compute_emit(sink, NULL, buf, n); /* key=NULL: 流式用 out_key, RPC 用查询键 */
}
onepath_compute_opts_t opts = ONEPATH_COMPUTE_OPTS_DEFAULT;
opts.in_key = in_key; opts.out_key = out_key; opts.serve_key = serve_key;
opts.fn = uppercase_fn;
onepath_compute_t c;
onepath_compute_open(s, &c, &opts);./examples/build/release/full/onepath_compute_node compute &
./examples/build/release/full/onepath_compute_node watch demo/compute/out &
./examples/build/release/full/onepath_compute_node feed demo/compute/in hello
./examples/build/release/full/onepath_compute_node call demo/compute/rpc world[ OK ] watching 'demo/compute/out'. Ctrl+C to exit.
[ OK ] transformed: 'demo/compute/out' = 'HELLO'
[ OK ] rpc('world') -> 'WORLD'
[INFO] rpc returned 1 result(s)
[ OK ] compute ready: stream demo/compute/in -> demo/compute/out, rpc on demo/compute/rpc变体:仅 full。
JSON 数据处理
独立于会话的纯本地 JSON/JSON5 工具链:解析(含注释、尾逗号)→ 路径查询 → 修改/新增 → 按下标删除 → 美化序列化。无需网络连接。
关键 OnePath API
onepath_json_parse(input, 0)— 解析 JSON/JSON5 文本得到文档句柄onepath_json_get(doc, "path", &v)— 按字符串路径查询值(支持tags[0]、nested/key)onepath_json_set(doc, path, value)/onepath_json_delete(doc, "tags[1]")— 修改/新增 / 按路径删除onepath_json_dump_pretty(doc, &buf, &len, 2)— 美化序列化到堆缓冲区(调用方释放)onepath_json_free(doc)— 释放文档句柄
onepath_json_t *doc = onepath_json_parse(input, 0);
onepath_json_val_t v;
onepath_json_get(doc, "port", &v);
onepath_json_set(doc, "port", "9090");
onepath_json_set(doc, "new_field", "\"hello\"");
onepath_json_delete(doc, "tags[1]");
char *buf = NULL; size_t len = 0;
onepath_json_dump_pretty(doc, &buf, &len, 2);
onepath_json_free(doc);./examples/build/release/full/onepath_json_demo[ OK ] parsed JSON5 document
[INFO] server = example.com (type=5)
[INFO] port = 8080 (type=3)
[INFO] tags[0]= prod
[ OK ] modified port and added new_field
[ OK ] deleted tags[1]
[ OK ] done变体:双后端。JSON 工具链与后端无关,输出一致。
状态同步
多节点各维护本地 KV 存储,用时间戳(HLC)做 Last-Write-Wins 合并、存活订阅感知对端上下线、新节点上线时全量查询恢复,最终各节点内容趋同。
关键 OnePath API
onepath_liveliness_declare/onepath_liveliness_subscribe— 声明本节点存活 + 订阅对端上下线onepath_subscribe— 订阅实时数据同步onepath_declare_responder+onepath_request_reply_str/onepath_request_release— 应答其他节点的状态全量查询onepath_get+ reply 接收族 — 断线恢复时全量拉取onepath_put_with_opts(session, key, value, len, &wopts)— 带 attachment(时间戳 + 来源名)发布
onepath_liveliness_declare(session, &token, live_key);
onepath_liveliness_subscribe(session, &live_sub, "state/peer/**", on_peer_liveliness, NULL, 1);
onepath_subscribe(session, &data_sub, "state/data/**", on_state_update, NULL);
onepath_declare_responder(session, &resp, "state/data/**", on_state_query, NULL, 1);
/* 主循环:周期写入并带 attachment 发布 */
onepath_write_opts_t wopts = { NULL, att_buf, att_size };
onepath_put_with_opts(session, pub_key, value, strlen(value), &wopts);./examples/build/release/full/onepath_state_sync --name A &
./examples/build/release/full/onepath_state_sync --name B &
./examples/build/release/full/onepath_state_sync --name C[INFO] subscribed liveliness: 'state/peer/**'
[INFO] responder ready: 'state/data/**'
[INFO] sync update: B/sensor0 = reading_0_from_B (from B, ts: 1782033633955036714)
[INFO] sync update: A/sensor0 = reading_0_from_A (from A, ts: 1782033633955036518)
[INFO] local KV store dump [C]:
[INFO] total 3 records变体:双后端。
自组网 Mesh
节点启动先组播扫描邻近节点再开会话,用存活令牌注册自身服务(gateway / sensor / compute),订阅 mesh/service/** 维护一张「谁在线、什么角色」的拓扑表。
关键 OnePath API
onepath_scout(on_hello, NULL, 3000)— 组播扫描发现邻近节点onepath_liveliness_declare/onepath_liveliness_undeclare— 注册/撤销本节点服务onepath_liveliness_subscribe(session, &sub, "mesh/service/**", on_liveliness, NULL, 1)— 订阅节点上下线onepath_info_zid/onepath_info_peers— 取本节点 ID、枚举已连接对等节点
onepath_scout(on_hello, NULL, 3000); /* 阶段1: 扫描邻近节点 */
onepath_open_with_config(&session, cfg);
snprintf(live_key, sizeof(live_key), "mesh/service/%s/%s", name, service);
onepath_liveliness_declare(session, &token, live_key); /* 服务注册 */
onepath_liveliness_subscribe(session, &live_sub, "mesh/service/**",
on_liveliness, NULL, 1); /* 发现其他节点 */./examples/build/release/full/onepath_mesh_node --name gw --service gateway &
./examples/build/release/full/onepath_mesh_node --name s1 --service sensor &
./examples/build/release/full/onepath_mesh_node --name c1 --service compute[INFO] session established.
[INFO] liveliness token declared: 'mesh/service/c1/compute'
[INFO] node online: 'mesh/service/gw/gateway' (name=gw, service=gateway)
[INFO] node online: 'mesh/service/s1/sensor' (name=s1, service=sensor)
[INFO] subscribed liveliness: 'mesh/service/**'
[INFO] mesh node running (Ctrl+C to exit) ...变体:双后端。可配合 存活快照查询 一次性拉取在线服务列表。
会话信息
建立会话后查询并打印本节点 ID,并以回调方式枚举当前已连接的路由节点与对等节点列表。
关键 OnePath API
onepath_info_zid(session, &node_id)— 取本节点 IDonepath_info_routers(session, on_router_zid, NULL)/onepath_info_peers(session, on_peer_zid, NULL)— 回调枚举已连接路由节点 / 对等节点
onepath_open_with_config(&session, cfg);
onepath_node_id_t node_id;
onepath_info_zid(session, &node_id);
EP_INFO("local ZID: %s", node_id.zid);
onepath_info_routers(session, on_router_zid, NULL);
onepath_info_peers(session, on_peer_zid, NULL);./examples/build/release/full/onepath_info
./examples/build/release/full/onepath_info --mode client -e tcp/localhost:7447[INFO] run mode: peer
[INFO] === OnePath session info ===
[INFO] local ZID: 0570427cfa06624dcc63ac8fe00318e8
[INFO] connected routers:
[INFO] connected peers:
[ OK ] info query done变体:双后端。无邻居时列表为空。