enh(tmq): add demo for tmq_get_raw_meta
This commit is contained in:
parent
14048a5e4c
commit
323ee6b1b0
|
@ -27,7 +27,11 @@ static void msg_process(TAOS_RES* msg) {
|
||||||
printf("db: %s\n", tmq_get_db_name(msg));
|
printf("db: %s\n", tmq_get_db_name(msg));
|
||||||
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
||||||
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
|
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
|
||||||
printf("meta, skip\n");
|
void* meta;
|
||||||
|
int32_t metaLen;
|
||||||
|
tmq_get_raw_meta(msg, &meta, &metaLen);
|
||||||
|
|
||||||
|
printf("meta, len is %d\n", metaLen);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
|
|
|
@ -261,7 +261,7 @@ enum tmq_res_t {
|
||||||
typedef enum tmq_res_t tmq_res_t;
|
typedef enum tmq_res_t tmq_res_t;
|
||||||
|
|
||||||
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
||||||
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, const void **raw_meta, int32_t *raw_meta_len);
|
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len);
|
||||||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||||
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
||||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||||
|
|
|
@ -1871,7 +1871,7 @@ const char* tmq_get_table_name(TAOS_RES* res) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmq_get_raw_meta(TAOS_RES* res, const void** raw_meta, int32_t* raw_meta_len) {
|
int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len) {
|
||||||
if (TD_RES_TMQ_META(res)) {
|
if (TD_RES_TMQ_META(res)) {
|
||||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||||
*raw_meta = pMetaRspObj->metaRsp.metaRsp;
|
*raw_meta = pMetaRspObj->metaRsp.metaRsp;
|
||||||
|
|
|
@ -306,7 +306,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE ||
|
pHead->msgType == TDMT_VND_DROP_STB || pHead->msgType == TDMT_VND_CREATE_TABLE ||
|
||||||
pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE ||
|
pHead->msgType == TDMT_VND_ALTER_TABLE || pHead->msgType == TDMT_VND_DROP_TABLE ||
|
||||||
pHead->msgType == TDMT_VND_DROP_TTL_TABLE);
|
pHead->msgType == TDMT_VND_DROP_TTL_TABLE);
|
||||||
// return
|
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
|
||||||
SMqMetaRsp metaRsp = {0};
|
SMqMetaRsp metaRsp = {0};
|
||||||
metaRsp.reqOffset = pReq->currentOffset;
|
metaRsp.reqOffset = pReq->currentOffset;
|
||||||
metaRsp.rspOffset = fetchOffset;
|
metaRsp.rspOffset = fetchOffset;
|
||||||
|
|
Loading…
Reference in New Issue