Skip to content

查询与存储

本章演示 OnePath 的请求/应答分布式存储能力:客户端主动发起 Get 查询、构建内存存储节点应答查询、以及跨重连保持上下文的持久请求器。这三者常组合使用——存储节点应答查询,命令行用 一次性读写 写入数据。

Get 查询

客户端主动向存储/应答节点发起一次性查询(类似 HTTP GET),通过回复通道同步阻塞接收并逐条打印命中的 key 与 value。

关键 OnePath API

  • onepath_get(session, keyexpr, selector, &rx, &opts) — 向指定键表达式发起查询,返回回复接收句柄
  • onepath_reply_recv(rx, &reply) — 阻塞接收一条查询回复(reply.is_ok / reply.key / reply.data
  • onepath_reply_release(&reply) / onepath_reply_rx_destroy(rx) — 释放回复与接收句柄
  • onepath_get_opts_ttimeout_ms / payload)— 查询选项
c
onepath_get_opts_t opts = { .timeout_ms = timeout_ms, .payload = NULL, .payload_len = 0 };
onepath_reply_rx_t rx;
onepath_get(session, keyexpr, selector, &rx, &opts);

onepath_reply_t reply;
while (onepath_reply_recv(rx, &reply) == ONEPATH_OK) {
    if (reply.is_ok)
        EP_OK("reply key: '%s', value: '%.*s'", reply.key,
              (int)reply.data_len, (const char*)reply.data);
    onepath_reply_release(&reply);
}
onepath_reply_rx_destroy(rx);
bash
./examples/build/release/full/onepath_storage_demo &
./examples/build/release/full/onepath_put_delete put -k demo/example/hello -v "world"
./examples/build/release/full/onepath_get_demo -k "demo/example/**"
./examples/build/release/full/onepath_get_demo -k "demo/example/**" -t 3000
text
[2026-06-21-17-19-47:265] [INFO] sending query (timeout: 2000 ms) ...
[2026-06-21-17-19-47:265] [INFO] query received: 'demo/example/**'
[2026-06-21-17-19-47:265] [ OK ] query returned 1 matching results
[2026-06-21-17-19-47:265] [ OK ] reply PUT key: 'demo/example/hello', value: 'world'
[2026-06-21-17-19-47:265] [ OK ] query done, received 1 replies.

变体:双后端

分布式存储

实现一个内存存储节点,同时扮演订阅者与应答者:订阅 PUT/DELETE 事件写入哈希表,并对查询返回所有与键表达式相交的已存条目。这是 三模存储便捷 API 的「手写版」对照——约 500 行手写哈希表,便捷 API 仅需一次 onepath_store_open

关键 OnePath API

  • onepath_subscribe(session, &sub, keyexpr, on_sample, NULL) — 订阅 PUT/DELETE 写入哈希表
  • onepath_declare_responder(session, &resp, keyexpr, on_query, NULL, 1) — 声明应答者处理查询
  • onepath_keyexpr_intersects(query->key, stored_key) — 判断存储的 key 是否与查询表达式相交
  • onepath_request_reply_str(query, key, value) — 以字符串 key/value 回复一条结果
  • onepath_sample_release / onepath_request_release — 释放回调收到的样本/请求
c
onepath_subscribe(session, &sub, keyexpr, on_sample, NULL);          /* PUT/DELETE -> 哈希表 */
onepath_declare_responder(session, &resp, keyexpr, on_query, NULL, 1);

/* on_query 回调内: */
if (onepath_keyexpr_intersects(query->key, node->key))
    onepath_request_reply_str(query, node->key, node->value);
onepath_request_release(query);

/* on_sample 回调内: */
if (sample->kind == ONEPATH_SAMPLE_PUT) storage_put(...);
onepath_sample_release(sample);
bash
./examples/build/release/full/onepath_storage_demo
# 配合:另一终端用 put_delete 写入、get_demo 查询
./examples/build/release/full/onepath_put_delete put -k demo/example/hello -v "world"
./examples/build/release/full/onepath_get_demo -k "demo/example/**"
text
[2026-06-21-17-19-42:755] [ OK ] subscriber ready, listening on 'demo/example/**'
[2026-06-21-17-19-42:755] [ OK ] responder ready, answering queries on 'demo/example/**'
[2026-06-21-17-19-46:259] [INFO] storage PUT key: 'demo/example/hello', value: 'world'
[2026-06-21-17-19-47:265] [INFO] query received: 'demo/example/**'
[2026-06-21-17-19-47:265] [ OK ] query returned 1 matching results

变体:双后端

持久请求器

持久请求器跨重连保持查询上下文,并通过「有应答者才查询」的匹配监听,支持阻塞接收与非阻塞轮询两种回复获取模式(适配实时控制回路)。

关键 OnePath API

  • onepath_declare_requester(session, &requester, keyexpr, &opts) — 声明跨重连的持久请求器
  • onepath_requester_on_matching(requester, on_matching_status, NULL) — 注册匹配监听,通知是否存在应答者
  • onepath_requester_get(requester, NULL, &rx, payload, len) — 用持久请求器发起一次查询
  • onepath_reply_recv — 阻塞接收(requester 模式)
  • onepath_reply_try_recv — 非阻塞轮询,按 ONEPATH_ERR_CLOSED / ONEPATH_ERR_TIMEOUT / ONEPATH_OK 分支(nb-requester 模式)
c
onepath_requester_opts_t opts = { .timeout_ms = timeout_ms };
onepath_requester_t requester;
onepath_declare_requester(session, &requester, keyexpr, &opts);
onepath_requester_on_matching(requester, on_matching_status, NULL);

onepath_reply_rx_t rx;
onepath_requester_get(requester, NULL, &rx, buf, strlen(buf));

/* 非阻塞轮询: */
int res = onepath_reply_try_recv(rx, &reply);
if (res == ONEPATH_ERR_CLOSED)  break;
if (res == ONEPATH_ERR_TIMEOUT) { onepath_sleep_ms(50); continue; }
if (res == ONEPATH_OK)          { /* use reply */ onepath_reply_release(&reply); }
onepath_reply_rx_destroy(rx);
bash
./examples/build/release/full/onepath_storage_demo &
./examples/build/release/full/onepath_put_delete put -k demo/example/test -v "hello"
./examples/build/release/full/onepath_requester_demo requester
./examples/build/release/full/onepath_requester_demo nb-requester
text
[2026-06-21-17-20-20:316] [ OK ] declared persistent requester: 'demo/example/**' (timeout: 2000 ms)
[2026-06-21-17-20-20:317] [INFO] [Matching] found a matching responder
[2026-06-21-17-20-22:316] [INFO] query #0 'demo/example/**' payload='[0] query from onepath_requester_demo'
[2026-06-21-17-20-22:317] [ OK ]   >> reply: 'demo/example/test' = 'hello'
[2026-06-21-17-20-22:317] [INFO]   total 1 replies

变体:双后端。requester 与 nb-requester 两种模式均支持。

OnePath™ 以预构建库形式交付,运行时零外部依赖。