存活感知 (Liveliness)
存活感知用于实时感知节点的上线 / 离线状态。一个节点通过声明存活令牌(token)向网络广播"我在某个 key 上存活";其他节点可订阅该 key 的存活变更事件,或一次性查询当前所有活跃令牌。令牌撤销或持有者掉线时,订阅者会收到删除通知。
onepath_liveliness_declare
c
int onepath_liveliness_declare(onepath_session_t s, onepath_token_t *out,
const char *key);声明存活令牌,向网络广播本节点在指定 key 上存活。
- 参数:
s— 会话句柄out— 输出令牌句柄key— 存活 key expression
- 返回值:
ONEPATH_OK成功
onepath_liveliness_undeclare
c
void onepath_liveliness_undeclare(onepath_token_t token);撤销存活令牌。
- 参数:
token— 令牌句柄 - 注意:撤销后,其他节点的存活订阅者会收到
ONEPATH_SAMPLE_DELETE通知
onepath_liveliness_subscribe
c
int onepath_liveliness_subscribe(onepath_session_t s, onepath_subscriber_t *out,
const char *key, onepath_sample_cb cb,
void *userdata, int history);订阅存活变更事件。
- 参数:
s— 会话句柄out— 输出订阅者句柄key— 存活 key expressioncb— 回调函数(见 回调函数)userdata— 用户数据指针history—1表示获取当前所有活跃令牌作为初始事件,0仅接收后续变更
- 返回值:
ONEPATH_OK成功 - 注意:回调中
sample->kind为ONEPATH_SAMPLE_PUT表示上线,ONEPATH_SAMPLE_DELETE表示下线;订阅者用onepath_subscriber_destroy()销毁
onepath_liveliness_get
c
int onepath_liveliness_get(onepath_session_t s, const char *key,
onepath_reply_rx_t *out, uint64_t timeout_ms);一次性查询当前所有活跃的令牌。
- 参数:
s— 会话句柄key— 存活 key expressionout— 输出回复接收通道句柄timeout_ms— 超时时间(毫秒)
- 返回值:
ONEPATH_OK成功 - 注意:回复的接收与释放方式同 查询 Get(
onepath_reply_recv/onepath_reply_release/onepath_reply_rx_destroy)
示例
c
/* 节点 A:声明存活 */
onepath_token_t token;
onepath_liveliness_declare(s, &token, "app/nodes/A");
/* ... 工作 ... */
onepath_liveliness_undeclare(token); /* 下线时撤销 */
/* 节点 B:订阅上下线事件 */
static void on_live(onepath_sample_t *sample, void *userdata) {
(void)userdata;
const char *evt = (sample->kind == ONEPATH_SAMPLE_PUT) ? "上线" : "下线";
printf("[%s] %s\n", sample->key, evt);
onepath_sample_release(sample);
}
onepath_subscriber_t sub;
onepath_liveliness_subscribe(s, &sub, "app/nodes/**", on_live, NULL, 1);
/* ... 运行 ... */
onepath_subscriber_destroy(sub);