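Reliability
This chapter covers the capabilities that keep a distributed system trustworthy under failure: automatic reconnection after a disconnect, liveliness snapshot queries, connection monitoring, data integrity checks, and error correction with triple modular redundancy (TMR). For stronger N-modular redundancy voting, see Multi-Modular Redundancy (XMR).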
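Reconnection
Sessions reconnect automatically after a disconnect. The publisher declares a liveliness token to announce that it is online and publishes periodically; the subscriber uses a liveliness subscription to detect when the publisher goes offline or recovers, and keeps receiving data.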
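Key OnePath APIs
- onepath_liveliness_declare / onepath_liveliness_undeclare: declare or undeclare a liveliness token (publisher side)
- onepath_liveliness_subscribe: subscribe to liveliness events (subscriber side); the callback distinguishes them by sample->kind (ONEPATH_SAMPLE_PUT = online, DELETE = offline)
- onepath_open_with_config: in client mode, the underlying layer reconnects automatically
- onepath_declare_publisher / onepath_publisher_put_str: periodic publishing; returns an error code while disconnected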
/* pub */
onepath_liveliness_declare(session, &token, "demo/reconnect/alive");
onepath_declare_publisher(session, &pub, keyexpr, NULL);
for (int idx = 0; ; idx++) {
    snprintf(buf, sizeof(buf), "[%d] Hello from onepath_reconnect_pub", idx);
    onepath_publisher_put_str(pub, buf); /* returns an error code while disconnected; resumes after recovery */
    onepath_sleep_s(1);
}

/* sub */
onepath_liveliness_subscribe(session, &live_sub, key, on_liveliness, NULL, 1);

# A router listening on tcp/localhost:7447 must be started beforehand
./examples/build/release/full/onepath_reconnect_sub # terminal 1: monitor liveliness
./examples/build/release/full/onepath_reconnect_pub # terminal 2: declare liveliness and publish
# Stop/restart the router by hand and watch the token offline/online events on the subscriber side

[ OK ] session established
[ OK ] liveliness subscriber declared (key=demo/reconnect/alive)
[ OK ] data subscriber declared (key=demo/reconnect/**)
[ OK ] [liveliness] PUT token online (publisher alive)
[INFO] received PUT: key='demo/reconnect/data', value='[0] Hello from onepath_reconnect_pub'

Variants: both backends. Requires a router. Stopping the router or the publisher produces "[liveliness] DELETE token offline"; the token comes back online after recovery.
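The subscriber-side bookkeeping described above (PUT means online, DELETE means offline) can be sketched independently of the library. Everything in this block is a hypothetical stand-in, not OnePath API: `live_event_kind_t`, `live_tracker_t` and `live_tracker_on_event` are illustrative names, and in a real callback the event kind would be derived from `sample->kind`.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for the sample kind seen by the liveliness callback. */
typedef enum { LIVE_EVENT_PUT, LIVE_EVENT_DELETE } live_event_kind_t;

/* Per-token state; zero-initialize before use (token starts offline). */
typedef struct {
    bool     online;      /* current state of the tracked token */
    bool     seen_once;   /* token has been online at least once */
    uint32_t outages;     /* online -> offline transitions */
    uint32_t recoveries;  /* offline -> online transitions after the first */
} live_tracker_t;

/* Apply one event; returns true only when the state actually changed. */
static bool live_tracker_on_event(live_tracker_t *t, live_event_kind_t kind)
{
    bool now_online = (kind == LIVE_EVENT_PUT);
    if (now_online == t->online) {
        return false;               /* duplicate event, nothing to report */
    }
    if (now_online) {
        if (t->seen_once) {
            t->recoveries++;        /* came back after an outage */
        }
        t->seen_once = true;
    } else {
        t->outages++;
    }
    t->online = now_online;
    return true;
}
```

Counting transitions rather than raw events keeps the statistics stable if the same event is delivered more than once, for example when history is replayed on resubscription.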
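Liveliness snapshot query
A one-shot query retrieves all currently alive liveliness tokens, so a node that has recovered from a disconnect can rebuild its snapshot of cluster liveliness immediately instead of passively waiting for pushed events.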
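Key OnePath APIs
- onepath_liveliness_get(session, keyexpr, &rx, timeout_ms): issues one synchronous liveliness query and returns a blocking reply receiver
- onepath_reply_recv / onepath_reply_release / onepath_reply_rx_destroy: receive the replies one at a time and release them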
onepath_reply_rx_t rx;
onepath_liveliness_get(session, keyexpr, &rx, timeout_ms);
int count = 0;
onepath_reply_t reply;
while (onepath_reply_recv(rx, &reply) == ONEPATH_OK) {
    if (reply.is_ok) { EP_OK(" alive: %s", reply.key); count++; }
    onepath_reply_release(&reply);
}
onepath_reply_rx_destroy(rx);

./examples/build/release/full/onepath_mesh_node --name s1 --service sensor &
./examples/build/release/full/onepath_mesh_node --name gw --service gateway &
./examples/build/release/full/onepath_liveliness_query -k "mesh/service/**"
./examples/build/release/full/onepath_liveliness_query --watch 5 # repeat the query every 5 seconds

[ OK ] session opened
[INFO] query liveliness tokens: 'mesh/service/**' (timeout: 2000 ms)
[ OK ] alive: mesh/service/s1/sensor
[ OK ] alive: mesh/service/gw/gateway
[ OK ] found 2 alive nodes in total

Variants: both backends. When no node is alive, the query returns normally with 0 results. Often used together with self-organizing Mesh networking.
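Connection monitoring
Periodically polls the list of routers and peers directly connected to this node and diffs it against the previous round, exposing link up/down changes as a timeline. A liveliness subscription is layered on top to monitor alive nodes.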
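Key OnePath APIs
- onepath_info_zid: get this node's ID
- onepath_info_routers / onepath_info_peers: enumerate the current routers/peers through a callback
- onepath_liveliness_subscribe: adds liveliness event monitoring on top (a failed declaration only logs a warning and is not fatal)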
onepath_info_zid(session, &own_id);
onepath_info_routers(session, collect_zid, &prev_routers);
onepath_info_peers(session, collect_zid, &prev_peers);
onepath_liveliness_subscribe(session, &live_sub, live_keyexpr, on_liveliness, NULL, 1);
while (g_running) {
    onepath_sleep_s(interval);
    onepath_info_peers(session, collect_zid, &curr_peers);
    detect_changes("Peer", &prev_peers, &curr_peers); /* marks entries as added / removed */
}

./examples/build/release/full/onepath_link_monitor
./examples/build/release/full/onepath_discovery pub # in another terminal, to trigger connection events

[INFO] local ZID: 91946c4fd0ff07f11a08e93f539dfec4
[INFO] === initial connection state ===
[INFO] Peer[0]: 95d86b68009ed5ba6582ec31c7c18500
[INFO] [scan #1] connection change detected:
[WARN] [change] Peer removed: 95d86b68009ed5ba6582ec31c7c18500
[ OK ] [change] Peer added: 689166313f6c4364d7aad41830a349ba

Variants: both backends.
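The round-to-round comparison behind the "Peer added" / "Peer removed" lines amounts to two set differences. The following is a minimal sketch of that idea, not the example's actual detect_changes: the types `zid_t` and `zid_list_t`, the 16-byte ID length (inferred from the 32 hex digits in the log) and the fixed capacity are all assumptions made for illustration.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define ZID_LEN 16  /* assumption: 16-byte IDs, i.e. 32 hex digits in the log */
#define ZID_MAX 32  /* illustrative capacity of one scan */

typedef struct { uint8_t bytes[ZID_LEN]; } zid_t;
typedef struct { zid_t items[ZID_MAX]; size_t count; } zid_list_t;

static bool zid_list_contains(const zid_list_t *list, const zid_t *id)
{
    for (size_t i = 0; i < list->count; i++) {
        if (memcmp(list->items[i].bytes, id->bytes, ZID_LEN) == 0) {
            return true;
        }
    }
    return false;
}

/* Collect into `out` every ID that is in `a` but not in `b`. */
static void zid_list_difference(const zid_list_t *a, const zid_list_t *b,
                                zid_list_t *out)
{
    out->count = 0;
    for (size_t i = 0; i < a->count; i++) {
        if (!zid_list_contains(b, &a->items[i])) {
            out->items[out->count++] = a->items[i];
        }
    }
}

/* added = curr \ prev, removed = prev \ curr; true if anything changed. */
static bool zid_detect_changes(const zid_list_t *prev, const zid_list_t *curr,
                               zid_list_t *added, zid_list_t *removed)
{
    zid_list_difference(curr, prev, added);
    zid_list_difference(prev, curr, removed);
    return added->count > 0 || removed->count > 0;
}
```

The quadratic comparison is acceptable for the handful of direct links a node typically has. After each scan the caller would copy the current list over the previous one so that the next round diffs against fresh state.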
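Data integrity CRC32
The publisher computes a CRC32 over each payload and carries it in an attachment; the subscriber recomputes the CRC and compares the two to detect bit flips on the transport link. The --corrupt option injects faults so that check failures and the cumulative corruption rate can be observed.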
Key OnePath APIs
- onepath_publisher_write(pub, data, len, &wopts) with onepath_write_opts_t{encoding, attachment, attachment_len}: publish a payload together with an attachment
- The subscriber callback reads sample->attachment / attachment_len / data / data_len and recomputes the CRC for comparison
/* pub: attachment = {crc32(u32), payload_len(u64)} */
uint32_t crc = crc32_compute((const uint8_t*)buf, msg_len);
uint64_t msg_len64 = (uint64_t)msg_len; /* payload length widened to 64 bits */
uint8_t att_buf[12];
memcpy(att_buf, &crc, 4);
memcpy(att_buf + 4, &msg_len64, 8);
onepath_write_opts_t wopts = { NULL, att_buf, sizeof(att_buf) }; /* encoding, attachment, attachment_len */
onepath_publisher_write(pub, (const uint8_t*)buf, msg_len, &wopts);

/* sub callback */
uint32_t expected_crc;
memcpy(&expected_crc, sample->attachment, 4);
uint32_t actual_crc = crc32_compute(sample->data, sample->data_len);
if (actual_crc == expected_crc) EP_OK("CRC match ...");
onepath_sample_release(sample);

./examples/build/release/full/onepath_integrity_sub &
./examples/build/release/full/onepath_integrity_pub # peer mode needs no router
./examples/build/release/full/onepath_integrity_pub --corrupt # inject bit flips

[INFO] data integrity subscription started:
[INFO] Key: demo/integrity/data
[INFO] Check: CRC32
[ OK ] #1: CRC match: 0x94290BA3 | corruption rate: 0.0%
[ OK ] #2: CRC match: 0x611647F9 | corruption rate: 0.0%
[ OK ] #3: CRC match: 0xAA3F7753 | corruption rate: 0.0%

Variants: both backends. The checksum is carried in OnePath's TLV attachment format, a common capability that is native to OnePath.
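The excerpt calls crc32_compute without showing it. A minimal sketch follows, assuming the example uses the standard CRC-32 (reflected polynomial 0xEDB88320, as in zlib and Ethernet); that choice is an assumption, not something the source confirms. The helpers `att_pack` and `att_verify` are hypothetical names, and unlike the memcpy-based excerpt they serialize the fields in explicit little-endian order so that hosts of different endianness agree on the 12-byte layout.

```c
#include <stddef.h>
#include <stdint.h>

#define ATT_LEN 12  /* crc32 (4 bytes) + payload length (8 bytes) */

/* Bitwise CRC-32: reflected polynomial 0xEDB88320, init and final XOR 0xFFFFFFFF. */
static uint32_t crc32_compute(const uint8_t *data, size_t len)
{
    uint32_t crc = 0xFFFFFFFFu;
    for (size_t i = 0; i < len; i++) {
        crc ^= data[i];
        for (int bit = 0; bit < 8; bit++) {
            /* Conditionally XOR the polynomial when the low bit is set. */
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
        }
    }
    return crc ^ 0xFFFFFFFFu;
}

/* Hypothetical helper: write {crc32, payload_len} in little-endian order. */
static void att_pack(uint8_t out[ATT_LEN], uint32_t crc, uint64_t payload_len)
{
    for (int i = 0; i < 4; i++) out[i] = (uint8_t)(crc >> (8 * i));
    for (int i = 0; i < 8; i++) out[4 + i] = (uint8_t)(payload_len >> (8 * i));
}

/* Hypothetical helper: returns 1 if the attachment matches the payload, else 0. */
static int att_verify(const uint8_t *att, size_t att_len,
                      const uint8_t *data, size_t data_len)
{
    if (att == NULL || att_len < ATT_LEN) {
        return 0;  /* missing or truncated attachment counts as a failure */
    }
    uint32_t crc = 0;
    uint64_t len = 0;
    for (int i = 0; i < 4; i++) crc |= (uint32_t)att[i] << (8 * i);
    for (int i = 0; i < 8; i++) len |= (uint64_t)att[4 + i] << (8 * i);
    return len == (uint64_t)data_len && crc == crc32_compute(data, data_len);
}
```

Checking the attachment length before reading it guards the subscriber against samples published without an attachment. A table-driven CRC would be faster for large payloads; the bitwise form is used here only because it is short enough to audit at a glance.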
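Triple modular redundancy TMR
A single binary runs in one of four roles (voter plus node1/2/3) to demonstrate error correction by 2-of-3 majority voting: the three compute nodes subscribe to the same input, each computes and publishes its own result, and the voter subscribes to all three results and takes the majority, correcting injected computation faults.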
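Key OnePath APIs
- onepath_subscribe(session, &result_sub, "tmr/result/**", on_result, NULL): the voter subscribes to the results with a wildcard and performs the 2-of-3 vote inside the callback
- onepath_declare_publisher / onepath_publisher_put: a node declares its result publisher and publishes binary frames
- onepath_subscribe_pull + onepath_sample_recv: a node pulls its input in blocking fashion, using pull / ring-buffer mode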
/* voter: runs the 2-of-3 majority vote once at least 2 results have arrived */
onepath_subscribe(session, &result_sub, "tmr/result/**", on_result, NULL);

/* node */
onepath_declare_publisher(session, &result_pub, "tmr/result/N", NULL);
onepath_subscribe_pull(session, &input_sub, &input_rx, "tmr/input", 16);
onepath_sample_recv(input_rx, &sample); /* pull the input */
int32_t result = compute(input_value); /* input*7+42 */
onepath_publisher_put(result_pub, result_buf, sizeof(result_buf));

./examples/build/release/full/onepath_tmr_node voter &
./examples/build/release/full/onepath_tmr_node node1 &
./examples/build/release/full/onepath_tmr_node node2 &
./examples/build/release/full/onepath_tmr_node node3 # add --corrupt to inject faults

[INFO] subscribed to 'tmr/result/**'
[ OK ] [voter] seq 0: consensus reached OK result: 77
[stats] total votes: 1, consensus: 1, divergent: 0, consensus rate: 100.0%
[ OK ] [voter] seq 1: consensus reached OK result: 168
[stats] total votes: 2, consensus: 2, divergent: 0, consensus rate: 100.0%

Variants: both backends. No router needed (peer multicast). TMR is a fixed 3-way vote; for arbitrary N-way voting, automatic detection of the replica count, and edge voting on the reader side, see Multi-Modular Redundancy (XMR).
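The 2-of-3 decision itself fits in a few lines. This is a sketch of one way the voter's callback could decide, under the assumption that results are plain int32_t values grouped per sequence number; `tmr_slot_t`, `tmr_verdict_t` and `tmr_vote` are hypothetical names, and the example's real frame layout is not shown in the source.

```c
#include <stdbool.h>
#include <stdint.h>

/* Results collected for one sequence number, one entry per compute node. */
typedef struct {
    int32_t value[3];
    bool    present[3];
} tmr_slot_t;

typedef enum { TMR_PENDING, TMR_CONSENSUS, TMR_DIVERGENT } tmr_verdict_t;

/*
 * 2-of-3 majority vote.
 * TMR_CONSENSUS: two present results agree; *out receives the agreed value.
 * TMR_PENDING:   no agreeing pair yet and at least one result is missing.
 * TMR_DIVERGENT: all three results arrived and no two of them agree.
 */
static tmr_verdict_t tmr_vote(const tmr_slot_t *slot, int32_t *out)
{
    int received = 0;
    for (int i = 0; i < 3; i++) {
        if (slot->present[i]) received++;
    }
    for (int i = 0; i < 3; i++) {
        for (int j = i + 1; j < 3; j++) {
            if (slot->present[i] && slot->present[j] &&
                slot->value[i] == slot->value[j]) {
                *out = slot->value[i];
                return TMR_CONSENSUS;
            }
        }
    }
    return received == 3 ? TMR_DIVERGENT : TMR_PENDING;
}
```

With input 5 and the input*7+42 computation, two healthy nodes both report 77, so the vote resolves to 77 even if the third node reports a corrupted value or never answers. A real voter would also need a timeout for slots that stay pending.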
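Advanced Pub/Sub (caching and history replay)
full only. The tiny variant does not build this example.
Demonstrates publication caching, loss detection and recovery, and history replay for late-joining subscribers: the publisher first publishes and caches a number of messages, and a subscriber started afterwards still receives that history.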
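./examples/build/release/full/onepath_advanced_pub # publish a number of messages first
./examples/build/release/full/onepath_advanced_sub # start afterwards; should receive the history

Variants: full only. Advanced Pub/Sub (caching + loss recovery + history replay) marks the capability boundary of the full variant; in the tiny variant the corresponding APIs return ONEPATH_ERR_UNSUPPORTED and the example is not built.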