fix: remove waitting for empty block

This commit is contained in:
wangmm0220 2025-03-03 19:55:04 +08:00
parent f88c249fa4
commit 100c65ced5
1 changed files with 4 additions and 16 deletions

View File

@ -28,7 +28,6 @@
#define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0) #define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0) #define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqClientDebugFlag|cDebugFlag, __VA_ARGS__); }} while(0)
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000
#define DEFAULT_HEARTBEAT_INTERVAL 3000 #define DEFAULT_HEARTBEAT_INTERVAL 3000
#define DEFAULT_ASKEP_INTERVAL 1000 #define DEFAULT_ASKEP_INTERVAL 1000
@ -174,7 +173,6 @@ typedef struct {
int32_t vgId; int32_t vgId;
int32_t vgStatus; int32_t vgStatus;
int32_t vgSkipCnt; // here used to mark the slow vgroups int32_t vgSkipCnt; // here used to mark the slow vgroups
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
int64_t blockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data int64_t blockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
int64_t blockSleepForReplay; // once empty block is received, idle for ignoreCnt then start to poll data int64_t blockSleepForReplay; // once empty block is received, idle for ignoreCnt then start to poll data
bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp. bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp.
@ -948,6 +946,7 @@ static void generateTimedTask(int64_t refId, int32_t type) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
*pTaskType = type; *pTaskType = type;
if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) { if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 2", tmq->consumerId);
if (tsem2_post(&tmq->rspSem) != 0){ if (tsem2_post(&tmq->rspSem) != 0){
tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type); tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
} }
@ -1226,7 +1225,6 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
.epSet = pVgEp->epSet, .epSet = pVgEp->epSet,
.vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
.vgSkipCnt = 0, .vgSkipCnt = 0,
.emptyBlockReceiveTs = 0,
.blockReceiveTs = 0, .blockReceiveTs = 0,
.blockSleepForReplay = 0, .blockSleepForReplay = 0,
.numOfRows = pInfo ? pInfo->numOfRows : 0, .numOfRows = pInfo ? pInfo->numOfRows : 0,
@ -2128,7 +2126,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
} }
tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 1", tmq->consumerId);
if (tsem2_post(&tmq->rspSem) != 0){ if (tsem2_post(&tmq->rspSem) != 0){
tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId); tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
} }
@ -2344,14 +2342,7 @@ static int32_t tmqPollImpl(tmq_t* tmq) {
if (pVg == NULL) { if (pVg == NULL) {
continue; continue;
} }
int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs; int64_t elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms
tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
tmq->epoch, pVg->vgId);
continue;
}
elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) { if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay); tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
@ -2446,7 +2437,6 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
SMqClientVg* pVg = NULL; SMqClientVg* pVg = NULL;
getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg); getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId, &pVg);
if (pVg) { if (pVg) {
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
} }
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
@ -2525,7 +2515,6 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
", total:%" PRId64 ",QID:0x%" PRIx64, ", total:%" PRId64 ",QID:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
} else { } else {
pRspObj = buildRsp(pollRspWrapper); pRspObj = buildRsp(pollRspWrapper);
if (pRspObj == NULL) { if (pRspObj == NULL) {
@ -2539,7 +2528,6 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj); tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
tmq->totalRows += numOfRows; tmq->totalRows += numOfRows;
} }
pVg->emptyBlockReceiveTs = 0;
if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { if (tmq->replayEnable && pRspWrapper->tmqRspType != TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
pVg->blockReceiveTs = taosGetTimestampMs(); pVg->blockReceiveTs = taosGetTimestampMs();
pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime; pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime;
@ -2643,8 +2631,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if (timeout >= 0) { if (timeout >= 0) {
int64_t currentTime = taosGetTimestampMs(); int64_t currentTime = taosGetTimestampMs();
int64_t elapsedTime = currentTime - startTime; int64_t elapsedTime = currentTime - startTime;
TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, 0);
(void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
TSDB_CHECK_CONDITION(elapsedTime < timeout && elapsedTime >= 0, code, lino, END, 0);
} else { } else {
(void)tsem2_timewait(&tmq->rspSem, 1000); (void)tsem2_timewait(&tmq->rspSem, 1000);
} }