高级发布订阅
可用性:完整版(Full)专属。精简版(Tiny)不提供高级发布订阅能力,相关声明在 头文件中即被剔除,误用在编译期即报错——这是一条明确的能力边界,而非运行期失败。
概述
高级发布订阅在普通发布 / 订阅之上提供缓存、丢失检测与历史恢复,适用于对数据完整性 要求较高的场景(晚加入的订阅者需要补齐历史、链路抖动需要自动重传丢失样本等)。
- 发布端缓存:发布者保留最近若干条样本,供晚加入的订阅者获取。
- 历史回放:订阅者加入时可一次性拉取发布端缓存的历史样本。
- 丢失检测与恢复:基于心跳检测样本丢失,并可自动恢复丢失的样本;丢失事件还可经 回调通知应用。
发布者与订阅者的句柄类型与普通版本相同,但需用对应的高级 API 发布 / 订阅;销毁时仍用 onepath_publisher_destroy / onepath_subscriber_destroy。
快速上手
c
/* 高级发布者:默认开启缓存(最多 10 条)+ 丢失检测 + 存在广播 */
onepath_publisher_t pub;
onepath_declare_advanced_publisher(s, &pub, "demo/adv/data", NULL);
onepath_advanced_publisher_put(pub, data, len);
/* 高级订阅者:加入时拉取历史 + 检测后续发布者 + 自动恢复 */
onepath_subscriber_t sub;
onepath_subscribe_advanced(s, &sub, "demo/adv/data", on_sample, NULL, NULL);
/* 可选:注册丢失通知回调 */
onepath_subscriber_on_miss(sub, on_miss, NULL);API 参考
发布
c
int onepath_declare_advanced_publisher(onepath_session_t s, onepath_publisher_t *out,
const char *key,
const onepath_advanced_pub_opts_t *opts);
int onepath_advanced_publisher_put(onepath_publisher_t pub, const void *data, size_t len);
int onepath_advanced_publisher_put_str(onepath_publisher_t pub, const char *str);| 函数 | 说明 |
|---|---|
onepath_declare_advanced_publisher(s, &out, key, opts) | 声明高级发布者;opts=NULL 取默认。销毁用 onepath_publisher_destroy。 |
onepath_advanced_publisher_put(pub, data, len) | 经高级发布者发送二进制数据。 |
onepath_advanced_publisher_put_str(pub, str) | 经高级发布者发送以 null 结尾的字符串。 |
订阅
c
int onepath_subscribe_advanced(onepath_session_t s, onepath_subscriber_t *out,
const char *key, onepath_sample_cb cb, void *userdata,
const onepath_advanced_sub_opts_t *opts);
int onepath_subscriber_on_miss(onepath_subscriber_t sub,
onepath_miss_cb cb, void *userdata);| 函数 | 说明 |
|---|---|
onepath_subscribe_advanced(s, &out, key, cb, userdata, opts) | 创建高级订阅者,支持历史恢复与丢失检测;opts=NULL 取默认。销毁用 onepath_subscriber_destroy。 |
onepath_subscriber_on_miss(sub, cb, userdata) | 为高级订阅者注册数据丢失回调;仅适用于经 onepath_subscribe_advanced 创建的订阅者。 |
选项结构
onepath_advanced_pub_opts_t — 高级发布者选项
c
typedef struct {
int cache_enabled; /* 是否启用发布端缓存 */
size_t cache_max_samples; /* 最大缓存样本数 (0 = 不限) */
int miss_detection_enabled; /* 是否启用基于心跳的丢失检测 */
int publisher_detection; /* 是否通过存活感知广播存在 */
} onepath_advanced_pub_opts_t;
#define ONEPATH_ADVANCED_PUB_OPTS_DEFAULT { 1, 10, 1, 1 }默认值:启用缓存、缓存最多 10 条、启用丢失检测、广播存在。
onepath_advanced_sub_opts_t — 高级订阅者选项
c
typedef struct {
int history_enabled; /* 加入时是否获取缓存历史 */
int detect_late_publishers; /* 是否检测后续出现的发布者 */
int recovery_enabled; /* 是否自动恢复丢失的样本 */
} onepath_advanced_sub_opts_t;
#define ONEPATH_ADVANCED_SUB_OPTS_DEFAULT { 1, 1, 1 }默认值:加入时拉取历史、检测后续发布者、自动恢复丢失样本。
创建 / 销毁配对
| 创建 | 销毁 |
|---|---|
onepath_declare_advanced_publisher | onepath_publisher_destroy |
onepath_subscribe_advanced | onepath_subscriber_destroy |
TIP
丢失检测与恢复依赖发布端缓存与心跳。若需要可靠的「不丢消息」语义,发布端应保持 cache_enabled=1 并设置足够的 cache_max_samples,订阅端保持 recovery_enabled=1。 样本回调收到的数据已深拷贝、归用户所有,用完须 onepath_sample_release,见 内存管理。