From 8f1f423c9053456c6b543c4a9eddbd90970fa61c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 14 Mar 2023 00:37:09 +0800 Subject: [PATCH] fix(tmq): fix the error in tmq. --- source/client/src/clientTmq.c | 17 ++++++++--------- utils/test/c/tmqSim.c | 2 -- 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 95722ae3e5..522f5cc1b9 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -24,8 +24,6 @@ #include "tref.h" #include "ttimer.h" -#define VG_POLL_IGNORE_TICK 100 - struct SMqMgmt { int8_t inited; tmr_h timer; @@ -134,7 +132,7 @@ typedef struct { int32_t vgId; int32_t vgStatus; int32_t vgSkipCnt; - int32_t vgIgnoreCnt; // once empty block is received, idle for ignoreCnt then start to poll data + int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data SEpSet epSet; } SMqClientVg; @@ -1403,7 +1401,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .epSet = pVgEp->epSet, .vgStatus = TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, - .vgIgnoreCnt = 0, + .emptyBlockReceiveTs = 0, }; taosArrayPush(pTopic->vgs, &clientVg); @@ -1704,10 +1702,9 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (pVg->vgIgnoreCnt > 0) { - pVg->vgIgnoreCnt -= 1; - tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %d tick before poll", tmq->consumerId, tmq->epoch, - pVg->vgId, pVg->vgIgnoreCnt); + if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < 100) { // less than 100ms + tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch, + pVg->vgId); continue; } @@ -1846,9 +1843,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { rspWrapper = NULL; tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, pollRspWrapper->reqId); - pVg->vgIgnoreCnt = VG_POLL_IGNORE_TICK; + pVg->emptyBlockReceiveTs = taosGetTimestampMs(); taosFreeQitem(pollRspWrapper); continue; + } else { + pVg->emptyBlockReceiveTs = 0; // reset the ts } // build rsp diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index ce4bafb79b..ce61b80d41 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -735,9 +735,7 @@ void build_consumer(SThreadInfo* pInfo) { } pInfo->tmq = tmq_consumer_new(conf, NULL, 0); - tmq_conf_destroy(conf); - return; }