From 66899bd8d0d33b0ab8ff74f7ec9e571268531a57 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 1 Jun 2022 10:27:41 +0800 Subject: [PATCH] fix(tmq): auto unsubscribe when consumer close --- example/src/tmq.c | 41 ++---------- include/client/taos.h | 2 +- include/common/tmsg.h | 2 +- source/client/src/tmq.c | 34 +++++----- source/dnode/mnode/impl/src/mndMain.c | 6 +- source/dnode/vnode/src/inc/tq.h | 12 ++-- source/dnode/vnode/src/tq/tq.c | 89 ++++++++------------------- 7 files changed, 55 insertions(+), 131 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 7e4de21f2e..5eecb5e4cc 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -196,8 +196,9 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 0); if (tmqmessage) { cnt++; + msg_process(tmqmessage); + if (cnt >= 2) break; /*printf("get data\n");*/ - /*msg_process(tmqmessage);*/ taos_free_result(tmqmessage); /*} else {*/ /*break;*/ @@ -253,39 +254,6 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { fprintf(stderr, "%% Consumer closed\n"); } -void perf_loop(tmq_t* tmq, tmq_list_t* topics) { - tmq_resp_err_t err; - - if ((err = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); - printf("subscribe err\n"); - return; - } - int32_t batchCnt = 0; - int32_t skipLogNum = 0; - clock_t startTime = clock(); - while (running) { - TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500); - if (tmqmessage) { - batchCnt++; - /*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/ - /*msg_process(tmqmessage);*/ - taos_free_result(tmqmessage); - } else { - break; - } - } - clock_t endTime = clock(); - printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum, - (double)(endTime - startTime) / CLOCKS_PER_SEC); - - err = tmq_consumer_close(tmq); - if (err) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); - else - fprintf(stderr, "%% Consumer closed\n"); -} - int main(int argc, char* argv[]) { if (argc > 1) { printf("env init\n"); @@ -296,7 +264,6 @@ int main(int argc, char* argv[]) { } tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); - /*perf_loop(tmq, topic_list);*/ - /*basic_consume_loop(tmq, topic_list);*/ - sync_consume_loop(tmq, topic_list); + basic_consume_loop(tmq, topic_list); + /*sync_consume_loop(tmq, topic_list);*/ } diff --git a/include/client/taos.h b/include/client/taos.h index 0b8c67aa79..beeb3a104e 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -230,7 +230,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); -DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time); +DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 07605c735c..723d305ca5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2412,7 +2412,7 @@ typedef struct { int32_t epoch; uint64_t reqId; int64_t consumerId; - int64_t waitTime; + int64_t timeout; int64_t currentOffset; } SMqPollReq; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index dfa56f80c4..416d1a6f26 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1243,7 +1243,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { return TMQ_RESP_ERR__FAIL; } -SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { +SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { int64_t reqOffset; if (pVg->currentOffset >= 0) { reqOffset = pVg->currentOffset; @@ -1269,7 +1269,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* strcpy(pReq->subKey + tlen + 1, pTopic->topicName); pReq->withTbName = tmq->withTbName; - pReq->waitTime = waitTime; + pReq->timeout = timeout; pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; pReq->currentOffset = reqOffset; @@ -1297,7 +1297,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { return pRspObj; } -int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) { +int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { /*printf("call poll\n");*/ for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); @@ -1318,7 +1318,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) { #endif } atomic_store_32(&pVg->vgSkipCnt, 0); - SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg); + SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg); if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); tsem_post(&tmq->rspSem); @@ -1388,7 +1388,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) return 0; } -SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) { +SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { while (1) { SMqRspWrapper* rspWrapper = NULL; taosGetQitem(tmq->qall, (void**)&rspWrapper); @@ -1428,17 +1428,17 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) { taosFreeQitem(rspWrapper); if (pollIfReset && reset) { tscDebug("consumer %ld reset and repoll", tmq->consumerId); - tmqPollImpl(tmq, waitTime); + tmqPollImpl(tmq, timeout); } } } } -TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { +TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { SMqRspObj* rspObj; int64_t startTime = taosGetTimestampMs(); - rspObj = tmqHandleAllRsp(tmq, wait_time, false); + rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { return (TAOS_RES*)rspObj; } @@ -1450,16 +1450,16 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { while (1) { tmqHandleAllDelayedTask(tmq); - if (tmqPollImpl(tmq, wait_time) < 0) return NULL; + if (tmqPollImpl(tmq, timeout) < 0) return NULL; - rspObj = tmqHandleAllRsp(tmq, wait_time, false); + rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { return (TAOS_RES*)rspObj; } - if (wait_time != 0) { + if (timeout != 0) { int64_t endTime = taosGetTimestampMs(); int64_t leftTime = endTime - startTime; - if (leftTime > wait_time) { + if (leftTime > timeout) { tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch); return NULL; } @@ -1474,10 +1474,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { if (tmq->status == TMQ_CONSUMER_STATUS__READY) { tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp == TMQ_RESP_ERR__SUCCESS) { - // TODO: free resources - return TMQ_RESP_ERR__SUCCESS; - } else { + if (rsp == TMQ_RESP_ERR__FAIL) { return TMQ_RESP_ERR__FAIL; } @@ -1485,10 +1482,7 @@ tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { rsp = tmq_subscribe(tmq, lst); tmq_list_destroy(lst); - if (rsp == TMQ_RESP_ERR__SUCCESS) { - // TODO: free resources - return TMQ_RESP_ERR__SUCCESS; - } else { + if (rsp == TMQ_RESP_ERR__FAIL) { return TMQ_RESP_ERR__FAIL; } } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 2a2a45a45d..3a3fd7ebdb 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -369,7 +369,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); return TAOS_SYNC_PROPOSE_OTHER_ERROR; } - + char logBuf[512] = {0}; char *syncNodeStr = sync2SimpleStr(pMgmt->sync); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); @@ -472,7 +472,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { } else if (code == 0) { mTrace("msg:%p, successfully processed and response", pMsg); } else { - mDebug("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, + mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); } @@ -686,4 +686,4 @@ void mndReleaseSyncRef(SMnode *pMnode) { int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1); mTrace("mnode sync is released, ref:%d", ref); taosThreadRwlockUnlock(&pMnode->lock); -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 72138926aa..34a7ff823a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -66,12 +66,12 @@ struct STqReadHandle { // tqPush typedef struct { - int64_t consumerId; - int32_t epoch; - int32_t skipLogNum; - int64_t reqOffset; - SRWLatch lock; - SRpcMsg* handle; + int64_t consumerId; + int32_t epoch; + int32_t skipLogNum; + int64_t reqOffset; + SRpcHandleInfo info; + SRWLatch lock; } STqPushHandle; #if 0 diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b4747f2264..4bce829b10 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -180,40 +180,6 @@ void tqClose(STQ* pTq) { // TODO } -#if 0 -int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1; - if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1; - if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1; - if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1; - /*if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1;*/ - /*if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1;*/ - /*if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;*/ - if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) { - if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1; - } - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pExec->subKey) < 0) return -1; - if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1; - if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1; - if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1; - /*if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1;*/ - /*if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1;*/ - /*if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;*/ - if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) { - if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1; - } - tEndDecode(pDecoder); - return 0; -} -#endif - int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeCStr(pEncoder, pHandle->subKey) < 0) return -1; @@ -290,8 +256,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ taosWLockLatch(&pHandle->pushHandle.lock); - SRpcMsg* pMsg = atomic_load_ptr(&pHandle->pushHandle.handle); - ASSERT(pMsg); + /*SRpcHandleInfo* pInfo = atomic_load_ptr(&pHandle->pushHandle.pInfo);*/ + /*ASSERT(pInfo);*/ SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pHandle->pushHandle.reqOffset; @@ -318,7 +284,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); void* buf = rpcMallocCont(tlen); if (buf == NULL) { - pMsg->code = -1; + // todo free return -1; } @@ -329,10 +295,16 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqDataBlkRsp(&abuf, &rsp); - SRpcMsg resp = {.info = handleInfo, .pCont = buf, .contLen = tlen, .code = 0}; + SRpcMsg resp = { + .info = pHandle->pushHandle.info, + .pCont = buf, + .contLen = tlen, + .code = 0, + }; tmsgSendRsp(&resp); - atomic_store_ptr(&pHandle->pushHandle.handle, NULL); + /*atomic_store_ptr(&pHandle->pushHandle.pInfo, NULL);*/ + memset(&pHandle->pushHandle.info, 0, sizeof(SRpcHandleInfo)); taosWUnLockLatch(&pHandle->pushHandle.lock); tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", @@ -374,7 +346,7 @@ int tqCommit(STQ* pTq) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqPollReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; - int64_t waitTime = pReq->waitTime; + int64_t waitTime = pReq->timeout; int32_t reqEpoch = pReq->epoch; int64_t fetchOffset; @@ -410,24 +382,22 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); - rsp.blockSchema = taosArrayInit(0, sizeof(void*)); - rsp.blockTbName = taosArrayInit(0, sizeof(void*)); rsp.withTbName = pReq->withTbName; + if (rsp.withTbName) { + rsp.blockTbName = taosArrayInit(0, sizeof(void*)); + } if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { rsp.withSchema = false; + rsp.withTag = false; } else { rsp.withSchema = true; + rsp.blockSchema = taosArrayInit(0, sizeof(void*)); + rsp.withTag = false; } - /*int8_t withTbName = pExec->withTbName;*/ - /*if (pReq->withTbName != -1) {*/ - /*withTbName = pReq->withTbName;*/ - /*}*/ - /*rsp.withTbName = withTbName;*/ - while (1) { consumerEpoch = atomic_load_32(&pHandle->epoch); if (consumerEpoch > reqEpoch) { @@ -443,15 +413,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SWalReadHead* pHead = &pHeadWithCkSum->head; -#if 0 - SWalReadHead* pHead; - if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) { - // TODO: no more log, set timer to wait blocking time - // if data inserted during waiting, launch query and - // response to user - tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, - TD_VID(pTq->pVnode), fetchOffset); - #if 0 // add to pushMgr taosWLockLatch(&pExec->pushHandle.lock); @@ -473,10 +434,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; #endif - break; - } -#endif - tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); @@ -533,8 +490,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO wrap in destroy func taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockDataLen); - taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); - taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree); + + if (rsp.withSchema) { + taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + } + + if (rsp.withTbName) { + taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree); + } return 0; }