From abe37a16e3756717e62f2f9c849b6a498e82f50c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 18 Feb 2023 12:40:09 +0800 Subject: [PATCH] refactor: do some internal refactor and add some logs. --- source/client/src/clientTmq.c | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 2646fe30b3..a17ad97756 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -795,19 +795,23 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { STaosQall* qall = taosAllocateQall(); taosReadAllQitems(pTmq->delayedTask, qall); + if (qall->numOfItems == 0) { + taosFreeQall(qall); + return TSDB_CODE_SUCCESS; + } + tscDebug("consumer:0x%"PRIx64" handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems); + int8_t* pTaskType = NULL; + taosGetQitem(qall, (void**)&pTaskType); - while (1) { - int8_t* pTaskType = NULL; - taosGetQitem(qall, (void**)&pTaskType); - if (pTaskType == NULL) break; - + while (pTaskType != NULL) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { tmqAskEp(pTmq, true); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; + tscDebug("consumer:0x%"PRIx64" retrieve ep from mnode in 1s", pTmq->consumerId); taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam); @@ -815,12 +819,16 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; + tscDebug("consumer:0x%"PRIx64" commit to mnode in %.2f s", pTmq->consumerId, pTmq->autoCommitInterval/1000.0); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { + // do nothing } else { ASSERT(0); } + taosFreeQitem(pTaskType); + taosGetQitem(qall, (void**)&pTaskType); } taosFreeQall(qall);