diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8367c47464..fe669b7fac 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -697,7 +697,7 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); // recover and fill history void streamTaskCheckDownstream(SStreamTask* pTask); -int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); +int32_t streamTaskStartScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 43c1b84fa7..6aacb5d2bb 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -60,10 +60,17 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); + if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) { STaskStartInfo* pStartInfo = &pMeta->startInfo; pStartInfo->readyTs = pTask->execInfo.start; - pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs; + + if (pStartInfo->startTs != 0) { + pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs; + } else { + pStartInfo->elapsedTime = 0; + } + streamMetaResetStartInfo(pStartInfo); stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64 @@ -71,6 +78,7 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); } + taosWUnLockLatch(&pMeta->lock); } @@ -94,7 +102,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { return 0; } -static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { +static int32_t doStartScanHistoryTask(SStreamTask* pTask) { SVersionRange* pRange = &pTask->dataRange.range; if (pTask->info.fillHistory) { streamSetParamForScanHistory(pTask); @@ -105,15 +113,18 @@ static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) { return code; } -int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { +int32_t streamTaskStartScanHistory(SStreamTask* pTask) { + ASSERT(pTask->status.downstreamReady == 1); + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - return doLaunchScanHistoryTask(pTask); + return doStartScanHistoryTask(pTask); } else { ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); - stDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus, - walReaderGetCurrentVer(pTask->exec.pWalReader)); + stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr, + streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus, + walReaderGetCurrentVer(pTask->exec.pWalReader)); + streamTaskEnablePause(pTask); } } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { if (pTask->info.fillHistory) { @@ -179,7 +190,7 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { streamTaskSetReady(pTask, 0); streamTaskSetRangeStreamCalc(pTask); - streamTaskLaunchScanHistory(pTask); + streamTaskStartScanHistory(pTask); streamLaunchFillHistoryTask(pTask); } @@ -289,8 +300,9 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { if (status == TASK_STATUS__SCAN_HISTORY) { stDebug("s-task:%s enter into scan-history data stage, status:%s", id, str); - streamTaskLaunchScanHistory(pTask); + streamTaskStartScanHistory(pTask); } else { + // fill-history tasks are not allowed to reach here. if (pTask->info.fillHistory == 1) { stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); pTask->status.taskStatus = TASK_STATUS__DROPPING; diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 7da8da09bf..3b3d275a07 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -137,6 +137,7 @@ echo "idxDebugFlag 143" >> $TAOS_CFG echo "udfDebugFlag 143" >> $TAOS_CFG echo "smaDebugFlag 143" >> $TAOS_CFG echo "metaDebugFlag 143" >> $TAOS_CFG +echo "stDebugFlag 143" >> $TAOS_CFG echo "numOfLogLines 20000000" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG echo "locale en_US.UTF-8" >> $TAOS_CFG