diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5cecb1af42..8bced20ca3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -445,6 +445,7 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; SDataRange dataRange; + SVersionRange step2Range; SHistoryTaskInfo hTaskInfo; STaskId streamTaskId; STaskExecStatisInfo execInfo; @@ -901,4 +902,4 @@ void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp); } #endif -#endif /* ifndef _STREAM_H_ */ \ No newline at end of file +#endif /* ifndef _STREAM_H_ */ diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 05b06e83a8..5d18d0d22e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1541,7 +1541,12 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // history_task_id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); if (pe->hTaskId != 0) { - colDataSetVal(pColInfo, numOfRows, (const char*)&pe->hTaskId, false); + memset(idstr, 0, tListLen(idstr)); + len = tintToHex(pe->hTaskId, &idstr[4]); + idstr[2] = '0'; + idstr[3] = 'x'; + varDataSetLen(idstr, len + 2); + colDataSetVal(pColInfo, numOfRows, idstr, false); } else { colDataSetVal(pColInfo, numOfRows, 0, true); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 92dc55c0c3..791a2c2d92 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -800,33 +800,33 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { const char* id = pTask->id.idStr; int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; - SVersionRange* pRange = &pTask->dataRange.range; + SVersionRange* pStep2Range = &pTask->step2Range; // if it's an source task, extract the last version in wal. bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); pTask->execInfo.step2Start = taosGetTimestampMs(); if (done) { - qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pRange->minVer, - pRange->maxVer, 0.0); + qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer, + pStep2Range->maxVer, 0.0); streamTaskPutTranstateIntoInputQ(pTask); streamExecTask(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 ", do secondary scan-history from WAL after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, + id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey, pStreamTask->id.idStr); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); + streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow); - int64_t dstVer = pTask->dataRange.range.minVer; + int64_t dstVer =pStep2Range->minVer; pTask->chkInfo.nextProcessVer = dstVer; walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, - pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); + pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE); /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 73508202d9..19e53c7d15 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -242,21 +242,23 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // todo handle memory error bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { const char* id = pTask->id.idStr; - int64_t maxVer = pTask->dataRange.range.maxVer; + int64_t maxVer = pTask->step2Range.maxVer; - if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { + if ((pTask->info.fillHistory == 1) && ver > maxVer) { if (!pTask->status.appendTranstateBlock) { qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal anymore, add transfer-state block into inputQ", id, ver, maxVer); double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0; - qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs", + id, pTask->step2Range.minVer, maxVer, el); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); return true; } else { - qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", - id, ver, maxVer); + qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64 + ", not scan wal", + id, ver, pTask->step2Range.minVer, maxVer); } } @@ -389,7 +391,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { } int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); - int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; + int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX; taosThreadMutexLock(&pTask->lock); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 26a80cc6b5..12c5504007 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -926,8 +926,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2; - qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 - " - %" PRId64, + qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 "-%" PRId64 ", window:%" PRId64 + "-%" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); return 0; diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 32bd3742ad..3abca307da 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -868,8 +868,10 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe } else { // 2. do secondary scan of the history data, the time window remain, and the version range is updated to // [pTask->dataRange.range.maxVer, ver1] - pRange->minVer = walScanStartVer; - pRange->maxVer = nextProcessVer - 1; + pTask->step2Range.minVer = walScanStartVer; + pTask->step2Range.maxVer = nextProcessVer - 1; + stDebug("s-task:%s set step2 verRange:%" PRId64 "-%" PRId64 ", step1 verRange:%" PRId64 "-%" PRId64, pTask->id.idStr, + pTask->step2Range.minVer, pTask->step2Range.maxVer, pRange->minVer, pRange->maxVer); return false; } }