查询与存储
本章演示 OnePath 的请求/应答与分布式存储能力:客户端主动发起 Get 查询、构建内存存储节点应答查询、以及跨重连保持上下文的持久请求器。这三者常组合使用——存储节点应答查询,命令行用 一次性读写 写入数据。
Get 查询
客户端主动向存储/应答节点发起一次性查询(类似 HTTP GET),通过回复通道同步阻塞接收并逐条打印命中的 key 与 value。
关键 OnePath API
onepath_get(session, keyexpr, selector, &rx, &opts)— 向指定键表达式发起查询,返回回复接收句柄onepath_reply_recv(rx, &reply)— 阻塞接收一条查询回复(reply.is_ok/reply.key/reply.data)onepath_reply_release(&reply)/onepath_reply_rx_destroy(rx)— 释放回复与接收句柄onepath_get_opts_t(timeout_ms/payload)— 查询选项
c
onepath_get_opts_t opts = { .timeout_ms = timeout_ms, .payload = NULL, .payload_len = 0 };
onepath_reply_rx_t rx;
onepath_get(session, keyexpr, selector, &rx, &opts);
onepath_reply_t reply;
while (onepath_reply_recv(rx, &reply) == ONEPATH_OK) {
if (reply.is_ok)
EP_OK("reply key: '%s', value: '%.*s'", reply.key,
(int)reply.data_len, (const char*)reply.data);
onepath_reply_release(&reply);
}
onepath_reply_rx_destroy(rx);bash
./examples/build/release/full/onepath_storage_demo &
./examples/build/release/full/onepath_put_delete put -k demo/example/hello -v "world"
./examples/build/release/full/onepath_get_demo -k "demo/example/**"
./examples/build/release/full/onepath_get_demo -k "demo/example/**" -t 3000text
[2026-06-21-17-19-47:265] [INFO] sending query (timeout: 2000 ms) ...
[2026-06-21-17-19-47:265] [INFO] query received: 'demo/example/**'
[2026-06-21-17-19-47:265] [ OK ] query returned 1 matching results
[2026-06-21-17-19-47:265] [ OK ] reply PUT key: 'demo/example/hello', value: 'world'
[2026-06-21-17-19-47:265] [ OK ] query done, received 1 replies.变体:双后端。
分布式存储
实现一个内存存储节点,同时扮演订阅者与应答者:订阅 PUT/DELETE 事件写入哈希表,并对查询返回所有与键表达式相交的已存条目。这是 三模存储便捷 API 的「手写版」对照——约 500 行手写哈希表,便捷 API 仅需一次 onepath_store_open。
关键 OnePath API
onepath_subscribe(session, &sub, keyexpr, on_sample, NULL)— 订阅 PUT/DELETE 写入哈希表onepath_declare_responder(session, &resp, keyexpr, on_query, NULL, 1)— 声明应答者处理查询onepath_keyexpr_intersects(query->key, stored_key)— 判断存储的 key 是否与查询表达式相交onepath_request_reply_str(query, key, value)— 以字符串 key/value 回复一条结果onepath_sample_release/onepath_request_release— 释放回调收到的样本/请求
c
onepath_subscribe(session, &sub, keyexpr, on_sample, NULL); /* PUT/DELETE -> 哈希表 */
onepath_declare_responder(session, &resp, keyexpr, on_query, NULL, 1);
/* on_query 回调内: */
if (onepath_keyexpr_intersects(query->key, node->key))
onepath_request_reply_str(query, node->key, node->value);
onepath_request_release(query);
/* on_sample 回调内: */
if (sample->kind == ONEPATH_SAMPLE_PUT) storage_put(...);
onepath_sample_release(sample);bash
./examples/build/release/full/onepath_storage_demo
# 配合:另一终端用 put_delete 写入、get_demo 查询
./examples/build/release/full/onepath_put_delete put -k demo/example/hello -v "world"
./examples/build/release/full/onepath_get_demo -k "demo/example/**"text
[2026-06-21-17-19-42:755] [ OK ] subscriber ready, listening on 'demo/example/**'
[2026-06-21-17-19-42:755] [ OK ] responder ready, answering queries on 'demo/example/**'
[2026-06-21-17-19-46:259] [INFO] storage PUT key: 'demo/example/hello', value: 'world'
[2026-06-21-17-19-47:265] [INFO] query received: 'demo/example/**'
[2026-06-21-17-19-47:265] [ OK ] query returned 1 matching results变体:双后端。
持久请求器
持久请求器跨重连保持查询上下文,并通过「有应答者才查询」的匹配监听,支持阻塞接收与非阻塞轮询两种回复获取模式(适配实时控制回路)。
关键 OnePath API
onepath_declare_requester(session, &requester, keyexpr, &opts)— 声明跨重连的持久请求器onepath_requester_on_matching(requester, on_matching_status, NULL)— 注册匹配监听,通知是否存在应答者onepath_requester_get(requester, NULL, &rx, payload, len)— 用持久请求器发起一次查询onepath_reply_recv— 阻塞接收(requester 模式)onepath_reply_try_recv— 非阻塞轮询,按ONEPATH_ERR_CLOSED/ONEPATH_ERR_TIMEOUT/ONEPATH_OK分支(nb-requester 模式)
c
onepath_requester_opts_t opts = { .timeout_ms = timeout_ms };
onepath_requester_t requester;
onepath_declare_requester(session, &requester, keyexpr, &opts);
onepath_requester_on_matching(requester, on_matching_status, NULL);
onepath_reply_rx_t rx;
onepath_requester_get(requester, NULL, &rx, buf, strlen(buf));
/* 非阻塞轮询: */
int res = onepath_reply_try_recv(rx, &reply);
if (res == ONEPATH_ERR_CLOSED) break;
if (res == ONEPATH_ERR_TIMEOUT) { onepath_sleep_ms(50); continue; }
if (res == ONEPATH_OK) { /* use reply */ onepath_reply_release(&reply); }
onepath_reply_rx_destroy(rx);bash
./examples/build/release/full/onepath_storage_demo &
./examples/build/release/full/onepath_put_delete put -k demo/example/test -v "hello"
./examples/build/release/full/onepath_requester_demo requester
./examples/build/release/full/onepath_requester_demo nb-requestertext
[2026-06-21-17-20-20:316] [ OK ] declared persistent requester: 'demo/example/**' (timeout: 2000 ms)
[2026-06-21-17-20-20:317] [INFO] [Matching] found a matching responder
[2026-06-21-17-20-22:316] [INFO] query #0 'demo/example/**' payload='[0] query from onepath_requester_demo'
[2026-06-21-17-20-22:317] [ OK ] >> reply: 'demo/example/test' = 'hello'
[2026-06-21-17-20-22:317] [INFO] total 1 replies变体:双后端。requester 与 nb-requester 两种模式均支持。