发布与订阅
发布/订阅(Pub/Sub)是 OnePath 的核心通信范式:发布者把数据写到某个 key expression,订阅者按 key(支持通配符)接收。本页覆盖声明式发布者、一次性发布、删除操作,以及三种订阅模式(回调 / 后台 / 拉取)与样本接收释放。
发布
onepath_declare_publisher
c
int onepath_declare_publisher(onepath_session_t s, onepath_publisher_t *out,
const char *key, const onepath_pub_opts_t *opts);声明一个发布者,适用于频繁发布的场景。
- 参数:
s— 会话句柄out— 输出发布者句柄key— key expression,如"demo/sensor/temp"opts— 发布选项,传NULL使用默认值(见 onepath_pub_opts_t)
- 返回值:
ONEPATH_OK成功 - 注意:使用完毕后必须调用
onepath_publisher_destroy()销毁
onepath_publisher_put
c
int onepath_publisher_put(onepath_publisher_t pub, const void *data, size_t len);通过发布者发送二进制数据。
- 参数:
pub— 发布者句柄;data— 数据指针;len— 数据长度(字节) - 返回值:
ONEPATH_OK成功
onepath_publisher_put_str
c
int onepath_publisher_put_str(onepath_publisher_t pub, const char *str);通过发布者发送字符串。
- 参数:
pub— 发布者句柄;str— 以 null 结尾的字符串 - 返回值:
ONEPATH_OK成功
onepath_publisher_write
c
int onepath_publisher_write(onepath_publisher_t pub, const void *data, size_t len,
const onepath_write_opts_t *opts);带扩展选项的发布,支持自定义编码和附件。
- 参数:
pub— 发布者句柄data— 数据指针;len— 数据长度opts— 写入选项,传NULL使用默认值(见 onepath_write_opts_t)
- 返回值:
ONEPATH_OK成功
onepath_publisher_destroy
c
void onepath_publisher_destroy(onepath_publisher_t pub);销毁发布者并释放资源。
- 参数:
pub— 发布者句柄
一次性发布(无需声明)
适用于偶尔发布的场景;频繁发布建议使用 onepath_declare_publisher()。
onepath_put
c
int onepath_put(onepath_session_t s, const char *key,
const void *data, size_t len, const char *encoding);一次性发布二进制数据。
- 参数:
s— 会话句柄;key— key expression;data/len— 数据;encoding— 编码字符串,传NULL使用默认编码 - 返回值:
ONEPATH_OK成功
onepath_put_str
c
int onepath_put_str(onepath_session_t s, const char *key, const char *value);一次性发布字符串。
- 参数:
s— 会话句柄;key— key expression;value— 以 null 结尾的字符串 - 返回值:
ONEPATH_OK成功
onepath_put_with_opts
c
int onepath_put_with_opts(onepath_session_t s, const char *key,
const void *data, size_t len,
const onepath_write_opts_t *opts);带扩展选项的一次性发布。
- 参数:
s— 会话句柄;key— key expression;data/len— 数据;opts— 写入选项,传NULL使用默认值 - 返回值:
ONEPATH_OK成功
onepath_delete
c
int onepath_delete(onepath_session_t s, const char *key);发送删除操作。
- 参数:
s— 会话句柄;key— 要删除的 key expression - 返回值:
ONEPATH_OK成功 - 注意:订阅者会收到
kind == ONEPATH_SAMPLE_DELETE的样本
订阅
OnePath 提供三种订阅模式:回调模式(数据到达即触发回调)、后台模式(生命周期随会话)、拉取模式(写入环形缓冲,由用户主动取)。
onepath_subscribe — 回调模式
c
int onepath_subscribe(onepath_session_t s, onepath_subscriber_t *out,
const char *key, onepath_sample_cb cb, void *userdata);创建回调模式的订阅者。
- 参数:
s— 会话句柄out— 输出订阅者句柄key— key expression,支持通配符(如"demo/**")cb— 数据到达时的回调函数(见 回调函数)userdata— 传递给回调的用户数据指针
- 返回值:
ONEPATH_OK成功 - 注意:回调在内部线程中触发;使用完毕后调用
onepath_subscriber_destroy()
onepath_subscribe_background — 后台模式
c
int onepath_subscribe_background(onepath_session_t s, const char *key,
onepath_sample_cb cb, void *userdata);创建后台订阅者,生命周期与会话绑定。
- 参数:
s— 会话句柄;key— key expression;cb— 回调函数;userdata— 用户数据指针 - 返回值:
ONEPATH_OK成功 - 注意:不返回订阅者句柄,订阅在会话关闭时自动终止,无需手动销毁
onepath_subscribe_pull — 拉取模式
c
int onepath_subscribe_pull(onepath_session_t s, onepath_subscriber_t *out,
onepath_sample_rx_t *rx, const char *key,
size_t ring_size);创建拉取模式的订阅者(环形缓冲区)。
- 参数:
s— 会话句柄out— 输出订阅者句柄rx— 输出样本接收通道句柄key— key expressionring_size— 环形缓冲区大小
- 返回值:
ONEPATH_OK成功 - 注意:当缓冲区满时,最旧的数据会被覆盖
onepath_sample_recv
c
int onepath_sample_recv(onepath_sample_rx_t rx, onepath_sample_t *out);从拉取订阅通道阻塞接收一个样本。
- 参数:
rx— 样本接收通道句柄;out— 输出样本数据 - 返回值:
ONEPATH_OK成功,ONEPATH_ERR_CLOSED通道已关闭 - 注意:收到的样本使用完毕后必须调用
onepath_sample_release()释放
onepath_sample_try_recv
c
int onepath_sample_try_recv(onepath_sample_rx_t rx, onepath_sample_t *out);从拉取订阅通道非阻塞接收一个样本。
- 参数:
rx— 样本接收通道句柄;out— 输出样本数据 - 返回值:
ONEPATH_OK成功,ONEPATH_ERR_TIMEOUT无可用数据,ONEPATH_ERR_CLOSED通道已关闭 - 注意:收到的样本使用完毕后必须调用
onepath_sample_release()释放
onepath_sample_release
c
void onepath_sample_release(onepath_sample_t *sample);释放样本数据。无论通过回调还是拉取通道接收,用户拿到的样本均须调用此函数释放。
- 参数:
sample— 样本指针,释放后清零 - 注意:回调模式和拉取模式均须调用
onepath_sample_rx_destroy
c
void onepath_sample_rx_destroy(onepath_sample_rx_t rx);销毁样本接收通道。
- 参数:
rx— 样本接收通道句柄
onepath_subscriber_destroy
c
void onepath_subscriber_destroy(onepath_subscriber_t sub);销毁订阅者。
- 参数:
sub— 订阅者句柄 - 注意:对于拉取模式订阅,还需单独调用
onepath_sample_rx_destroy()销毁接收通道
示例
声明式发布
c
onepath_publisher_t pub;
onepath_declare_publisher(s, &pub, "demo/sensor/temp", NULL);
for (int i = 0; i < 10; i++) {
char buf[64];
snprintf(buf, sizeof(buf), "temp=%d", 20 + i);
onepath_publisher_put_str(pub, buf);
onepath_sleep_s(1);
}
onepath_publisher_destroy(pub);回调订阅
c
static void on_sample(onepath_sample_t *sample, void *userdata) {
(void)userdata;
printf("收到 [%s]: %.*s\n",
sample->key, (int)sample->data_len, (const char *)sample->data);
onepath_sample_release(sample); /* 必须释放 */
}
onepath_subscriber_t sub;
onepath_subscribe(s, &sub, "demo/**", on_sample, NULL);
/* ... 运行 ... */
onepath_subscriber_destroy(sub);拉取订阅
c
onepath_subscriber_t sub;
onepath_sample_rx_t rx;
onepath_subscribe_pull(s, &sub, &rx, "demo/**", 256);
onepath_sample_t sample;
while (onepath_sample_recv(rx, &sample) == ONEPATH_OK) {
printf("%.*s\n", (int)sample.data_len, (const char *)sample.data);
onepath_sample_release(&sample);
}
onepath_subscriber_destroy(sub);
onepath_sample_rx_destroy(rx);