diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h
index 3103b6fd76..bfc0eb57ec 100644
--- a/include/common/tmsgdef.h
+++ b/include/common/tmsgdef.h
@@ -300,7 +300,6 @@ enum {
   TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
   TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRANSFER_STATE, "vnode-stream-transfer-state", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
 
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 63af587794..4c773ea30f 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -45,7 +45,7 @@ enum {
   TASK_STATUS__FAIL,
   TASK_STATUS__STOP,
   TASK_STATUS__WAIT_DOWNSTREAM,
-  TASK_STATUS__SCAN_HISTORY_PREPARE,
+  TASK_STATUS__SCAN_HISTORY,
   TASK_STATUS__HALT,  // stream task halt to wait for the secondary scan history, this status is invisible for user
   TASK_STATUS__PAUSE,
 };
@@ -271,6 +271,7 @@ typedef struct SStreamStatus {
   int8_t        schedStatus;
   int8_t        keepTaskStatus;
   bool          transferState;
+  TdThreadMutex lock;
 } SStreamStatus;
 
 typedef struct SHistDataRange {
@@ -585,7 +586,7 @@ int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req*
 int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
 int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
 int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
-int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask);
+int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
 int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
 
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 93b078ad7d..1d29f245e2 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -820,7 +820,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
   pTask->dataRange.range.minVer = ver;
 
   // expand executor
-  pTask->status.taskStatus = /*(pTask->info.fillHistory) ? */TASK_STATUS__WAIT_DOWNSTREAM /*: TASK_STATUS__NORMAL*/;
+  pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
 
   if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
     pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
@@ -923,8 +923,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
     rsp.status = streamTaskCheckStatus(pTask);
     streamMetaReleaseTask(pTq->pStreamMeta, pTask);
 
-    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%d, rsp status %d",
-            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, pTask->status.taskStatus, rsp.status);
+    tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
+            pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
   } else {
     rsp.status = 0;
     tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
@@ -1088,7 +1088,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   }
 
   // do recovery step 1
-  tqDebug("s-task:%s start history data scan stage(step 1)", pTask->id.idStr);
+  tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus));
+
   int64_t st = taosGetTimestampMs();
 
   // todo set the correct status flag
@@ -1116,7 +1117,19 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
      // todo handle error
    }
 
+    ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
+
+    // wait for the stream task get ready for scan history data
+    while (pStreamTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM ||
+           pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
+      tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr,
+              pStreamTask->info.taskLevel);
+      taosMsleep(100);
+    }
+
    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
+    tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
+            pStreamTask->info.taskLevel, pTask->id.idStr);
 
    // if it's an source task, extract the last version in wal.
    int64_t ver = pTask->dataRange.range.maxVer;
@@ -1158,14 +1171,20 @@
  }
 
  return 0;
+}
 
 // notify the downstream tasks to transfer executor state after handle all history blocks.
 int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
-  SStreamTransferReq* pReq = (SStreamTransferReq*)msg;
+  SStreamTransferReq req;
 
-  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
+  SDecoder decoder;
+  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
+  int32_t code = tDecodeStreamRecoverFinishReq(&decoder, &req);
+
+  SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
  if (pTask == NULL) {
+    tqError("failed to find task:0x%x", req.taskId);
    return -1;
  }
@@ -1212,7 +1231,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
  tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
 
  // dispatch recover finish req to all related downstream task
-  code = streamDispatchRecoverFinishMsg(pTask);
+  code = streamDispatchScanHistoryFinishMsg(pTask);
  if (code < 0) {
    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
    return -1;
diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c
index bf560a3fb9..1a9a4ec612 100644
--- a/source/dnode/vnode/src/tq/tqRestore.c
+++ b/source/dnode/vnode/src/tq/tqRestore.c
@@ -135,9 +135,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
      continue;
    }
 
-    if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__SCAN_HISTORY_PREPARE ||
-        status == TASK_STATUS__WAIT_DOWNSTREAM || streamTaskShouldPause(&pTask->status)) {
-      tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status);
+    if (status != TASK_STATUS__NORMAL) {
+      tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
      streamMetaReleaseTask(pStreamMeta, pTask);
      continue;
    }
diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c
index 99957431f8..96113eeb7f 100644
--- a/source/dnode/vnode/src/vnd/vnodeSvr.c
+++ b/source/dnode/vnode/src/vnd/vnodeSvr.c
@@ -573,16 +573,22 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_CHECK:
      return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
-    case TDMT_STREAM_TASK_CHECK_RSP:
-      return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg->pCont, pMsg->contLen);
+    case TDMT_STREAM_TASK_CHECK_RSP: {
+      char*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+      int32_t len = pMsg->contLen - sizeof(SMsgHead);
+      return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pReq, len);
+    }
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
    case TDMT_VND_STREAM_SCAN_HISTORY:
      return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
-    case TDMT_VND_STREAM_TRANSFER_STATE:
-      return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pMsg->pCont, pMsg->contLen);
+    case TDMT_STREAM_TRANSFER_STATE: {
+      char*   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+      int32_t len = pMsg->contLen - sizeof(SMsgHead);
+      return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len);
+    }
    case TDMT_STREAM_RECOVER_FINISH:
      return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH_RSP:
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index c866e2cc21..ae3f094d12 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -312,8 +312,7 @@ int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecove
  msg.info.noResp = 1;
 
  tmsgSendReq(pEpSet, &msg);
-  qDebug("s-task:%s dispatch recover finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId);
-
+  qDebug("s-task:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId);
  return 0;
 }
 
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 688491fbdb..2cbe72e0be 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -326,15 +326,16 @@ int32_t streamExecForAll(SStreamTask* pTask) {
    SStreamQueueItem* pInput = NULL;
 
    // merge multiple input data if possible in the input queue.
-    qDebug("s-task:%s start to extract data block from inputQ", id);
+    qDebug("s-task:%s start to extract data block from inputQ, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
 
    while (1) {
      // downstream task's input queue is blocked, stop immediately
-      if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_INPUT_STATUS__BLOCKED) ||
+      if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) ||
          streamTaskShouldStop(&pTask->status)) {
        if (batchSize > 1) {
          break;
        } else {
+          qDebug("123 %s", pTask->id.idStr);
          return 0;
        }
      }
@@ -382,23 +383,40 @@ int32_t streamExecForAll(SStreamTask* pTask) {
      if (pInput) {
        streamFreeQitem(pInput);
      }
+
+      qDebug("456 %s", pTask->id.idStr);
      return 0;
    }
 
    if (pInput == NULL) {
-      if (pTask->info.fillHistory && pTask->status.transferState) {
+      qDebug("789 %s", pTask->id.idStr);
+
+      if (pTask->info.fillHistory && pTask->status.transferState) {  // todo transfer task state here
        SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
        qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr,
               pStreamTask->id.idStr);
 
        ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
-        ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
-
-        // update the scan data range for source task.
        STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
-        qDebug("s-task:%s stream task window %"PRId64" - %"PRId64" transfer to %"PRId64" - %"PRId64", status:%d, sched-status:%d", pStreamTask->id.idStr,
-               pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, TASK_STATUS__NORMAL, pStreamTask->status.schedStatus);
+
+        if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
+          ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
+
+          // update the scan data range for source task.
+          qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64
+                 ", status:%s, sched-status:%d",
+                 pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
+                 pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
+        } else {
+          // for agg task and sink task, they are continue to execute, no need to be halt.
+          // the process should be stopped for a while, during the term of transfer task state.
+          // OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
+
+
+          qDebug("s-task:%s no need to update time window", pStreamTask->id.idStr);
+        }
+
        pTimeWindow->skey = INT64_MIN;
        streamSetStatusNormal(pStreamTask);
@@ -406,6 +424,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
 
        streamMetaReleaseTask(pTask->pMeta, pStreamTask);
      }
+
      break;
    }
 
@@ -482,7 +501,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
    // todo the task should be commit here
    atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
 
-    qDebug("s-task:%s exec completed, status:%d, sched-status:%d", pTask->id.idStr, pTask->status.taskStatus,
+    qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus),
           pTask->status.schedStatus);
 
    if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index c5f1004784..5e278cd7f6 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -21,22 +21,19 @@ const char* streamGetTaskStatusStr(int32_t status) {
  switch(status) {
    case TASK_STATUS__NORMAL: return "normal";
    case TASK_STATUS__WAIT_DOWNSTREAM: return "wait-for-downstream";
-    case TASK_STATUS__SCAN_HISTORY_PREPARE: return "scan-history-prepare";
+    case TASK_STATUS__SCAN_HISTORY: return "scan-history";
    case TASK_STATUS__HALT: return "halt";
+    case TASK_STATUS__PAUSE: return "paused";
    default:return "";
  }
 }
 
 int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
-  qDebug("s-task:%s (vgId:%d) launch recover", pTask->id.idStr, pTask->info.nodeId);
-  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
-    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__SCAN_HISTORY_PREPARE);
-    SVersionRange* pRange = &pTask->dataRange.range;
-    qDebug("s-task:%s set task status:%s and start to recover, ver:%" PRId64 "-%" PRId64, pTask->id.idStr,
-           streamGetTaskStatusStr(pTask->status.taskStatus), pTask->dataRange.range.minVer,
-           pTask->dataRange.range.maxVer);
+  qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64,
+         pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus),
+         pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer);
 
    streamSetParamForRecover(pTask);
    streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window);
@@ -63,7 +60,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
    streamAggRecoverPrepare(pTask);
  } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
    streamSetStatusNormal(pTask);
-    qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
+    qDebug("s-task:%s sink task convert to normal status immediately", pTask->id.idStr);
  }
 
  return 0;
@@ -181,7 +178,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
      taosArrayDestroy(pTask->checkReqIds);
      pTask->checkReqIds = NULL;
 
-      qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", id, numOfReqs);
+      qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage", id, numOfReqs);
      streamTaskLaunchRecover(pTask);
    } else {
      qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, remain not ready:%d", id,
@@ -192,7 +189,12 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
      return -1;
    }
 
-    qDebug("s-task:%s fixed downstream tasks is ready, now enter into recover stage", id);
+    ASSERT(pTask->status.taskStatus != TASK_STATUS__HALT);
+    pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
+
+    qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
+           streamGetTaskStatusStr(pTask->status.taskStatus));
+
    streamTaskLaunchRecover(pTask);
  } else {
    ASSERT(0);
@@ -219,6 +221,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
 }
 
 int32_t streamSetStatusNormal(SStreamTask* pTask) {
+  qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus));
  atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
  return 0;
 }
@@ -264,25 +267,29 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
  return code;
 }
 
-int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask) {
+int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
  SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
 
  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
-    qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr,
-           pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);
+    qDebug("s-task:%s send scan-history-data complete msg to downstream (fix-dispatch) to taskId:0x%x, status:%s", pTask->id.idStr,
+           pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus));
 
    req.taskId = pTask->fixedEpDispatcher.taskId;
    streamDoDispatchRecoverFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
-    int32_t vgSz = taosArrayGetSize(vgInfo);
-    for (int32_t i = 0; i < vgSz; i++) {
+    int32_t numOfVgs = taosArrayGetSize(vgInfo);
+
+    qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
+           numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
+    for (int32_t i = 0; i < numOfVgs; i++) {
      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
      req.taskId = pVgInfo->taskId;
      streamDoDispatchRecoverFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
    }
  }
+
  return 0;
 }
@@ -333,7 +340,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
 
  // serialize
  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
-    qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr,
+    qDebug("s-task:%s send transfer state msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr,
           pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);
 
    req.taskId = pTask->fixedEpDispatcher.taskId;
@@ -375,11 +382,15 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
 int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
  if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
    int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
-    qDebug("s-task:%s remain unfinished child tasks:%d", pTask->id.idStr, left);
    ASSERT(left >= 0);
+
    if (left == 0) {
+      qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, left);
      streamAggChildrenRecoverFinish(pTask);
+    } else {
+      qDebug("s-task:%s remain unfinished upstream tasks:%d", pTask->id.idStr, left);
    }
+
  }
  return 0;
 }
@@ -388,10 +399,14 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  pHTask->dataRange.range.minVer = 0;
  pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;
 
-  qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
-         " ver range:%" PRId64 " - %" PRId64,
-         pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
-         pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
+  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
+    qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
+           " ver range:%" PRId64 " - %" PRId64,
+           pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
+           pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
+  } else {
+    qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
+  }
 
  // check if downstream tasks have been ready
  streamTaskCheckDownstreamTasks(pHTask);
@@ -444,13 +459,6 @@ int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver) {
 
 int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
-
-  qDebug("s-task:%s set start wal scan start ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
-  ASSERT(walReaderGetCurrentVer(pTask->exec.pWalReader) == -1);
-
-//  walReaderSeekVer(pTask->exec.pWalReader, sversion);
-//  pTask->chkInfo.currentVer = sversion;
-
  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
    return 0;
  }
@@ -462,17 +470,13 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
  }
 
  // dispatch recover finish req to all related downstream task
-  code = streamDispatchRecoverFinishMsg(pTask);
+  code = streamDispatchScanHistoryFinishMsg(pTask);
  if (code < 0) {
    return -1;
  }
 
-  // set status normal
-  qDebug("s-task:%s set the status to be normal, and start wal scan", pTask->id.idStr);
-  code = streamSetStatusNormal(pTask);
-  if (code < 0) {
-    return -1;
-  }
+  ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
+  /*code = */streamSetStatusNormal(pTask);
 
  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
  streamMetaSaveTask(pMeta, pTask);