From 53377c2c1f739422afc6c1005f5b3b10a93625a8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Jun 2023 01:14:41 +0800 Subject: [PATCH] fix(stream): wait for stream task completed. --- include/libs/stream/tstream.h | 6 ++-- source/dnode/vnode/src/tq/tq.c | 9 +++--- source/libs/stream/src/streamExec.c | 40 ++++++++++++++++++++++++-- source/libs/stream/src/streamRecover.c | 19 ++++++------ 4 files changed, 55 insertions(+), 19 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 93b1314cad..3cfde016f0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -44,9 +44,8 @@ enum { TASK_STATUS__DROPPING, TASK_STATUS__FAIL, TASK_STATUS__STOP, - TASK_STATUS__WAIT_DOWNSTREAM, - TASK_STATUS__SCAN_HISTORY, - TASK_STATUS__HALT, // stream task halt to wait for the secondary scan history, this status is invisible for user + TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner + TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused TASK_STATUS__PAUSE, }; @@ -565,6 +564,7 @@ int32_t streamSchedExec(SStreamTask* pTask); int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); +bool streamTaskIsIdle(const SStreamTask* pTask); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 648b8f4ec2..e1cf46a5f4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1121,8 +1121,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { taosMsleep(100); } - taosMsleep(10000); - // now we can stop the stream task execution pStreamTask->status.taskStatus = TASK_STATUS__HALT; tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr, @@ -1378,12 +1376,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { - if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { - tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, pTask->id.idStr, + int8_t status = pTask->status.taskStatus; + if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) { + tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); } else { - if (streamTaskShouldPause(&pTask->status) || (pTask->status.taskStatus == TASK_STATUS__HALT)) { + if (streamTaskShouldPause(&pTask->status)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b717ea456a..b68d15328d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -189,6 +189,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (qExecTask(exec, &output, &ts) < 0) { continue; } + if (output == NULL) { if (qStreamRecoverScanFinished(exec)) { finished = true; @@ -396,16 +397,30 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; + // here we need to wait for the stream task handle all data in the input queue. if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT); + } else { + ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL); + pStreamTask->status.taskStatus = TASK_STATUS__HALT; + } + {// wait for the stream task to be idle + while(!streamTaskIsIdle(pStreamTask)) { + qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr, + pTask->info.taskLevel, pStreamTask->id.idStr); + taosMsleep(100); + } + } + + if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64 ", status:%s, sched-status:%d", pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus); } else { - // for agg task and sink task, they are continue to execute, no need to be halt. + // for sink tasks, they are continue to execute, no need to be halt. // the process should be stopped for a while, during the term of transfer task state. // OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer @@ -413,12 +428,13 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr); } + // expand the query time window for stream scanner pTimeWindow->skey = INT64_MIN; streamSetStatusNormal(pStreamTask); streamMetaSaveTask(pTask->pMeta, pStreamTask); if (streamMetaCommit(pTask->pMeta)) { - // persistent to disk for + // persistent to disk } streamSchedExec(pStreamTask); @@ -481,12 +497,32 @@ int32_t streamExecForAll(SStreamTask* pTask) { double el = (taosGetTimestampMs() - st) / 1000.0; qDebug("s-task:%s batch of (%d)input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, batchSize, el, resSize / 1048576.0, totalBlocks); + streamFreeQitem(pInput); } return 0; } +bool streamTaskIsIdle(const SStreamTask* pTask) { + int32_t numOfItems = taosQueueItemSize(pTask->inputQueue->queue); + if (numOfItems > 0) { + return false; + } + + numOfItems = taosQallItemSize(pTask->inputQueue->qall); + if (numOfItems > 0) { + return false; + } + + // blocked by downstream task + if (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) { + return false; + } + + return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE); +} + int32_t streamTryExec(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. int8_t schedStatus = diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 46b6798c64..f041b17d2e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -40,7 +40,6 @@ int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) { const char* streamGetTaskStatusStr(int32_t status) { switch(status) { case TASK_STATUS__NORMAL: return "normal"; - case TASK_STATUS__WAIT_DOWNSTREAM: return "wait-for-downstream"; case TASK_STATUS__SCAN_HISTORY: return "scan-history"; case TASK_STATUS__HALT: return "halt"; case TASK_STATUS__PAUSE: return "paused"; @@ -217,18 +216,17 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return -1; } + // set the downstream tasks have been checked flag ASSERT(pTask->status.checkDownstream == 0); pTask->status.checkDownstream = 1; - ASSERT(pTask->status.taskStatus != TASK_STATUS__HALT); - + ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL); if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus)); streamTaskLaunchScanHistory(pTask); } else { - ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); - qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id, + qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus)); } } else { @@ -396,15 +394,17 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask) { return 0; } -int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) { +int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; if (qRestoreStreamOperatorOption(exec) < 0) { return -1; } + if (qStreamRecoverFinish(exec) < 0) { return -1; } - streamSetStatusNormal(pTask); + +// streamSetStatusNormal(pTask); return 0; } @@ -414,8 +414,9 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { ASSERT(left >= 0); if (left == 0) { - qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, left); - streamAggChildrenRecoverFinish(pTask); + int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList); + qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, numOfTasks); + streamAggUpstreamScanHistoryFinish(pTask); } else { qDebug("s-task:%s remain unfinished upstream tasks:%d", pTask->id.idStr, left); }