feature(tmq): add new API to extract offset from result set.

This commit is contained in:
Haojun Liao 2023-05-16 09:51:00 +08:00
parent de0cc463e1
commit 573a86ed99
2 changed files with 24 additions and 0 deletions

View File

@ -310,6 +310,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
/* ------------------------------ TAOSX -----------------------------------*/ /* ------------------------------ TAOSX -----------------------------------*/
// note: following apis are unstable // note: following apis are unstable

View File

@ -2109,6 +2109,29 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
} }
} }
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*) res;
STqOffsetVal* pOffset = &pRspObj->rsp.rspOffset;
if (pOffset->type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.rspOffset.version;
}
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res;
if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) {
return pRspObj->metaRsp.rspOffset.version;
}
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*) res;
if (pRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.rspOffset.version;
}
}
// data from tsdb, no valid offset info
return -1;
}
const char* tmq_get_table_name(TAOS_RES* res) { const char* tmq_get_table_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) { if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res; SMqRspObj* pRspObj = (SMqRspObj*)res;