From 719d1d1b902f90e9a6d33075f26f6c2519381181 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 18 Jan 2024 14:34:18 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 6 +++--- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/libs/executor/src/scanoperator.c | 2 +- source/libs/stream/src/streamExec.c | 10 +++------- source/libs/wal/src/walRead.c | 2 +- 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6991e669d5..40b915ce9e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -886,7 +886,8 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask pTask->execInfo.step2Start = taosGetTimestampMs(); if (done) { - qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); + qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pRange->minVer, + pRange->maxVer, 0.0); streamTaskPutTranstateIntoInputQ(pTask); streamExecTask(pTask); // exec directly } else { @@ -1141,8 +1142,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { - tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, - req.taskId); + tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed", vgId, req.taskId); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 0b05573aae..383a636f71 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -465,7 +465,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < numOfBlocks) { tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver, - pReader->nextBlk, numOfBlocks, idstr); + (pReader->nextBlk + 1), numOfBlocks, idstr); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3ed5128858..d8d26b25d4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2155,7 +2155,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer; pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer; pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow; - qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s", + qDebug("stream scan step2 (scan wal), verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s", pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id); pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1eb66a82ab..1ec8843c0c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -340,7 +340,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } else { double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.; stDebug( - "s-task:%s fill-history task end, scal wal elapsed time:%.2fSec,update related stream task:%s info, transfer " + "s-task:%s fill-history task end, scan wal elapsed time:%.2fSec,update related stream task:%s info, transfer " "exec state", id, el, pStreamTask->id.idStr); } @@ -380,22 +380,18 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } + // 1. expand the query time window for stream task of WAL scanner if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 ", status:%s, sched-status:%d", pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, p, pStreamTask->status.schedStatus); - } else { - stDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); - } - // 1. expand the query time window for stream task of WAL scanner - if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { pTimeWindow->skey = INT64_MIN; qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); } else { - stDebug("s-task:%s non-source task no need to reset filter window", pStreamTask->id.idStr); + stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr); } // 2. transfer the ownership of executor state diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 6748d161ae..3854e90901 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -305,7 +305,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { } int32_t walSkipFetchBody(SWalReader *pRead) { - wDebug("vgId:%d, skip fetch body:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64 + wDebug("vgId:%d, skip:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64 ", applied:%" PRId64 ", 0x%" PRIx64, pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);