Skip to content

高级发布订阅

可用性:完整版(Full)专属。精简版(Tiny)不提供高级发布订阅能力,相关声明在 头文件中即被剔除,误用在编译期即报错——这是一条明确的能力边界,而非运行期失败。

概述

高级发布订阅在普通发布 / 订阅之上提供缓存、丢失检测与历史恢复,适用于对数据完整性 要求较高的场景(晚加入的订阅者需要补齐历史、链路抖动需要自动重传丢失样本等)。

  • 发布端缓存:发布者保留最近若干条样本,供晚加入的订阅者获取。
  • 历史回放:订阅者加入时可一次性拉取发布端缓存的历史样本。
  • 丢失检测与恢复:基于心跳检测样本丢失,并可自动恢复丢失的样本;丢失事件还可经 回调通知应用。

发布者与订阅者的句柄类型与普通版本相同,但需用对应的高级 API 发布 / 订阅;销毁时仍用 onepath_publisher_destroy / onepath_subscriber_destroy

快速上手

c
/* 高级发布者:默认开启缓存(最多 10 条)+ 丢失检测 + 存在广播 */
onepath_publisher_t pub;
onepath_declare_advanced_publisher(s, &pub, "demo/adv/data", NULL);
onepath_advanced_publisher_put(pub, data, len);

/* 高级订阅者:加入时拉取历史 + 检测后续发布者 + 自动恢复 */
onepath_subscriber_t sub;
onepath_subscribe_advanced(s, &sub, "demo/adv/data", on_sample, NULL, NULL);

/* 可选:注册丢失通知回调 */
onepath_subscriber_on_miss(sub, on_miss, NULL);

API 参考

发布

c
int onepath_declare_advanced_publisher(onepath_session_t s, onepath_publisher_t *out,
                                       const char *key,
                                       const onepath_advanced_pub_opts_t *opts);
int onepath_advanced_publisher_put(onepath_publisher_t pub, const void *data, size_t len);
int onepath_advanced_publisher_put_str(onepath_publisher_t pub, const char *str);
函数说明
onepath_declare_advanced_publisher(s, &out, key, opts)声明高级发布者;opts=NULL 取默认。销毁用 onepath_publisher_destroy
onepath_advanced_publisher_put(pub, data, len)经高级发布者发送二进制数据。
onepath_advanced_publisher_put_str(pub, str)经高级发布者发送以 null 结尾的字符串。

订阅

c
int onepath_subscribe_advanced(onepath_session_t s, onepath_subscriber_t *out,
                               const char *key, onepath_sample_cb cb, void *userdata,
                               const onepath_advanced_sub_opts_t *opts);
int onepath_subscriber_on_miss(onepath_subscriber_t sub,
                               onepath_miss_cb cb, void *userdata);
函数说明
onepath_subscribe_advanced(s, &out, key, cb, userdata, opts)创建高级订阅者,支持历史恢复与丢失检测;opts=NULL 取默认。销毁用 onepath_subscriber_destroy
onepath_subscriber_on_miss(sub, cb, userdata)为高级订阅者注册数据丢失回调;仅适用于经 onepath_subscribe_advanced 创建的订阅者。

选项结构

onepath_advanced_pub_opts_t — 高级发布者选项

c
typedef struct {
    int    cache_enabled;           /* 是否启用发布端缓存 */
    size_t cache_max_samples;       /* 最大缓存样本数 (0 = 不限) */
    int    miss_detection_enabled;  /* 是否启用基于心跳的丢失检测 */
    int    publisher_detection;     /* 是否通过存活感知广播存在 */
} onepath_advanced_pub_opts_t;

#define ONEPATH_ADVANCED_PUB_OPTS_DEFAULT { 1, 10, 1, 1 }

默认值:启用缓存、缓存最多 10 条、启用丢失检测、广播存在。

onepath_advanced_sub_opts_t — 高级订阅者选项

c
typedef struct {
    int    history_enabled;         /* 加入时是否获取缓存历史 */
    int    detect_late_publishers;  /* 是否检测后续出现的发布者 */
    int    recovery_enabled;        /* 是否自动恢复丢失的样本 */
} onepath_advanced_sub_opts_t;

#define ONEPATH_ADVANCED_SUB_OPTS_DEFAULT { 1, 1, 1 }

默认值:加入时拉取历史、检测后续发布者、自动恢复丢失样本。

创建 / 销毁配对

创建销毁
onepath_declare_advanced_publisheronepath_publisher_destroy
onepath_subscribe_advancedonepath_subscriber_destroy

TIP

丢失检测与恢复依赖发布端缓存与心跳。若需要可靠的「不丢消息」语义,发布端应保持 cache_enabled=1 并设置足够的 cache_max_samples,订阅端保持 recovery_enabled=1。 样本回调收到的数据已深拷贝、归用户所有,用完须 onepath_sample_release,见 内存管理

OnePath™ 以预构建库形式交付,运行时零外部依赖。