Skip to content

性能与零拷贝

本章演示 OnePath 的延迟与吞吐能力:通用的 Ping/Pong 往返延迟延迟自适应 QoS,以及一整组共享内存(SHM)零拷贝示例——同主机进程间传输无需内存拷贝。

SHM 系列为完整版(full)专属能力。精简版(tiny)不构建 SHM 相关示例;调用对应 API 返回 ONEPATH_ERR_UNSUPPORTED。SHM 的概念与配置详见 SHM 场景。Ping/Pong 与延迟自适应为双后端。

Ping/Pong 往返延迟

测量 OnePath 普通发布/订阅通信的往返延迟(RTT):ping 端发布探针、pong 端原样回声,统计 min/avg/median/P99/max。

关键 OnePath API

  • onepath_declare_publisher / onepath_subscribe — 声明 ping 探针发布器与 pong 回声订阅
  • onepath_publisher_put — 发布定长 payload(探针与回声)
  • onepath_sample_release — 释放回调样本
c
onepath_declare_publisher(session, &pub, "test/ping", NULL);
onepath_subscribe(session, &sub, "test/pong", ping_pong_cb, NULL);
for (int i = 0; i < rounds; i++) {
    clock_gettime(CLOCK_MONOTONIC, &start);
    onepath_publisher_put(pub, data, payload_size);
    /* 等待 pong 回声唤醒 */
    clock_gettime(CLOCK_MONOTONIC, &end);
    latencies[i] = rtt_us(start, end);
}
bash
./examples/build/release/full/onepath_ping_pong pong &
./examples/build/release/full/onepath_ping_pong ping
./examples/build/release/full/onepath_ping_pong ping -n 1000 -s 64
text
[INFO] PING payload: 64 bytes, rounds: 50 (warmup: 5)
[INFO] === Ping/Pong latency stats (50 rounds, 64 bytes payload) ===
  min:            41.7 us
  avg:            44.8 us
  median:         43.3 us
  p99:            68.3 us
  max:            68.3 us

变体:双后端。tiny 后端 RTT 更低(约 avg 21.9 us)。

延迟自适应

sender 先用 ping 探针测 RTT,按链路质量(excellent / medium / poor)自适应选择发布 QoS(优先级、拥塞控制、express、发布间隔)后持续发布;monitor 回声 ping 并打印数据及其 QoS 标签。

关键 OnePath API

  • onepath_declare_publisher + onepath_pub_opts_tcongestion_control / priority / express / reliability)— 按 QoS 档位声明发布器
  • 常量 ONEPATH_PRIORITY_REALTIME/DATA/BACKGROUNDONEPATH_CC_DROP/BLOCKONEPATH_RELIABILITY_RELIABLE
  • 订阅样本字段 sample->priority / congestion_control / express
c
onepath_pub_opts_t pub_opts = {
    .congestion_control = profile->congestion_control,
    .priority           = profile->priority,
    .express            = profile->express,
    .reliability        = ONEPATH_RELIABILITY_RELIABLE,
};
onepath_declare_publisher(session, &data_pub, keyexpr, &pub_opts);
snprintf(buf, sizeof(buf), "[%d] link: %s, RTT: %.1fms", msg_idx++, profile->label, avg_rtt);
onepath_publisher_put_str(data_pub, buf);
onepath_sleep_ms(profile->publish_interval_ms);
bash
./examples/build/release/full/onepath_latency_adapt monitor &
./examples/build/release/full/onepath_latency_adapt sender
./examples/build/release/full/onepath_latency_adapt sender --probes 20 --interval 60
text
[INFO] === RTT statistics (10 probes) ===
  min:           0.037 ms
  link:     excellent (threshold: 10.0 ms)
  QoS:      priority=1, congestion=DROP, express=yes, interval=100 ms
[INFO] pub: [0] link: excellent, RTT: 0.1ms
[INFO] data: key: 'demo/adapt/data' | value: '[0] link: excellent, RTT: 0.1ms' | priority=1 congestion=DROP express=yes

变体:双后端

共享内存 Pub/Sub

仅 full

同主机进程间共享内存零拷贝传输:发布端在 SHM 缓冲池分配内存写入 payload 并发布句柄,订阅端无需拷贝即可直接读取。订阅端回调照常收 onepath_sample_t——SHM 检测对用户透明。

关键 OnePath API

  • onepath_shm_pool_create / onepath_shm_pool_destroy — 创建/销毁 SHM 缓冲池
  • onepath_publisher_put_shm(pub, pool, data, len) — 经 SHM 池零拷贝发布(分配失败自动回退普通拷贝)
  • onepath_subscribe — 订阅端照常收样本,无需感知 SHM
c
/* pub */
onepath_shm_pool_create(session, &pool, SHM_POOL_SIZE);
onepath_declare_publisher(session, &pub, keyexpr, NULL);
onepath_publisher_put_shm(pub, pool, msg, n);    /* 零拷贝 */
/* sub:on_sample 内直接读 sample->data / sample->data_len */
onepath_subscribe(session, &sub, "demo/shm/**", on_sample, NULL);
bash
./examples/build/release/full/onepath_shm_sub &
./examples/build/release/full/onepath_shm_pub
text
[INFO] SHM pool created (pool size: 1048576 bytes)
[INFO] [SHM PUB] [0] Hello from onepath_shm_pub (zero-copy!)
...
[ OK ] session opened
[INFO] [RECV] key: 'demo/shm/hello', 43 bytes, value: '[0] Hello from onepath_shm_pub (zero-copy!)'

变体:仅 full

SHM 请求/应答

仅 full

请求/应答模式:responder 声明应答端、回调中回复预设 value;get 端发起一次查询并 FIFO 接收所有回复。SHM 优化对用户透明。

关键 OnePath API

  • onepath_declare_responder / onepath_request_reply / onepath_request_release — 应答端
  • onepath_get + onepath_get_opts_tpayload / payload_len)— 发起查询
  • onepath_reply_recv / onepath_reply_release / onepath_reply_rx_destroy — 接收回复
c
/* responder */
onepath_declare_responder(session, &resp, keyexpr, on_query, &ctx, 1);
/* on_query: */ onepath_request_reply(query, ctx->keyexpr, ctx->value, value_len);
                onepath_request_release(query);
/* get */
onepath_get_opts_t opts = ONEPATH_GET_OPTS_DEFAULT;
opts.payload = value; opts.payload_len = strlen(value);
onepath_get(session, keyexpr, "", &rx, &opts);
while (onepath_reply_recv(rx, &reply) == ONEPATH_OK) { /* reply.key / reply.data */
    onepath_reply_release(&reply);
}
bash
./examples/build/release/full/onepath_shm_request responder &
./examples/build/release/full/onepath_shm_request get
text
[INFO] Responder ready on 'demo/shm/query' (Ctrl+C to exit) ...
[INFO] [QUERY] 'demo/shm/query?' payload='SHM responder response'
[ OK ]   >> reply: 'demo/shm/query' = 'SHM responder response'
[ OK ] 1 replies received

变体:仅 full

SHM 延迟测试

仅 full

测量同主机进程间经共享内存通信的往返延迟:pong 常驻回声、ping 用 SHM 池发布并以条件变量保证「一次在途一个 ping」,输出 min/avg/median/P99/max。

关键 OnePath API

  • onepath_shm_pool_create / onepath_publisher_put_shm — SHM 池 + 零拷贝发布 ping
  • onepath_publisher_put — pong 回声
  • onepath_subscribe / onepath_sample_release
c
onepath_shm_pool_create(session, &pool, shm_pool_size);
for (int i = 0; i < rounds; i++) {
    clock_gettime(CLOCK_MONOTONIC, &ts_start);
    onepath_publisher_put_shm(pub, pool, data, payload_size);
    /* 条件变量等 pong 回声 */
    clock_gettime(CLOCK_MONOTONIC, &ts_end);
    latencies[i] = us(ts_start, ts_end);
}
bash
./examples/build/release/full/onepath_shm_ping pong &
./examples/build/release/full/onepath_shm_ping ping 64
./examples/build/release/full/onepath_shm_ping ping 1024 -n 200
text
[INFO] [SHM PING] payload: 64 bytes, rounds: 100 (warmup: 5)
[INFO] === SHM Ping/Pong latency stats (100 rounds, 64 bytes payload) ===
  min:            40.6 us
  avg:            46.0 us
  median:         42.0 us
  P99:            97.9 us
  max:            97.9 us

变体:仅 full

吞吐量基准

仅 full

吞吐上限基准:sub 后台收消息并逐轮统计速率,pub/pub-shm 端在紧凑循环里高速发布定长 payload,对比「普通内存拷贝」与「SHM 零拷贝」两种发布路径。

关键 OnePath API

  • onepath_config_enable_shm — 在 SHM 发布会话上启用共享内存
  • onepath_declare_publisher + onepath_pub_opts_t{ .congestion_control = ONEPATH_CC_BLOCK } — 背压阻塞而非丢包
  • onepath_publisher_put / onepath_publisher_put_shm — 两种发布路径
  • onepath_subscribe_background — 后台订阅(无显式句柄),回调统计速率
c
/* sub */
onepath_subscribe_background(session, KEYEXPR, on_sample, stats);
/* pub-shm */
onepath_config_enable_shm(cfg, NULL);
onepath_pub_opts_t opts = { .congestion_control = ONEPATH_CC_BLOCK };
onepath_declare_publisher(session, &pub, KEYEXPR, &opts);
onepath_shm_pool_create(session, &pool, shm_pool_size);
while (1) onepath_publisher_put_shm(pub, pool, value, size);
bash
./examples/build/release/full/onepath_throughput sub --messages 100000 --rounds 5
./examples/build/release/full/onepath_throughput pub 1024
./examples/build/release/full/onepath_throughput pub-shm 1024
text
[INFO] [SUB] throughput test: 5000 msgs/round, 3 rounds
[round 1] 1279024 msg/s (3.9 ms / 5000 msgs)
[INFO] === throughput summary ===
  total messages: 15000
  average rate:   1154980 msg/s

变体:仅 full

SHM 传输基准

仅 full

单进程双会话 ping/pong,对比共享内存传输与 socket 传输在空载(none)与 iperf3 打满 loopback 两种负载下的 RTT,检验「SHM 绕过网络栈、不受 loopback 拥塞影响」。iperf3 由程序自动拉起,未装则对应行降级为 N/A。

关键 OnePath API

  • onepath_config_enable_shm — 选 SHM 传输
  • onepath_config_set_json5(cfg, path, value) — 高级配置:设监听端点、或 "transport/shared_memory/enabled"="false" 强制走 socket
  • onepath_subscribe_background / onepath_publisher_put — ping/pong 回声与计时
c
static void apply_transport(onepath_config_t cfg, int shm) {
    onepath_config_set_mode(cfg, "peer");
    if (shm) onepath_config_enable_shm(cfg, NULL);
    else     onepath_config_set_json5(cfg, "transport/shared_memory/enabled", "false");
}
onepath_config_set_json5(cfg_sub, "listen/endpoints", "[\"tcp/127.0.0.1:7800\"]");
onepath_subscribe_background(sub_s, KEY_PING, on_ping, pong_pub);
onepath_publisher_put(ping_pub, buf, size);    /* 计时 + condvar 等 pong */
bash
./examples/build/release/full/onepath_shm_bench
./examples/build/release/full/onepath_shm_bench --sizes 64,256 --rounds 2000
text
[INFO] ping/pong latency: SHM vs socket, none vs iperf3, 2 size(s)
 MsgSize  Transport  Stress    min(us)   avg(us)   p50(us)   p99(us)  max(us)
 64 B     SHM        none         37.1      40.3      38.9      95.2  141.7
 64 B     socket     none         30.5      42.5      40.8     125.2  179.0

变体:仅 full。SHM 在 loopback 被压满时 P99 显著优于 socket(不经网络栈)。详见 SHM 场景

OnePath™ 以预构建库形式交付,运行时零外部依赖。