From a19e63fd235ed1a0050f895e5371461a9203c1c4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Jun 2023 23:00:14 +0800 Subject: [PATCH] fix(stream): fix error in handling fill history. --- include/libs/executor/executor.h | 12 ++------ include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 21 +++++++------ source/dnode/vnode/src/tq/tqRestore.c | 3 +- source/libs/executor/inc/querytask.h | 3 -- source/libs/executor/src/executor.c | 26 ++++++++++------- source/libs/executor/src/executorInt.c | 38 ------------------------ source/libs/executor/src/scanoperator.c | 12 ++++---- source/libs/stream/src/stream.c | 3 +- source/libs/stream/src/streamExec.c | 9 +++--- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamRecover.c | 39 +++++++------------------ 12 files changed, 58 insertions(+), 112 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 5fd9a4a8c1..f4713f7a6f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -195,14 +195,6 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key); -/** - * return the scan info, in the form of tuple of two items, including table uid and current timestamp - * @param tinfo - * @param uid - * @param ts - * @return - */ -int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts); SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); @@ -225,16 +217,16 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); -int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo); bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo); int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo); -void qStreamCloseTsdbReader(void* task); void resetTaskInfo(qTaskInfo_t tinfo); +void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo); + int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo); int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9a72f785ae..ef533dc969 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -267,7 +267,7 @@ typedef struct SCheckpointInfo { typedef struct SStreamStatus { int8_t taskStatus; - int8_t checkDownstream; // downstream tasks are all ready now, if this flag is set + int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set int8_t schedStatus; int8_t keepTaskStatus; bool transferState; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 677ce88457..f68b725cdd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -817,10 +817,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { SStreamTask task = {0}; if (pTask->info.fillHistory) { task.id = pTask->streamTaskId; - SStreamMeta meta = {0}; task.pMeta = pTask->pMeta; pSateTask = &task; } + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1); if (pTask->pState == NULL) { return -1; @@ -840,7 +840,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { SStreamTask task = {0}; if (pTask->info.fillHistory) { task.id = pTask->streamTaskId; - SStreamMeta meta = {0}; task.pMeta = pTask->pMeta; pSateTask = &task; } @@ -1100,6 +1099,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->info.fillHistory) { SVersionRange* pRange = NULL; SStreamTask* pStreamTask = NULL; + if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { // 1. stop the related stream task, get the current scan wal version of stream task, ver. pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); @@ -1110,7 +1110,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); // wait for the stream task get ready for scan history data - while (((pStreamTask->status.checkDownstream == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || + while (((pStreamTask->status.downstreamReady == 0) && (pStreamTask->status.taskStatus != TASK_STATUS__STOP)) || pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr, pStreamTask->info.taskLevel); @@ -1136,8 +1136,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pRange->maxVer = ver; if (pRange->minVer == pRange->maxVer) { streamTaskRecoverSetAllStepFinished(pTask); - tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", - pId); + tqDebug("s-task:%s no need to perform secondary scan-history-data(step 2), since no new data ingest", pId); } } @@ -1147,31 +1146,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pId, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pStreamTask->id.idStr); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - st = taosGetTimestampMs(); + st = taosGetTimestampMs(); streamSetParamForStreamScannerStep2(pTask, pRange, &pTask->dataRange.window); } if(!streamTaskRecoverScanStep2Finished(pTask)) { + streamSourceScanHistoryData(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s is dropped or paused, abort recover in step1", pId); streamMetaReleaseTask(pMeta, pTask); return 0; } + streamTaskRecoverSetAllStepFinished(pTask); } el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", pId, el); + // 3. notify the downstream tasks to transfer executor state after handle all history blocks. if (!pTask->status.transferState) { - // 3. notify the downstream tasks to transfer executor state after handle all history blocks. - pTask->status.transferState = true; code = streamDispatchTransferStateMsg(pTask); if (code != TSDB_CODE_SUCCESS) { // todo handle error } + + pTask->status.transferState = true; } // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. @@ -1179,12 +1181,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamTryExec(pTask); pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s set status to be dropping", pId); // transfer the ownership of executor state streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); + streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pStreamTask); @@ -1405,6 +1407,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { + // even in halt status, the data in inputQ must be processed int8_t status = pTask->status.taskStatus; if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index ea7327f741..5db3e735cc 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -271,9 +271,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - noDataInWal = false; - if (pItem != NULL) { + noDataInWal = false; code = tAppendDataToInputQueue(pTask, pItem); if (code == TSDB_CODE_SUCCESS) { pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 9f8b749171..cdf37bcc6b 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -68,9 +68,6 @@ typedef struct { SQueryTableDataCond tableCond; SVersionRange fillHistoryVer; STimeWindow fillHistoryWindow; -// int64_t fillHistoryVer1; -// int64_t fillHisotryeKey1; - int64_t fillHistoryVer2; SStreamState* pState; int64_t dataVersion; int64_t checkPointId; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fab608c2b5..5f86f195b6 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -116,6 +116,16 @@ void resetTaskInfo(qTaskInfo_t tinfo) { clearStreamBlock(pTaskInfo->pRoot); } +void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; + if (pTaskInfo == NULL) { + return; + } + + qDebug("%s set fill history start key:%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN); + pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; +} + static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -265,6 +275,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo); if (NULL == pTaskInfo->pRoot) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -314,8 +325,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v return NULL; } - struct SSubplan* pPlan = NULL; - int32_t code = qStringToSubplan(msg, &pPlan); + SSubplan* pPlan = NULL; + int32_t code = qStringToSubplan(msg, &pPlan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -907,14 +918,6 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan return 0; } -int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); - pTaskInfo->streamInfo.fillHistoryVer2 = ver; - pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE2; - return 0; -} - int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); @@ -1056,6 +1059,9 @@ int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; pTaskInfo->streamInfo.recoverStep1Finished = true; pTaskInfo->streamInfo.recoverStep2Finished = true; + + // reset the time window + pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; return 0; } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 472e7fed0a..eb55ab5e08 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -1051,44 +1051,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa return TSDB_CODE_SUCCESS; } -void qStreamCloseTsdbReader(void* task) { - if (task == NULL) { - return; - } - - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task; - SOperatorInfo* pOp = pTaskInfo->pRoot; - - qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.currentOffset.uid, - pTaskInfo->streamInfo.currentOffset.ts); - - // todo refactor, other thread may already use this read to extract data. - pTaskInfo->streamInfo.currentOffset = (STqOffsetVal){0}; - while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) { - SOperatorInfo* pDownstreamOp = pOp->pDownstream[0]; - if (pDownstreamOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamScanInfo* pInfo = pDownstreamOp->info; - if (pInfo->pTableScanOp) { - STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - - setOperatorCompleted(pInfo->pTableScanOp); - while (pTaskInfo->owner != 0) { - taosMsleep(100); - qDebug("wait for the reader stopping"); - } - - pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pTSInfo->base.dataReader); - pTSInfo->base.dataReader = NULL; - - // restore the status, todo refactor. - pInfo->pTableScanOp->status = OP_OPENED; - pTaskInfo->status = TASK_NOT_COMPLETED; - return; - } - } - } -} - void streamOpReleaseState(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ca83ef6d51..7f510d6745 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1800,8 +1800,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id); pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1; } else { - pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer + 1; - pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; + pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer; + pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer.maxVer; qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, id); pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2; @@ -1873,7 +1873,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); } else { - pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pTaskInfo->streamInfo.fillHistoryVer2); + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pTaskInfo->streamInfo.fillHistoryVer.maxVer); doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); } } @@ -2100,6 +2100,8 @@ FETCH_NEXT_BLOCK: STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; if (pWindow->skey != INT64_MIN) { + qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey); + bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); bool hasUnqualified = false; @@ -2124,8 +2126,8 @@ FETCH_NEXT_BLOCK: pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); - qDebug("%" PRId64 " rows in datablock, update res:%" PRId64 " %s", pBlockInfo->rows, - pInfo->pUpdateDataRes->info.rows, id); + qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, + pInfo->pUpdateDataRes->info.rows); if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { break; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index b5c83bc10b..d0c65cc256 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -121,9 +121,10 @@ int32_t streamSchedExec(SStreamTask* pTask) { pRunReq->streamId = pTask->id.streamId; pRunReq->taskId = pTask->id.taskId; + qDebug("trigger to run s-task:%s", pTask->id.idStr); + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); - qDebug("trigger to run s-task:%s", pTask->id.idStr); } else { qDebug("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3c150e924f..4db4e0d9a6 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -332,7 +332,7 @@ int32_t updateCheckPointInfo(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -static void waitForTaskTobeIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { +static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { // wait for the stream task to be idle int64_t st = taosGetTimestampMs(); @@ -367,12 +367,12 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { } // wait for the stream task to be idle - waitForTaskTobeIdle(pTask, pStreamTask); + waitForTaskIdle(pTask, pStreamTask); if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. - qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64 - ", status:%s, sched-status:%d", + qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 + ", status:%s, sched-status:%d", pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus); @@ -388,6 +388,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { // expand the query time window for stream scanner pTimeWindow->skey = INT64_MIN; + qResetStreamInfoTimeWindow(pStreamTask->exec.pExecutor); streamSetStatusNormal(pStreamTask); streamSchedExec(pStreamTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 593f84cb8a..a245b2d4f6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -438,7 +438,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - ASSERT(pTask->status.checkDownstream == 0); + ASSERT(pTask->status.downstreamReady == 0); } tdbFree(pKey); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index a025d01004..6ff7a365a9 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -97,16 +97,16 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 - "-%" PRId64, - pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer, - pWindow->skey, pWindow->ekey); - req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; + qDebug("s-task:%s check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 + "-%" PRId64 ", req:0x%" PRIx64, + pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer, + pWindow->skey, pWindow->ekey, req.reqId); + streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; @@ -129,7 +129,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { - pTask->status.checkDownstream = 1; + pTask->status.downstreamReady = 1; qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s", pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus)); @@ -222,8 +222,8 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } // set the downstream tasks have been checked flag - ASSERT(pTask->status.checkDownstream == 0); - pTask->status.checkDownstream = 1; + ASSERT(pTask->status.downstreamReady == 0); + pTask->status.downstreamReady = 1; ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL); if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { @@ -286,23 +286,6 @@ int32_t streamSourceScanHistoryData(SStreamTask* pTask) { return streamScanExec(pTask, 100); } -int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { - void* exec = pTask->exec.pExecutor; - const char* id = pTask->id.idStr; - - int64_t st = taosGetTimestampMs(); - qDebug("s-task:%s recover step2(blocking stage) started", id); - if (qStreamSourceRecoverStep2(exec, ver) < 0) { - } - - int32_t code = streamScanExec(pTask, 100); - - double el = (taosGetTimestampMs() - st) / 1000.0; - qDebug("s-task:%s recover step2(blocking stage) ended, elapsed time:%.2fs", id, el); - - return code; -} - int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; @@ -373,8 +356,8 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - qDebug("s-task:%s send transfer state msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr, - pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus); + qDebug("s-task:%s send transfer state msg to downstream (fix-dispatch) to taskId:0x%x, status:%s", pTask->id.idStr, + pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus)); req.taskId = pTask->fixedEpDispatcher.taskId; doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); @@ -693,7 +676,7 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) { pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } - ASSERT(pTask->status.checkDownstream == 0); + ASSERT(pTask->status.downstreamReady == 0); // check downstream tasks for itself streamTaskCheckDownstreamTasks(pTask);