diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 907a6ebe3c..3db3cdb888 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1805,14 +1805,15 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { } for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms + int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs; + if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed > 0) { // less than 10ms tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } - if (tmq->replayEnable && - taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms + elapsed = taosGetTimestampMs() - pVg->blockReceiveTs; + if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed > 0) { tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay); continue; @@ -2127,7 +2128,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { if (timeout >= 0) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; - if (elapsedTime > timeout) { + if (elapsedTime > timeout || elapsedTime < 0) { tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 516a47606b..0e9f596cb1 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -385,7 +385,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) { SWalReader* pWalReader = pReader->pWalReader; - uint64_t st = taosGetTimestampMs(); + int64_t st = taosGetTimestampMs(); while (1) { int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < numOfBlocks) { @@ -413,7 +413,8 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) { tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); pReader->msg.msgStr = NULL; - if (taosGetTimestampMs() - st > 1000) { + int64_t elapsed = taosGetTimestampMs() - st; + if(elapsed > 1000 && elapsed < 0){ return false; }