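Performance and Zero-Copy
This chapter demonstrates OnePath's latency and throughput capabilities: general-purpose Ping/Pong round-trip latency and latency-adaptive QoS, plus a complete set of shared-memory (SHM) zero-copy examples, in which processes on the same host exchange data without memory copies.
The SHM examples are exclusive to the full edition. The tiny edition does not build them, and calling the corresponding APIs returns ONEPATH_ERR_UNSUPPORTED. See SHM Scenarios for SHM concepts and configuration. Ping/Pong and latency adaptation support both backends.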
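Ping/Pong Round-Trip Latency
Measures the round-trip time (RTT) of ordinary OnePath publish/subscribe communication: the ping side publishes probes, the pong side echoes each one back unchanged, and the tool reports min/avg/median/P99/max.
Key OnePath APIs
- onepath_declare_publisher / onepath_subscribe: declare the ping probe publisher and the pong echo subscription
- onepath_publisher_put: publish a fixed-size payload (both probe and echo)
- onepath_sample_release: release a sample delivered to the callback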
onepath_declare_publisher(session, &pub, "test/ping", NULL);
onepath_subscribe(session, &sub, "test/pong", ping_pong_cb, NULL);
for (int i = 0; i < rounds; i++) {
clock_gettime(CLOCK_MONOTONIC, &start);
onepath_publisher_put(pub, data, payload_size);
/* Wait to be woken by the pong echo */
clock_gettime(CLOCK_MONOTONIC, &end);
latencies[i] = rtt_us(start, end);
}
Run:
./examples/build/release/full/onepath_ping_pong pong &
./examples/build/release/full/onepath_ping_pong ping
./examples/build/release/full/onepath_ping_pong ping -n 1000 -s 64
Sample output:
[INFO] PING payload: 64 bytes, rounds: 50 (warmup: 5)
[INFO] === Ping/Pong latency stats (50 rounds, 64 bytes payload) ===
min: 41.7 us
avg: 44.8 us
median: 43.3 us
p99: 68.3 us
max: 68.3 us
Variants: both backends. The tiny backend has lower RTT (avg about 21.9 us).
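The statistics above (min/avg/median/P99/max over the measured rounds) can be derived from the latencies array roughly as follows. This is a minimal sketch, not the example's actual code: rtt_us reuses the helper name from the loop but its body here is an assumption, latency_stats and latency_stats_t are hypothetical, and P99 uses the nearest-rank method, which may differ from the percentile rule the tool actually applies.

```c
#include <stdlib.h>
#include <string.h>
#include <time.h>

/* Elapsed microseconds between two CLOCK_MONOTONIC timestamps (assumed helper body). */
static double rtt_us(struct timespec start, struct timespec end) {
    return (double)(end.tv_sec - start.tv_sec) * 1e6 +
           (double)(end.tv_nsec - start.tv_nsec) / 1e3;
}

/* Hypothetical summary type mirroring the printed statistics. */
typedef struct {
    double min, avg, median, p99, max;
} latency_stats_t;

static int cmp_double(const void *a, const void *b) {
    double x = *(const double *)a, y = *(const double *)b;
    return (x > y) - (x < y);
}

/* Sorts a copy of the samples and fills *out; returns 0 on success, -1 otherwise. */
static int latency_stats(const double *samples, size_t n, latency_stats_t *out) {
    if (n == 0) return -1;
    double *s = malloc(n * sizeof *s);
    if (s == NULL) return -1;
    memcpy(s, samples, n * sizeof *s);
    qsort(s, n, sizeof *s, cmp_double);

    double sum = 0.0;
    for (size_t i = 0; i < n; i++) sum += s[i];

    out->min = s[0];
    out->max = s[n - 1];
    out->avg = sum / (double)n;
    out->median = (n % 2 == 1) ? s[n / 2] : (s[n / 2 - 1] + s[n / 2]) / 2.0;
    /* Nearest-rank P99: 1-based rank ceil(0.99 * n). */
    size_t rank = (99 * n + 99) / 100;
    out->p99 = s[rank - 1];
    free(s);
    return 0;
}
```

With only 50 rounds, the nearest-rank P99 is the largest sample, which is consistent with P99 equaling max in the sample output.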
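Latency Adaptation
The sender first measures RTT with ping probes, then selects a publishing QoS (priority, congestion control, express, publish interval) according to link quality (excellent / medium / poor) and publishes continuously; the monitor echoes the pings and prints the received data together with its QoS tags.
Key OnePath APIs
- onepath_declare_publisher + onepath_pub_opts_t (congestion_control / priority / express / reliability): declare a publisher for the selected QoS tier
- Constants: ONEPATH_PRIORITY_REALTIME / DATA / BACKGROUND, ONEPATH_CC_DROP / BLOCK, ONEPATH_RELIABILITY_RELIABLE
- Subscriber sample fields: sample->priority / congestion_control / express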
onepath_pub_opts_t pub_opts = {
.congestion_control = profile->congestion_control,
.priority = profile->priority,
.express = profile->express,
.reliability = ONEPATH_RELIABILITY_RELIABLE,
};
onepath_declare_publisher(session, &data_pub, keyexpr, &pub_opts);
snprintf(buf, sizeof(buf), "[%d] link: %s, RTT: %.1fms", msg_idx++, profile->label, avg_rtt);
onepath_publisher_put_str(data_pub, buf);
onepath_sleep_ms(profile->publish_interval_ms);
Run:
./examples/build/release/full/onepath_latency_adapt monitor &
./examples/build/release/full/onepath_latency_adapt sender
./examples/build/release/full/onepath_latency_adapt sender --probes 20 --interval 60
Sample output:
[INFO] === RTT statistics (10 probes) ===
min: 0.037 ms
link: excellent (threshold: 10.0 ms)
QoS: priority=1, congestion=DROP, express=yes, interval=100 ms
[INFO] pub: [0] link: excellent, RTT: 0.1ms
[INFO] data: key: 'demo/adapt/data' | value: '[0] link: excellent, RTT: 0.1ms' | priority=1 congestion=DROP express=yes
Variants: both backends.
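The tier selection described above could be organized as a small lookup table. In this sketch only the excellent tier's values (10.0 ms threshold, priority=1, DROP, express, 100 ms interval) come from the sample output; the medium and poor values, the strict less-than comparison, and all names (qos_profile_t, pick_profile, mean_rtt_ms, and CC_DROP / CC_BLOCK as local stand-ins for ONEPATH_CC_DROP / ONEPATH_CC_BLOCK) are hypothetical.

```c
#include <stdbool.h>
#include <stddef.h>

/* Local stand-ins for ONEPATH_CC_DROP / ONEPATH_CC_BLOCK (hypothetical). */
typedef enum { CC_DROP, CC_BLOCK } cc_t;

typedef struct {
    const char *label;
    double max_rtt_ms;        /* tier applies when avg RTT < this bound */
    int priority;
    cc_t congestion_control;
    bool express;
    int publish_interval_ms;
} qos_profile_t;

static const qos_profile_t PROFILES[] = {
    /* Values for "excellent" match the sample output. */
    { "excellent", 10.0, 1, CC_DROP, true, 100 },
    /* "medium" and "poor" values are hypothetical placeholders. */
    { "medium", 100.0, 5, CC_BLOCK, false, 500 },
    { "poor", 0.0, 7, CC_BLOCK, false, 1000 },  /* catch-all; bound unused */
};

/* Mean of the probe RTTs in milliseconds; 0 if there are no probes. */
static double mean_rtt_ms(const double *rtt_ms, size_t n) {
    double sum = 0.0;
    for (size_t i = 0; i < n; i++) sum += rtt_ms[i];
    return n ? sum / (double)n : 0.0;
}

/* Returns the first tier whose bound exceeds the average RTT; the last tier catches the rest. */
static const qos_profile_t *pick_profile(double avg_rtt_ms) {
    size_t n = sizeof PROFILES / sizeof PROFILES[0];
    for (size_t i = 0; i + 1 < n; i++)
        if (avg_rtt_ms < PROFILES[i].max_rtt_ms) return &PROFILES[i];
    return &PROFILES[n - 1];
}
```

The selected profile's fields would then populate onepath_pub_opts_t, as in the excerpt earlier in this section.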
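Shared-Memory Pub/Sub
Full edition only.
Zero-copy shared-memory transport between processes on the same host: the publisher allocates a buffer from an SHM pool, writes the payload into it, and publishes a handle; the subscriber reads the data directly, with no copy. The subscriber callback still receives an ordinary onepath_sample_t, so SHM detection is transparent to the user.
Key OnePath APIs
- onepath_shm_pool_create / onepath_shm_pool_destroy: create / destroy an SHM buffer pool
- onepath_publisher_put_shm(pub, pool, data, len): publish zero-copy through the SHM pool (automatically falls back to an ordinary copy if allocation fails)
- onepath_subscribe: the subscriber receives samples as usual and need not be aware of SHM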
/* pub */
onepath_shm_pool_create(session, &pool, SHM_POOL_SIZE);
onepath_declare_publisher(session, &pub, keyexpr, NULL);
onepath_publisher_put_shm(pub, pool, msg, n); /* zero-copy */
/* sub: read sample->data / sample->data_len directly inside on_sample */
onepath_subscribe(session, &sub, "demo/shm/**", on_sample, NULL);
Run:
./examples/build/release/full/onepath_shm_sub &
./examples/build/release/full/onepath_shm_pub
Sample output:
[INFO] SHM pool created (pool size: 1048576 bytes)
[INFO] [SHM PUB] [0] Hello from onepath_shm_pub (zero-copy!)
...
[ OK ] session opened
[INFO] [RECV] key: 'demo/shm/hello', 43 bytes, value: '[0] Hello from onepath_shm_pub (zero-copy!)'
Variants: full only.
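The allocate-or-fall-back behavior of onepath_publisher_put_shm can be pictured with the in-process toy below. It is only an illustration under stated assumptions: toy_pool_t, toy_msg_t, toy_put_shm and toy_msg_free are hypothetical, the "pool" is an ordinary buffer with a bump allocator rather than real shared memory (a real pool would map a shared-memory object), and OnePath's actual allocator and handle format are not described in this document.

```c
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for an SHM pool: a fixed region with a bump allocator. */
typedef struct {
    unsigned char *base;
    size_t size;
    size_t used;
} toy_pool_t;

/* What a subscriber sees: a pointer and a length, whichever path was taken. */
typedef struct {
    const unsigned char *data;
    size_t len;
    bool via_shm;             /* for illustration only; readers need not check it */
    unsigned char *heap_copy; /* non-NULL only on the fallback path */
} toy_msg_t;

static bool toy_pool_alloc(toy_pool_t *p, size_t len, unsigned char **out) {
    if (len > p->size - p->used) return false;  /* pool exhausted */
    *out = p->base + p->used;
    p->used += len;
    return true;
}

/* Writes the payload into the pool once; falls back to a heap copy if allocation fails. */
static int toy_put_shm(toy_pool_t *pool, const void *data, size_t len, toy_msg_t *msg) {
    unsigned char *slot;
    if (toy_pool_alloc(pool, len, &slot)) {
        memcpy(slot, data, len);
        msg->data = slot;
        msg->via_shm = true;
        msg->heap_copy = NULL;
    } else {
        unsigned char *copy = malloc(len ? len : 1);
        if (copy == NULL) return -1;
        memcpy(copy, data, len);
        msg->data = copy;
        msg->via_shm = false;
        msg->heap_copy = copy;
    }
    msg->len = len;
    return 0;
}

static void toy_msg_free(toy_msg_t *msg) {
    free(msg->heap_copy);
    msg->heap_copy = NULL;
}
```

The reader only ever touches data and len, which is the property the text describes as SHM being transparent to the subscriber.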
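SHM Request/Reply
Full edition only.
Request/reply pattern: the responder declares a responder and replies with a preset value in its callback; the get side issues a single query and receives all replies in FIFO order. The SHM optimization is transparent to the user.
Key OnePath APIs
- onepath_declare_responder / onepath_request_reply / onepath_request_release: responder side
- onepath_get + onepath_get_opts_t (payload / payload_len): issue a query
- onepath_reply_recv / onepath_reply_release / onepath_reply_rx_destroy: receive replies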
/* responder */
onepath_declare_responder(session, &resp, keyexpr, on_query, &ctx, 1);
/* on_query: */ onepath_request_reply(query, ctx->keyexpr, ctx->value, value_len);
onepath_request_release(query);
/* get */
onepath_get_opts_t opts = ONEPATH_GET_OPTS_DEFAULT;
opts.payload = value; opts.payload_len = strlen(value);
onepath_get(session, keyexpr, "", &rx, &opts);
while (onepath_reply_recv(rx, &reply) == ONEPATH_OK) { /* reply.key / reply.data */
onepath_reply_release(&reply);
}
Run:
./examples/build/release/full/onepath_shm_request responder &
./examples/build/release/full/onepath_shm_request get
Sample output:
[INFO] Responder ready on 'demo/shm/query' (Ctrl+C to exit) ...
[INFO] [QUERY] 'demo/shm/query?' payload='SHM responder response'
[ OK ] >> reply: 'demo/shm/query' = 'SHM responder response'
[ OK ] 1 replies received
Variants: full only.
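The "receive all replies in FIFO order" loop can be modeled as draining a first-in, first-out queue until the receive call reports no more data. This is a non-blocking toy with hypothetical names (toy_reply_rx_t, toy_rx_push, toy_rx_recv, REPLY_CAP); the real onepath_reply_recv may block while replies are still pending, and its termination condition is not specified here.

```c
#include <stdio.h>
#include <string.h>

#define REPLY_CAP 8  /* hypothetical capacity */

typedef struct {
    char key[64];
    char data[64];
} toy_reply_t;

/* Hypothetical reply channel: replies are queued in arrival order. */
typedef struct {
    toy_reply_t items[REPLY_CAP];
    size_t head, count;
} toy_reply_rx_t;

/* Called as each reply arrives; returns -1 if the queue is full. */
static int toy_rx_push(toy_reply_rx_t *rx, const char *key, const char *data) {
    if (rx->count == REPLY_CAP) return -1;
    toy_reply_t *r = &rx->items[(rx->head + rx->count) % REPLY_CAP];
    snprintf(r->key, sizeof r->key, "%s", key);
    snprintf(r->data, sizeof r->data, "%s", data);
    rx->count++;
    return 0;
}

/* Pops the oldest reply; returns -1 once the queue is drained, ending the loop. */
static int toy_rx_recv(toy_reply_rx_t *rx, toy_reply_t *out) {
    if (rx->count == 0) return -1;
    *out = rx->items[rx->head];
    rx->head = (rx->head + 1) % REPLY_CAP;
    rx->count--;
    return 0;
}
```

The while loop over toy_rx_recv has the same shape as the onepath_reply_recv loop in the excerpt: it keeps consuming until the receive call stops returning success.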
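SHM Latency Test
Full edition only.
Measures the round-trip latency of same-host inter-process communication over shared memory: pong runs persistently and echoes, while ping publishes through an SHM pool and uses a condition variable to guarantee exactly one ping in flight at a time; the tool reports min/avg/median/P99/max.
Key OnePath APIs
- onepath_shm_pool_create / onepath_publisher_put_shm: SHM pool plus zero-copy ping publishing
- onepath_publisher_put: pong echo
- onepath_subscribe / onepath_sample_release: receive and release samples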
onepath_shm_pool_create(session, &pool, shm_pool_size);
for (int i = 0; i < rounds; i++) {
clock_gettime(CLOCK_MONOTONIC, &ts_start);
onepath_publisher_put_shm(pub, pool, data, payload_size);
/* Wait on the condition variable for the pong echo */
clock_gettime(CLOCK_MONOTONIC, &ts_end);
latencies[i] = us(ts_start, ts_end);
}
Run:
./examples/build/release/full/onepath_shm_ping pong &
./examples/build/release/full/onepath_shm_ping ping 64
./examples/build/release/full/onepath_shm_ping ping 1024 -n 200
Sample output:
[INFO] [SHM PING] payload: 64 bytes, rounds: 100 (warmup: 5)
[INFO] === SHM Ping/Pong latency stats (100 rounds, 64 bytes payload) ===
min: 40.6 us
avg: 46.0 us
median: 42.0 us
P99: 97.9 us
max: 97.9 us
Variants: full only.
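The "one ping in flight" discipline can be sketched with a POSIX mutex and condition variable. Here a local thread simulates the pong side and the sent counter stands in for onepath_publisher_put_shm; pingpong_t, pong_thread, ping_once and run_rounds are hypothetical names, and the real example's synchronization details may differ. In a benchmark, the clock_gettime calls would bracket each ping_once call.

```c
#include <pthread.h>
#include <stdbool.h>

/* State shared by the ping loop and a simulated pong thread (hypothetical). */
typedef struct {
    pthread_mutex_t mu;
    pthread_cond_t cv;
    int sent;          /* pings published */
    int echoed;        /* pong echoes received */
    int max_inflight;  /* largest sent - echoed ever observed by pong */
    bool stop;
} pingpong_t;

/* Simulated pong side: echoes each outstanding ping. */
static void *pong_thread(void *arg) {
    pingpong_t *s = arg;
    pthread_mutex_lock(&s->mu);
    for (;;) {
        while (!s->stop && s->echoed == s->sent)
            pthread_cond_wait(&s->cv, &s->mu);
        if (s->stop) break;
        if (s->sent - s->echoed > s->max_inflight)
            s->max_inflight = s->sent - s->echoed;
        s->echoed++;
        pthread_cond_broadcast(&s->cv);
    }
    pthread_mutex_unlock(&s->mu);
    return NULL;
}

/* One round: publish a ping, then block until its echo arrives. */
static void ping_once(pingpong_t *s) {
    pthread_mutex_lock(&s->mu);
    s->sent++;  /* stands in for onepath_publisher_put_shm */
    pthread_cond_broadcast(&s->cv);
    while (s->echoed < s->sent)
        pthread_cond_wait(&s->cv, &s->mu);
    pthread_mutex_unlock(&s->mu);
}

/* Runs `rounds` ping/pong rounds; returns echoes received, or -1 on error. */
static int run_rounds(int rounds, int *max_inflight) {
    pingpong_t s = { .sent = 0, .echoed = 0, .max_inflight = 0, .stop = false };
    pthread_mutex_init(&s.mu, NULL);
    pthread_cond_init(&s.cv, NULL);
    pthread_t t;
    int result = -1;
    if (pthread_create(&t, NULL, pong_thread, &s) == 0) {
        for (int i = 0; i < rounds; i++) ping_once(&s);
        pthread_mutex_lock(&s.mu);
        s.stop = true;
        pthread_cond_broadcast(&s.cv);
        pthread_mutex_unlock(&s.mu);
        pthread_join(t, NULL);
        *max_inflight = s.max_inflight;
        result = s.echoed;
    }
    pthread_cond_destroy(&s.cv);
    pthread_mutex_destroy(&s.mu);
    return result;
}
```

Because ping_once does not return until echoed catches up with sent, the pong side never observes more than one outstanding ping, so each measured RTT covers exactly one probe.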
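Throughput Benchmark
Full edition only.
Peak-throughput benchmark: sub receives messages in the background and computes the rate for each round, while pub / pub-shm publishes fixed-size payloads in a tight loop, comparing two publishing paths: ordinary memory copy versus SHM zero-copy.
Key OnePath APIs
- onepath_config_enable_shm: enable shared memory on the SHM publishing session
- onepath_declare_publisher + onepath_pub_opts_t{ .congestion_control = ONEPATH_CC_BLOCK }: block under backpressure instead of dropping messages
- onepath_publisher_put / onepath_publisher_put_shm: the two publishing paths
- onepath_subscribe_background: background subscription (no explicit handle); the callback computes the rate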
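The difference between the BLOCK and DROP congestion-control choices can be illustrated with a bounded queue: under DROP a full queue discards the message, under BLOCK the publisher waits until the consumer frees a slot, so a throughput run loses nothing. This is a conceptual sketch only; bq_t, bq_put, bq_get, publish_blocking and the BQ_CC_* values are hypothetical, and OnePath's internal queuing is not described in this document.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define BQ_CAP 4  /* tiny capacity so the queue fills quickly */

/* Local stand-ins for ONEPATH_CC_DROP / ONEPATH_CC_BLOCK (hypothetical). */
typedef enum { BQ_CC_DROP, BQ_CC_BLOCK } bq_cc_t;

typedef struct {
    int buf[BQ_CAP];
    size_t head, count;
    unsigned long dropped;
    pthread_mutex_t mu;
    pthread_cond_t not_full, not_empty;
} bq_t;

static void bq_init(bq_t *q) {
    q->head = q->count = 0;
    q->dropped = 0;
    pthread_mutex_init(&q->mu, NULL);
    pthread_cond_init(&q->not_full, NULL);
    pthread_cond_init(&q->not_empty, NULL);
}

static void bq_destroy(bq_t *q) {
    pthread_cond_destroy(&q->not_empty);
    pthread_cond_destroy(&q->not_full);
    pthread_mutex_destroy(&q->mu);
}

/* Enqueues v; returns false if it was dropped (DROP policy with a full queue). */
static bool bq_put(bq_t *q, int v, bq_cc_t cc) {
    pthread_mutex_lock(&q->mu);
    if (q->count == BQ_CAP && cc == BQ_CC_DROP) {
        q->dropped++;
        pthread_mutex_unlock(&q->mu);
        return false;
    }
    while (q->count == BQ_CAP)  /* BLOCK: wait until the consumer frees a slot */
        pthread_cond_wait(&q->not_full, &q->mu);
    q->buf[(q->head + q->count) % BQ_CAP] = v;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mu);
    return true;
}

static int bq_get(bq_t *q) {
    pthread_mutex_lock(&q->mu);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->mu);
    int v = q->buf[q->head];
    q->head = (q->head + 1) % BQ_CAP;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mu);
    return v;
}

typedef struct { bq_t *q; int n; long sum; } consumer_t;

static void *consumer(void *arg) {
    consumer_t *c = arg;
    for (int i = 0; i < c->n; i++) c->sum += bq_get(c->q);
    return NULL;
}

/* Publishes 1..n with BLOCK while a consumer drains; returns the consumer's sum. */
static long publish_blocking(int n) {
    bq_t q;
    bq_init(&q);
    consumer_t c = { &q, n, 0 };
    pthread_t t;
    if (pthread_create(&t, NULL, consumer, &c) != 0) { bq_destroy(&q); return -1; }
    for (int i = 1; i <= n; i++) bq_put(&q, i, BQ_CC_BLOCK);
    pthread_join(t, NULL);
    bq_destroy(&q);
    return c.sum;
}
```

With BLOCK, a fast publisher is throttled to the consumer's pace, which is why a throughput benchmark that must count every message would prefer it over DROP.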
/* sub */
onepath_subscribe_background(session, KEYEXPR, on_sample, stats);
/* pub-shm */
onepath_config_enable_shm(cfg, NULL);
onepath_pub_opts_t opts = { .congestion_control = ONEPATH_CC_BLOCK };
onepath_declare_publisher(session, &pub, KEYEXPR, &opts);
onepath_shm_pool_create(session, &pool, shm_pool_size);
while (1) onepath_publisher_put_shm(pub, pool, value, size);
Run:
./examples/build/release/full/onepath_throughput sub --messages 100000 --rounds 5
./examples/build/release/full/onepath_throughput pub 1024
./examples/build/release/full/onepath_throughput pub-shm 1024
Sample output:
[INFO] [SUB] throughput test: 5000 msgs/round, 3 rounds
[round 1] 1279024 msg/s (3.9 ms / 5000 msgs)
[INFO] === throughput summary ===
total messages: 15000
average rate: 1154980 msg/s
Variants: full only.
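Per-round rates are messages divided by elapsed time: in the sample, 5000 messages in about 3.9 ms gives roughly 1.28 million msg/s, consistent with the printed 1279024 once the rounded 3.9 ms figure is taken into account. The document does not say how the summary "average rate" is computed, so the sketch below shows two plausible definitions side by side; rate_msg_per_s, round_stat_t, overall_rate and mean_of_rates are hypothetical names.

```c
#include <stddef.h>

/* Messages per second for one round (elapsed time in nanoseconds). */
static double rate_msg_per_s(unsigned long msgs, unsigned long long elapsed_ns) {
    return elapsed_ns ? (double)msgs * 1e9 / (double)elapsed_ns : 0.0;
}

typedef struct {
    unsigned long msgs;
    unsigned long long elapsed_ns;
} round_stat_t;

/* Option A: total messages divided by total elapsed time. */
static double overall_rate(const round_stat_t *r, size_t n) {
    unsigned long msgs = 0;
    unsigned long long ns = 0;
    for (size_t i = 0; i < n; i++) {
        msgs += r[i].msgs;
        ns += r[i].elapsed_ns;
    }
    return rate_msg_per_s(msgs, ns);
}

/* Option B: arithmetic mean of the per-round rates. */
static double mean_of_rates(const round_stat_t *r, size_t n) {
    double sum = 0.0;
    for (size_t i = 0; i < n; i++) sum += rate_msg_per_s(r[i].msgs, r[i].elapsed_ns);
    return n ? sum / (double)n : 0.0;
}
```

The two definitions diverge when rounds have uneven durations: a fast round pulls the mean of rates up more than it affects the total-over-total figure.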
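SHM Transport Benchmark
Full edition only.
Ping/pong between two sessions in a single process, comparing the RTT of shared-memory transport and socket transport under two loads, idle (none) and loopback saturated by iperf3, to verify that SHM bypasses the network stack and is unaffected by loopback congestion. The program launches iperf3 automatically; if iperf3 is not installed, the corresponding rows degrade to N/A.
Key OnePath APIs
- onepath_config_enable_shm: select SHM transport
- onepath_config_set_json5(cfg, path, value): advanced configuration, for example setting listen endpoints, or setting "transport/shared_memory/enabled" to "false" to force the socket transport
- onepath_subscribe_background / onepath_publisher_put: ping/pong echo and timing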
static void apply_transport(onepath_config_t cfg, int shm) {
onepath_config_set_mode(cfg, "peer");
if (shm) onepath_config_enable_shm(cfg, NULL);
else onepath_config_set_json5(cfg, "transport/shared_memory/enabled", "false");
}
onepath_config_set_json5(cfg_sub, "listen/endpoints", "[\"tcp/127.0.0.1:7800\"]");
onepath_subscribe_background(sub_s, KEY_PING, on_ping, pong_pub);
onepath_publisher_put(ping_pub, buf, size); /* timed; wait on a condvar for the pong */
Run:
./examples/build/release/full/onepath_shm_bench
./examples/build/release/full/onepath_shm_bench --sizes 64,256 --rounds 2000
Sample output:
[INFO] ping/pong latency: SHM vs socket, none vs iperf3, 2 size(s)
MsgSize Transport Stress min(us) avg(us) p50(us) p99(us) max(us)
64 B SHM none 37.1 40.3 38.9 95.2 141.7
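64 B socket none 30.5 42.5 40.8 125.2 179.0
Variants: full only. With loopback saturated, SHM achieves a markedly better P99 than socket because it does not traverse the network stack. See SHM Scenarios for details.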
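The N/A degradation depends on knowing whether iperf3 is available. The benchmark's real detection logic is not shown, so the sketch below is an assumption: it scans $PATH for an entry that passes access(X_OK), and in_path and stress_cell are hypothetical helpers (an implementation might instead simply try to spawn iperf3 and treat failure as absent).

```c
#define _POSIX_C_SOURCE 200809L
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define PATH_BUF 4096  /* assumed upper bound for a candidate path */

/* True if "<dir>/prog" passes access(X_OK) for some $PATH entry.
 * Empty entries (meaning the current directory) are skipped for simplicity,
 * and a directory named prog would also pass the check. */
static bool in_path(const char *prog) {
    const char *path = getenv("PATH");
    if (path == NULL || *prog == '\0') return false;
    char full[PATH_BUF];
    while (*path != '\0') {
        size_t len = strcspn(path, ":");
        if (len > 0) {
            int w = snprintf(full, sizeof full, "%.*s/%s", (int)len, path, prog);
            if (w > 0 && (size_t)w < sizeof full && access(full, X_OK) == 0)
                return true;
        }
        path += len;
        if (*path == ':') path++;
    }
    return false;
}

/* Cell text for a stress row: the measured value, or "N/A" without iperf3. */
static void stress_cell(char *out, size_t cap, bool have_iperf3, double p99_us) {
    if (have_iperf3)
        snprintf(out, cap, "%.1f", p99_us);
    else
        snprintf(out, cap, "N/A");
}
```

A caller would evaluate in_path("iperf3") once at startup and pass the result to every iperf3 row, so the idle (none) rows are still measured when iperf3 is missing.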