Skip to content

发布与订阅

发布/订阅(Pub/Sub)是 OnePath 的核心通信范式:发布者把数据写到某个 key expression,订阅者按 key(支持通配符)接收。本页覆盖声明式发布者、一次性发布、删除操作,以及三种订阅模式(回调 / 后台 / 拉取)与样本接收释放。

发布

onepath_declare_publisher

c
int onepath_declare_publisher(onepath_session_t s, onepath_publisher_t *out,
                            const char *key, const onepath_pub_opts_t *opts);

声明一个发布者,适用于频繁发布的场景。

  • 参数
    • s — 会话句柄
    • out — 输出发布者句柄
    • key — key expression,如 "demo/sensor/temp"
    • opts — 发布选项,传 NULL 使用默认值(见 onepath_pub_opts_t
  • 返回值ONEPATH_OK 成功
  • 注意:使用完毕后必须调用 onepath_publisher_destroy() 销毁

onepath_publisher_put

c
int onepath_publisher_put(onepath_publisher_t pub, const void *data, size_t len);

通过发布者发送二进制数据。

  • 参数pub — 发布者句柄;data — 数据指针;len — 数据长度(字节)
  • 返回值ONEPATH_OK 成功

onepath_publisher_put_str

c
int onepath_publisher_put_str(onepath_publisher_t pub, const char *str);

通过发布者发送字符串。

  • 参数pub — 发布者句柄;str — 以 null 结尾的字符串
  • 返回值ONEPATH_OK 成功

onepath_publisher_write

c
int onepath_publisher_write(onepath_publisher_t pub, const void *data, size_t len,
                          const onepath_write_opts_t *opts);

带扩展选项的发布,支持自定义编码和附件。

  • 参数
    • pub — 发布者句柄
    • data — 数据指针;len — 数据长度
    • opts — 写入选项,传 NULL 使用默认值(见 onepath_write_opts_t
  • 返回值ONEPATH_OK 成功

onepath_publisher_destroy

c
void onepath_publisher_destroy(onepath_publisher_t pub);

销毁发布者并释放资源。

  • 参数pub — 发布者句柄

一次性发布(无需声明)

适用于偶尔发布的场景;频繁发布建议使用 onepath_declare_publisher()

onepath_put

c
int onepath_put(onepath_session_t s, const char *key,
              const void *data, size_t len, const char *encoding);

一次性发布二进制数据。

  • 参数s — 会话句柄;key — key expression;data / len — 数据;encoding — 编码字符串,传 NULL 使用默认编码
  • 返回值ONEPATH_OK 成功

onepath_put_str

c
int onepath_put_str(onepath_session_t s, const char *key, const char *value);

一次性发布字符串。

  • 参数s — 会话句柄;key — key expression;value — 以 null 结尾的字符串
  • 返回值ONEPATH_OK 成功

onepath_put_with_opts

c
int onepath_put_with_opts(onepath_session_t s, const char *key,
                        const void *data, size_t len,
                        const onepath_write_opts_t *opts);

带扩展选项的一次性发布。

  • 参数s — 会话句柄;key — key expression;data / len — 数据;opts — 写入选项,传 NULL 使用默认值
  • 返回值ONEPATH_OK 成功

onepath_delete

c
int onepath_delete(onepath_session_t s, const char *key);

发送删除操作。

  • 参数s — 会话句柄;key — 要删除的 key expression
  • 返回值ONEPATH_OK 成功
  • 注意:订阅者会收到 kind == ONEPATH_SAMPLE_DELETE 的样本

订阅

OnePath 提供三种订阅模式:回调模式(数据到达即触发回调)、后台模式(生命周期随会话)、拉取模式(写入环形缓冲,由用户主动取)。

onepath_subscribe — 回调模式

c
int onepath_subscribe(onepath_session_t s, onepath_subscriber_t *out,
                    const char *key, onepath_sample_cb cb, void *userdata);

创建回调模式的订阅者。

  • 参数
    • s — 会话句柄
    • out — 输出订阅者句柄
    • key — key expression,支持通配符(如 "demo/**"
    • cb — 数据到达时的回调函数(见 回调函数
    • userdata — 传递给回调的用户数据指针
  • 返回值ONEPATH_OK 成功
  • 注意:回调在内部线程中触发;使用完毕后调用 onepath_subscriber_destroy()

onepath_subscribe_background — 后台模式

c
int onepath_subscribe_background(onepath_session_t s, const char *key,
                               onepath_sample_cb cb, void *userdata);

创建后台订阅者,生命周期与会话绑定。

  • 参数s — 会话句柄;key — key expression;cb — 回调函数;userdata — 用户数据指针
  • 返回值ONEPATH_OK 成功
  • 注意:不返回订阅者句柄,订阅在会话关闭时自动终止,无需手动销毁

onepath_subscribe_pull — 拉取模式

c
int onepath_subscribe_pull(onepath_session_t s, onepath_subscriber_t *out,
                         onepath_sample_rx_t *rx, const char *key,
                         size_t ring_size);

创建拉取模式的订阅者(环形缓冲区)。

  • 参数
    • s — 会话句柄
    • out — 输出订阅者句柄
    • rx — 输出样本接收通道句柄
    • key — key expression
    • ring_size — 环形缓冲区大小
  • 返回值ONEPATH_OK 成功
  • 注意:当缓冲区满时,最旧的数据会被覆盖

onepath_sample_recv

c
int onepath_sample_recv(onepath_sample_rx_t rx, onepath_sample_t *out);

从拉取订阅通道阻塞接收一个样本。

  • 参数rx — 样本接收通道句柄;out — 输出样本数据
  • 返回值ONEPATH_OK 成功,ONEPATH_ERR_CLOSED 通道已关闭
  • 注意:收到的样本使用完毕后必须调用 onepath_sample_release() 释放

onepath_sample_try_recv

c
int onepath_sample_try_recv(onepath_sample_rx_t rx, onepath_sample_t *out);

从拉取订阅通道非阻塞接收一个样本。

  • 参数rx — 样本接收通道句柄;out — 输出样本数据
  • 返回值ONEPATH_OK 成功,ONEPATH_ERR_TIMEOUT 无可用数据,ONEPATH_ERR_CLOSED 通道已关闭
  • 注意:收到的样本使用完毕后必须调用 onepath_sample_release() 释放

onepath_sample_release

c
void onepath_sample_release(onepath_sample_t *sample);

释放样本数据。无论通过回调还是拉取通道接收,用户拿到的样本均须调用此函数释放。

  • 参数sample — 样本指针,释放后清零
  • 注意:回调模式和拉取模式均须调用

onepath_sample_rx_destroy

c
void onepath_sample_rx_destroy(onepath_sample_rx_t rx);

销毁样本接收通道。

  • 参数rx — 样本接收通道句柄

onepath_subscriber_destroy

c
void onepath_subscriber_destroy(onepath_subscriber_t sub);

销毁订阅者。

  • 参数sub — 订阅者句柄
  • 注意:对于拉取模式订阅,还需单独调用 onepath_sample_rx_destroy() 销毁接收通道

示例

声明式发布

c
onepath_publisher_t pub;
onepath_declare_publisher(s, &pub, "demo/sensor/temp", NULL);
for (int i = 0; i < 10; i++) {
    char buf[64];
    snprintf(buf, sizeof(buf), "temp=%d", 20 + i);
    onepath_publisher_put_str(pub, buf);
    onepath_sleep_s(1);
}
onepath_publisher_destroy(pub);

回调订阅

c
static void on_sample(onepath_sample_t *sample, void *userdata) {
    (void)userdata;
    printf("收到 [%s]: %.*s\n",
           sample->key, (int)sample->data_len, (const char *)sample->data);
    onepath_sample_release(sample);   /* 必须释放 */
}

onepath_subscriber_t sub;
onepath_subscribe(s, &sub, "demo/**", on_sample, NULL);
/* ... 运行 ... */
onepath_subscriber_destroy(sub);

拉取订阅

c
onepath_subscriber_t sub;
onepath_sample_rx_t   rx;
onepath_subscribe_pull(s, &sub, &rx, "demo/**", 256);

onepath_sample_t sample;
while (onepath_sample_recv(rx, &sample) == ONEPATH_OK) {
    printf("%.*s\n", (int)sample.data_len, (const char *)sample.data);
    onepath_sample_release(&sample);
}

onepath_subscriber_destroy(sub);
onepath_sample_rx_destroy(rx);

零拷贝与高级能力

同机零拷贝发布见 共享内存;缓存、丢失检测、历史恢复见 高级发布订阅

OnePath™ 以预构建库形式交付,运行时零外部依赖。