
Reliability

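This chapter covers the capabilities that keep a distributed system trustworthy under failure: automatic reconnection after a disconnect, liveliness snapshot queries, connection monitoring, data integrity checks, and triple modular redundancy (TMR) error correction. For stronger N-modular redundancy voting, see Multi-Modular Redundancy (XMR).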

Reconnection after disconnect

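The session reconnects automatically after it drops. The publisher declares a liveliness token to announce that it is online and publishes periodically. The subscriber uses a liveliness subscription to detect when the publisher goes offline or comes back, and it keeps receiving data throughout.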

Key OnePath APIs

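  • onepath_liveliness_declare / onepath_liveliness_undeclare: declare / undeclare a liveliness token (pub side)
  • onepath_liveliness_subscribe: subscribe to liveliness events (sub side). The callback distinguishes events by sample->kind (ONEPATH_SAMPLE_PUT = online, DELETE = offline)
  • onepath_open_with_config: in client mode, the underlying layer reconnects automatically
  • onepath_declare_publisher / onepath_publisher_put_str: publish periodically; calls return an error code while disconnected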
```c
/* pub: declare the liveliness token, then publish once per second */
onepath_liveliness_declare(session, &token, "demo/reconnect/alive");
onepath_declare_publisher(session, &pub, keyexpr, NULL);
for (int idx = 0; ; idx++) {
    snprintf(buf, sizeof(buf), "[%d] Hello from onepath_reconnect_pub", idx);
    onepath_publisher_put_str(pub, buf);   /* returns an error code while disconnected; publishing resumes after recovery */
    onepath_sleep_s(1);
}

/* sub: watch the publisher's liveliness token (on_liveliness checks sample->kind) */
onepath_liveliness_subscribe(session, &live_sub, key, on_liveliness, NULL, 1);
```
```bash
# Requires a router already listening on tcp/localhost:7447
./examples/build/release/full/onepath_reconnect_sub     # terminal 1: monitor liveliness
./examples/build/release/full/onepath_reconnect_pub     # terminal 2: declare liveliness and publish
# Stop and restart the router manually; the sub side reports the token going offline and coming back online
```
```text
[ OK ] session established
[ OK ] liveliness subscriber declared (key=demo/reconnect/alive)
[ OK ] data subscriber declared (key=demo/reconnect/**)
[ OK ] [liveliness] PUT token online (publisher alive)
[INFO] received PUT: key='demo/reconnect/data', value='[0] Hello from onepath_reconnect_pub'
```

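Variants: both backends. A router is required. When the router or the pub is stopped, the sub prints [liveliness] DELETE token offline. After recovery, the token comes back online.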
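Most of the work in an on_liveliness callback is turning PUT/DELETE events into an online/offline view of the publisher. The sketch below models that in plain C, without OnePath.

The names live_kind_t, live_state_t and live_apply are hypothetical stand-ins and are not OnePath API. A real callback would read sample->kind instead. Duplicate events are ignored, so a repeated PUT does not count as a second reconnection.

```c
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-ins for the two sample kinds named above
   (ONEPATH_SAMPLE_PUT / DELETE); the real OnePath enum may differ. */
typedef enum { KIND_PUT, KIND_DELETE } live_kind_t;

typedef struct {
    bool online;              /* current view of the publisher */
    unsigned online_events;   /* times it came (back) online */
    unsigned offline_events;  /* times it dropped */
} live_state_t;

/* Apply one liveliness event; returns true only if the state changed. */
static bool live_apply(live_state_t *st, live_kind_t kind) {
    bool now_online = (kind == KIND_PUT);
    if (now_online == st->online) {
        return false;  /* duplicate event, nothing to report */
    }
    st->online = now_online;
    if (now_online) {
        st->online_events++;
        printf("[liveliness] PUT token online\n");
    } else {
        st->offline_events++;
        printf("[liveliness] DELETE token offline\n");
    }
    return true;
}
```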

Liveliness snapshot query

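A one-shot query fetches every liveliness token that is currently alive. A node that has just recovered from a disconnect can use it to rebuild the cluster liveliness snapshot immediately, instead of waiting passively for events to be pushed.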

Key OnePath APIs

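  • onepath_liveliness_get(session, keyexpr, &rx, timeout_ms): issue a one-shot liveliness query and return a blocking reply receiver
  • onepath_reply_recv / onepath_reply_release / onepath_reply_rx_destroy: receive and release the replies one by one, then destroy the receiver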
```c
/* one-shot query; replies arrive until timeout_ms expires */
onepath_reply_rx_t rx;
onepath_liveliness_get(session, keyexpr, &rx, timeout_ms);
int count = 0;
onepath_reply_t reply;
while (onepath_reply_recv(rx, &reply) == ONEPATH_OK) {
    if (reply.is_ok) { EP_OK("  alive: %s", reply.key); count++; }
    onepath_reply_release(&reply);   /* release every reply, ok or not */
}
onepath_reply_rx_destroy(rx);
EP_OK("found %d alive nodes in total", count);
```
```bash
./examples/build/release/full/onepath_mesh_node --name s1 --service sensor &
./examples/build/release/full/onepath_mesh_node --name gw --service gateway &
./examples/build/release/full/onepath_liveliness_query -k "mesh/service/**"
./examples/build/release/full/onepath_liveliness_query --watch 5    # repeat the query every 5 seconds
```
```text
[ OK ] session opened
[INFO] query liveliness tokens: 'mesh/service/**' (timeout: 2000 ms)
[ OK ]   alive: mesh/service/s1/sensor
[ OK ]   alive: mesh/service/gw/gateway
[ OK ] found 2 alive nodes in total
```

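Variants: both backends. When no node is alive, the query still completes normally and reports 0 nodes. This example is often used together with the Mesh (ad-hoc networking) chapter.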
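Rebuilding a snapshot from the replies mostly means parsing each reply key and dropping duplicates. The sketch below assumes the mesh/service/<name>/<service> key layout seen in the sample output. The snapshot_t type and snapshot_add function are hypothetical.

To use it, clear the table before each query (including each --watch round). Then call snapshot_add for every reply whose is_ok is set.

```c
#include <stdbool.h>
#include <string.h>

#define SNAP_MAX 64
#define SNAP_FIELD_LEN 32

/* Hypothetical snapshot table rebuilt from one liveliness query. */
typedef struct {
    char name[SNAP_MAX][SNAP_FIELD_LEN];
    char service[SNAP_MAX][SNAP_FIELD_LEN];
    int count;
} snapshot_t;

/* Parse "mesh/service/<name>/<service>" and add it, skipping duplicates.
   Returns false for keys that do not match the assumed layout or if full. */
static bool snapshot_add(snapshot_t *s, const char *key) {
    const char *prefix = "mesh/service/";
    size_t plen = strlen(prefix);
    if (strncmp(key, prefix, plen) != 0) return false;
    const char *name = key + plen;
    const char *slash = strchr(name, '/');
    if (slash == NULL || slash == name || slash[1] == '\0') return false;
    size_t nlen = (size_t)(slash - name);
    const char *svc = slash + 1;
    if (nlen >= SNAP_FIELD_LEN || strlen(svc) >= SNAP_FIELD_LEN || strchr(svc, '/'))
        return false;
    for (int i = 0; i < s->count; i++) {
        /* exact name match plus same service means it is already recorded */
        if (strncmp(s->name[i], name, nlen) == 0 && s->name[i][nlen] == '\0' &&
            strcmp(s->service[i], svc) == 0)
            return true;
    }
    if (s->count == SNAP_MAX) return false;
    memcpy(s->name[s->count], name, nlen);
    s->name[s->count][nlen] = '\0';
    strcpy(s->service[s->count], svc);
    s->count++;
    return true;
}
```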

Connection monitoring

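Periodically polls the routers and peers connected directly to this node and diffs the list against the previous round. Link up/down changes are reported as a timeline. A liveliness subscription additionally tracks which nodes are alive.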

Key OnePath APIs

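  • onepath_info_zid: get this node's ID
  • onepath_info_routers / onepath_info_peers: enumerate the current routers / peers through a callback
  • onepath_liveliness_subscribe: add liveliness event monitoring (a failed declaration only logs a warning and is not fatal)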
```c
onepath_info_zid(session, &own_id);
onepath_info_routers(session, collect_zid, &prev_routers);   /* initial snapshot */
onepath_info_peers(session, collect_zid, &prev_peers);
onepath_liveliness_subscribe(session, &live_sub, live_keyexpr, on_liveliness, NULL, 1);
while (g_running) {
    onepath_sleep_s(interval);
    onepath_info_routers(session, collect_zid, &curr_routers);
    detect_changes("Router", &prev_routers, &curr_routers);
    onepath_info_peers(session, collect_zid, &curr_peers);
    detect_changes("Peer", &prev_peers, &curr_peers);   /* reports added / removed; prev must then be refreshed from curr */
}
```
```bash
./examples/build/release/full/onepath_link_monitor
./examples/build/release/full/onepath_discovery pub      # in another terminal, to trigger connection events
```
```text
[INFO] local ZID: 91946c4fd0ff07f11a08e93f539dfec4
[INFO] === initial connection state ===
[INFO]   Peer[0]: 95d86b68009ed5ba6582ec31c7c18500
[INFO] [scan #1] connection change detected:
[WARN] [change] Peer removed: 95d86b68009ed5ba6582ec31c7c18500
[ OK ] [change] Peer added: 689166313f6c4364d7aad41830a349ba
```

Variants: both backends.
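The body of detect_changes is not shown on this page. One straightforward approach is a set difference in both directions. After the diff, the current scan becomes the baseline for the next round.

The sketch below assumes ZIDs are collected as 32-character hex strings, as in the sample output. The names zid_list_t and detect_changes_sketch are hypothetical. The linear search is fine for the handful of direct links a node usually has.

```c
#include <stdio.h>
#include <string.h>

#define MAX_LINKS 32
#define ZID_LEN 33   /* 32 hex chars + NUL */

/* Hypothetical list filled by a collect_zid-style callback. */
typedef struct {
    char zid[MAX_LINKS][ZID_LEN];
    int count;
} zid_list_t;

static int zid_list_contains(const zid_list_t *l, const char *zid) {
    for (int i = 0; i < l->count; i++)
        if (strcmp(l->zid[i], zid) == 0) return 1;
    return 0;
}

/* Print removed/added entries, then make curr the new baseline and empty
   curr for the next poll. Returns the number of changes found. */
static int detect_changes_sketch(const char *kind, zid_list_t *prev, zid_list_t *curr) {
    int changes = 0;
    for (int i = 0; i < prev->count; i++)
        if (!zid_list_contains(curr, prev->zid[i])) {
            printf("[change] %s removed: %s\n", kind, prev->zid[i]);
            changes++;
        }
    for (int i = 0; i < curr->count; i++)
        if (!zid_list_contains(prev, curr->zid[i])) {
            printf("[change] %s added: %s\n", kind, curr->zid[i]);
            changes++;
        }
    *prev = *curr;     /* struct copy: curr becomes the baseline */
    curr->count = 0;   /* ready to be refilled by the next scan */
    return changes;
}
```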

Data integrity (CRC32)

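The publisher computes a CRC32 over each payload and carries it in the attachment. The subscriber recomputes the CRC and compares the two values to detect bit flips on the transmission path. The --corrupt option injects faults, so you can observe verification failures and the cumulative corruption rate.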
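The examples call a crc32_compute helper, and this page does not say which CRC-32 variant it implements. The sketch below assumes the common IEEE 802.3 CRC-32, the same one zlib uses: reflected polynomial 0xEDB88320, with initial value and final XOR 0xFFFFFFFF.

The names crc32_ieee and flip_detected are illustrative. flip_detected mimics the --corrupt idea by flipping one bit of a copy and checking whether the receiver-side comparison would notice. CRC-32 detects every single-bit error, so it always does.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Bitwise CRC-32 (IEEE 802.3 / zlib parameters). */
static uint32_t crc32_ieee(const uint8_t *data, size_t len) {
    uint32_t crc = 0xFFFFFFFFu;
    for (size_t i = 0; i < len; i++) {
        crc ^= data[i];
        for (int bit = 0; bit < 8; bit++)
            /* if the low bit is set, shift and XOR in the reflected polynomial */
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
    }
    return crc ^ 0xFFFFFFFFu;
}

/* Flip one bit of a copy of data and report whether the CRC changes
   (1 = corruption detected, 0 = missed, -1 = bad arguments). */
static int flip_detected(const uint8_t *data, size_t len, size_t byte, int bit) {
    uint8_t buf[256];
    if (len > sizeof(buf) || byte >= len || bit < 0 || bit > 7) return -1;
    memcpy(buf, data, len);
    buf[byte] ^= (uint8_t)(1u << bit);
    return crc32_ieee(buf, len) != crc32_ieee(data, len);
}
```

The check value of this CRC-32 variant for the ASCII string "123456789" is 0xCBF43926, which is handy for checking any replacement implementation.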

Key OnePath APIs

  • onepath_publisher_write(pub, data, len, &wopts) + onepath_write_opts_t{encoding, attachment, attachment_len}: publish a payload together with an attachment
  • the subscriber callback reads sample->attachment / attachment_len / data / data_len and recomputes the CRC for comparison
```c
/* pub: attachment = {crc32 (u32), payload_len (u64)}, 12 bytes in host byte order */
uint32_t crc = crc32_compute((const uint8_t*)buf, msg_len);
uint64_t msg_len64 = (uint64_t)msg_len;
uint8_t att_buf[12];
memcpy(att_buf, &crc, 4);
memcpy(att_buf + 4, &msg_len64, 8);
onepath_write_opts_t wopts = { NULL, att_buf, sizeof(att_buf) };   /* {encoding, attachment, attachment_len} */
onepath_publisher_write(pub, (const uint8_t*)buf, msg_len, &wopts);

/* sub callback: only verify when a CRC is actually attached */
uint32_t expected_crc;
if (sample->attachment != NULL && sample->attachment_len >= 4) {
    memcpy(&expected_crc, sample->attachment, 4);
    uint32_t actual_crc = crc32_compute(sample->data, sample->data_len);
    if (actual_crc == expected_crc) EP_OK("CRC match ...");
    /* otherwise: mismatch, count the sample as corrupted */
}
onepath_sample_release(sample);
```
```bash
./examples/build/release/full/onepath_integrity_sub &
./examples/build/release/full/onepath_integrity_pub               # peer mode, no router needed
./examples/build/release/full/onepath_integrity_pub --corrupt     # inject bit flips
```
```text
[INFO] data integrity subscription started:
[INFO]   Key:      demo/integrity/data
[INFO]   Check:    CRC32
[ OK ] #1: CRC match: 0x94290BA3 | corruption rate: 0.0%
[ OK ] #2: CRC match: 0x611647F9 | corruption rate: 0.0%
[ OK ] #3: CRC match: 0xAA3F7753 | corruption rate: 0.0%
```

Variants: both backends. The checksum travels in OnePath's TLV attachment format, a general-purpose capability provided by OnePath itself.
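The snippet above copies the CRC and the length into the attachment with memcpy in host byte order. That works as long as both ends share the same endianness.

Below is a sketch of a portable variant of that 12-byte layout. It fixes the byte order to little-endian and uses the length field to reject truncated payloads. The names att_pack and att_parse are hypothetical. The sketch does not reproduce OnePath's own TLV format, only the {crc, payload_len} pair from the example.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define ATT_LEN 12  /* crc32 (4 bytes) + payload_len (8 bytes) */

/* Pack {crc, payload_len} little-endian, independent of host byte order. */
static void att_pack(uint8_t out[ATT_LEN], uint32_t crc, uint64_t payload_len) {
    for (int i = 0; i < 4; i++) out[i] = (uint8_t)(crc >> (8 * i));
    for (int i = 0; i < 8; i++) out[4 + i] = (uint8_t)(payload_len >> (8 * i));
}

/* Parse and validate: fails if the attachment is missing or short, or if the
   advertised length differs from the payload actually received. */
static bool att_parse(const uint8_t *att, size_t att_len, size_t data_len, uint32_t *crc) {
    if (att == NULL || att_len < ATT_LEN) return false;
    uint32_t c = 0;
    uint64_t n = 0;
    for (int i = 0; i < 4; i++) c |= (uint32_t)att[i] << (8 * i);
    for (int i = 0; i < 8; i++) n |= (uint64_t)att[4 + i] << (8 * i);
    if (n != (uint64_t)data_len) return false;  /* truncated or padded payload */
    *crc = c;
    return true;
}
```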

Triple modular redundancy (TMR)

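A single binary runs in four roles (voter + node1/2/3) to demonstrate 2-of-3 majority-vote error correction with triple modular redundancy. The three compute nodes subscribe to the same input, compute independently, and publish their results. The voter subscribes to all three results, takes the majority decision, and thereby corrects injected computation faults.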
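The voting logic inside on_result is not shown on this page. As soon as any two results for the same sequence agree, they form a majority, and the voter does not need to wait for the third. If all three have arrived and no pair agrees, the round is divergent.

The sketch below implements that rule. The names tmr_vote, vote_status_t and tmr_compute are hypothetical. tmr_compute only restates the input*7+42 formula used by the nodes.

```c
#include <stdbool.h>
#include <stdint.h>

typedef enum { VOTE_PENDING, VOTE_CONSENSUS, VOTE_DIVERGENT } vote_status_t;

/* 2-of-3 majority vote over the results received so far for one sequence
   number; have[i] says whether node i+1 has reported val[i]. */
static vote_status_t tmr_vote(const int32_t val[3], const bool have[3], int32_t *out) {
    int received = have[0] + have[1] + have[2];
    for (int i = 0; i < 3; i++) {
        for (int j = i + 1; j < 3; j++) {
            if (have[i] && have[j] && val[i] == val[j]) {
                *out = val[i];  /* two agreeing results are a majority */
                return VOTE_CONSENSUS;
            }
        }
    }
    /* no agreeing pair: wait for the third result, or give up if all differ */
    return received == 3 ? VOTE_DIVERGENT : VOTE_PENDING;
}

/* Node-side computation used by this example: input*7+42. */
static int32_t tmr_compute(int32_t input) { return input * 7 + 42; }
```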

Key OnePath APIs

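  • onepath_subscribe(session, &result_sub, "tmr/result/**", on_result, NULL): the voter subscribes to all results with a wildcard and runs the 2-of-3 vote inside the callback
  • onepath_declare_publisher / onepath_publisher_put: each node declares a result publisher and publishes binary frames
  • onepath_subscribe_pull + onepath_sample_recv: each node pulls its input in pull (ring-buffer) mode with a blocking receive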
```c
/* voter: once >= 2 results for a sequence have arrived, run the 2-of-3 majority vote */
onepath_subscribe(session, &result_sub, "tmr/result/**", on_result, NULL);

/* node (N = the node's number) */
onepath_declare_publisher(session, &result_pub, "tmr/result/N", NULL);
onepath_subscribe_pull(session, &input_sub, &input_rx, "tmr/input", 16);   /* pull (ring-buffer) mode */
onepath_sample_recv(input_rx, &sample);           /* blocking pull of the next input */
int32_t result = compute(input_value);            /* input*7+42; input_value is decoded from sample */
onepath_publisher_put(result_pub, result_buf, sizeof(result_buf));   /* result_buf: binary frame carrying result */
```
```bash
./examples/build/release/full/onepath_tmr_node voter &
./examples/build/release/full/onepath_tmr_node node1 &
./examples/build/release/full/onepath_tmr_node node2 &
./examples/build/release/full/onepath_tmr_node node3            # add --corrupt to inject faults
```
```text
[INFO] subscribed to 'tmr/result/**'
[ OK ] [voter] seq 0: consensus reached OK result: 77
[stats] total votes: 1, consensus: 1, divergent: 0, consensus rate: 100.0%
[ OK ] [voter] seq 1: consensus reached OK result: 168
[stats] total votes: 2, consensus: 2, divergent: 0, consensus rate: 100.0%
```

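Variants: both backends. No router is needed (peer multicast). TMR always votes over exactly 3 results. For arbitrary N-way voting, automatic detection of the replica count, and edge voting on the reader side, see Multi-Modular Redundancy (XMR).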
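The voter learns which node a result came from only through the key matched by the wildcard subscription. It also keeps running totals for the [stats] line.

The sketch below assumes the N in "tmr/result/N" is the digit 1, 2 or 3. The names tmr_node_index, vote_stats_t, consensus_rate and format_stats are hypothetical. format_stats reproduces the stats line shown in the sample output.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Map a key such as "tmr/result/2" to node index 0..2; -1 if not a result key. */
static int tmr_node_index(const char *key) {
    const char *prefix = "tmr/result/";
    size_t plen = strlen(prefix);
    if (strncmp(key, prefix, plen) != 0) return -1;
    char *end = NULL;
    long n = strtol(key + plen, &end, 10);
    if (end == key + plen || *end != '\0' || n < 1 || n > 3) return -1;
    return (int)(n - 1);
}

/* Running totals kept by the voter. */
typedef struct {
    unsigned total, consensus, divergent;
} vote_stats_t;

static double consensus_rate(const vote_stats_t *s) {
    return s->total ? 100.0 * s->consensus / s->total : 0.0;
}

/* Format the stats line in the style of the sample output. */
static int format_stats(char *buf, size_t n, const vote_stats_t *s) {
    return snprintf(buf, n,
                    "[stats] total votes: %u, consensus: %u, divergent: %u, consensus rate: %.1f%%",
                    s->total, s->consensus, s->divergent, consensus_rate(s));
}
```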

Advanced Pub/Sub (caching and history replay)

full only. This example is not built for tiny.

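Demonstrates publication caching, loss detection and recovery, and history replay for late-joining subscribers. The publisher first publishes and caches several messages, and a subscriber started afterwards still receives that history.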
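This page does not show the advanced Pub/Sub API itself. The underlying idea can still be sketched in plain C. The publisher keeps the last few messages, tagged with sequence numbers, in a ring buffer. A late joiner is sent that history, oldest first. A subscriber that sees a jump in sequence numbers knows how many messages it missed and can look them up while they are still cached.

All names below (history_cache_t, cache_publish, cache_replay, cache_lookup, missed_before) are hypothetical. This is not how OnePath implements the feature.

```c
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define CACHE_SLOTS 8
#define MSG_MAX 64

/* Publisher-side history: last CACHE_SLOTS messages in a ring buffer. */
typedef struct {
    uint32_t seq[CACHE_SLOTS];
    char msg[CACHE_SLOTS][MSG_MAX];
    uint32_t next_seq;  /* sequence number of the next publication */
} history_cache_t;

static void cache_publish(history_cache_t *c, const char *text) {
    uint32_t slot = c->next_seq % CACHE_SLOTS;  /* overwrites the oldest entry */
    c->seq[slot] = c->next_seq;
    snprintf(c->msg[slot], MSG_MAX, "%s", text);
    c->next_seq++;
}

/* Replay for a late joiner, oldest first; returns the number of messages. */
static int cache_replay(const history_cache_t *c, uint32_t seq_out[CACHE_SLOTS],
                        const char *msg_out[CACHE_SLOTS]) {
    uint32_t held = c->next_seq < CACHE_SLOTS ? c->next_seq : CACHE_SLOTS;
    for (uint32_t k = 0; k < held; k++) {
        uint32_t s = c->next_seq - held + k;
        seq_out[k] = s;
        msg_out[k] = c->msg[s % CACHE_SLOTS];
    }
    return (int)held;
}

/* Recovery: fetch one missed message, or NULL if never sent or already evicted. */
static const char *cache_lookup(const history_cache_t *c, uint32_t seq) {
    if (seq >= c->next_seq || c->next_seq - seq > CACHE_SLOTS) return NULL;
    return c->msg[seq % CACHE_SLOTS];
}

/* Subscriber-side loss detection: how many messages were skipped. */
static uint32_t missed_before(uint32_t expected, uint32_t got) {
    return got > expected ? got - expected : 0;
}
```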

```bash
./examples/build/release/full/onepath_advanced_pub       # publish several messages first
./examples/build/release/full/onepath_advanced_sub       # start afterwards; it should receive the history
```

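Variants: full only. Advanced Pub/Sub (caching + loss recovery + history replay) marks the capability boundary of the full edition. In the tiny edition, the corresponding API returns ONEPATH_ERR_UNSUPPORTED and the example is not built.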

OnePath™ is delivered as a prebuilt library with zero external runtime dependencies.