diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 4adc738d35..5ed32b4556 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -28,7 +28,6 @@ #define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0) #define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0) -#define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_HEARTBEAT_INTERVAL 3000 #define DEFAULT_ASKEP_INTERVAL 1000 @@ -174,7 +173,6 @@ typedef struct { int32_t vgId; int32_t vgStatus; int32_t vgSkipCnt; // here used to mark the slow vgroups - int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data int64_t blockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data int64_t blockSleepForReplay; // once empty block is received, idle for ignoreCnt then start to poll data bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp. @@ -948,6 +946,7 @@ static void generateTimedTask(int64_t refId, int32_t type) { if (code == TSDB_CODE_SUCCESS) { *pTaskType = type; if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) { + tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 2", tmq->consumerId); if (tsem2_post(&tmq->rspSem) != 0){ tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type); } @@ -1226,7 +1225,6 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .epSet = pVgEp->epSet, .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, - .emptyBlockReceiveTs = 0, .blockReceiveTs = 0, .blockSleepForReplay = 0, .numOfRows = pInfo ? pInfo->numOfRows : 0, @@ -2128,7 +2126,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } } - + tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 1", tmq->consumerId); if (tsem2_post(&tmq->rspSem) != 0){ tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId); } @@ -2344,14 +2342,7 @@ static int32_t tmqPollImpl(tmq_t* tmq) { if (pVg == NULL) { continue; } - int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs; - if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms - tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, - tmq->epoch, pVg->vgId); - continue; - } - - elapsed = taosGetTimestampMs() - pVg->blockReceiveTs; + int64_t elapsed = taosGetTimestampMs() - pVg->blockReceiveTs; if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) { tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay); @@ -2446,7 +2437,6 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ SMqClientVg* pVg = NULL; getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg); if (pVg) { - pVg->emptyBlockReceiveTs = taosGetTimestampMs(); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } taosWUnLockLatch(&tmq->lock); @@ -2525,7 +2515,6 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); - pVg->emptyBlockReceiveTs = taosGetTimestampMs(); } else { pRspObj = buildRsp(pollRspWrapper); if (pRspObj == NULL) { @@ -2539,7 +2528,6 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj); tmq->totalRows += numOfRows; } - pVg->emptyBlockReceiveTs = 0; if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { pVg->blockReceiveTs = taosGetTimestampMs(); pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime; @@ -2643,8 +2631,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { if (timeout >= 0) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; - TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, 0); (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); + TSDB_CHECK_CONDITION(elapsedTime < timeout && elapsedTime >= 0, code, lino, END, 0); } else { (void)tsem2_timewait(&tmq->rspSem, 1000); }