From d5974a8f2510474d5a94e9493cc820e03ae83e39 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jul 2023 18:05:32 +0800 Subject: [PATCH 1/4] refactor(stream): refactor the pause/resume for fill history execution. --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 151 ++++++++++++-------------- source/dnode/vnode/src/tq/tqRestore.c | 7 +- source/libs/stream/src/streamExec.c | 2 +- 4 files changed, 74 insertions(+), 88 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b7544a13ca..cffd8d0673 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -45,7 +45,7 @@ enum { TASK_STATUS__FAIL, TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner - TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal +// TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause }; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 310cc3599b..a7e912c1f7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1067,7 +1067,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 3. It's an fill history task, do nothing. wait for the main task to start it SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); if (p != NULL) { // reset the downstreamReady flag. - p->status.downstreamReady = 0; streamTaskCheckDownstreamTasks(p); } @@ -1076,12 +1075,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { - int32_t code = TSDB_CODE_SUCCESS; - char* msg = pMsg->pCont; - + SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; SStreamMeta* pMeta = pTq->pStreamMeta; - SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)msg; + int32_t code = TSDB_CODE_SUCCESS; SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); if (pTask == NULL) { tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed", @@ -1089,12 +1086,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return -1; } - // do recovery step 1 + // do recovery step1 const char* id = pTask->id.idStr; const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - tqDebug("s-task:%s start history data scan stage(step 1), status:%s", id, pStatus); + tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus); - int64_t st = taosGetTimestampMs(); + if (pTask->tsInfo.step1Start == 0) { + pTask->tsInfo.step1Start = taosGetTimestampMs(); + } // we have to continue retrying to successfully execute the scan history task. int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, @@ -1108,30 +1107,21 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } ASSERT(pTask->status.pauseAllowed == false); - if (pTask->info.fillHistory == 1) { streamTaskEnablePause(pTask); } - if (!streamTaskRecoverScanStep1Finished(pTask)) { - streamSourceScanHistoryData(pTask); - } - - // disable the pause when handling the step2 scan of tsdb data. - // the whole next procedure cann't be stopped. - // todo fix it: the following procedure should be executed completed and then shutdown when trying to close vnode. - if (pTask->info.fillHistory == 1) { - streamTaskDisablePause(pTask); - } - - if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); + streamSourceScanHistoryData(pTask); + if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { + double el = taosGetTimestampMs() - pTask->tsInfo.step1Start; + tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, + TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - streamMetaReleaseTask(pMeta, pTask); return 0; } - double el = (taosGetTimestampMs() - st) / 1000.0; + // the following procedure should be executed, no matter status is stop/pause or not + double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { @@ -1139,77 +1129,72 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pStreamTask = NULL; bool done = false; - if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { - // 1. stop the related stream task, get the current scan wal version of stream task, ver. - pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); - if (pStreamTask == NULL) { - qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", - pTask->streamTaskId.taskId, pTask->id.idStr); + // if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { + // 1. stop the related stream task, get the current scan wal version of stream task, ver. + pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); + if (pStreamTask == NULL) { + // todo delete this task, if the related stream task is dropped + qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", + pTask->streamTaskId.taskId, pTask->id.idStr); - pTask->status.taskStatus = TASK_STATUS__DROPPING; - tqDebug("s-task:%s fill-history task set status to be dropping", id); + pTask->status.taskStatus = TASK_STATUS__DROPPING; + tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamMetaSaveTask(pMeta, pTask); - streamMetaReleaseTask(pMeta, pTask); - return -1; - } - - ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - - // stream task in TASK_STATUS__SCAN_HISTORY can not be paused. - // wait for the stream task get ready for scan history data - while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { - tqDebug( - "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", - id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); - taosMsleep(100); - } - - // now we can stop the stream task execution - streamTaskHalt(pStreamTask); - tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, - pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); - - // if it's an source task, extract the last version in wal. - pRange = &pTask->dataRange.range; - int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); - done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); + streamMetaSaveTask(pMeta, pTask); + streamMetaReleaseTask(pMeta, pTask); + return -1; } + ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); + + // stream task in TASK_STATUS__SCAN_HISTORY can not be paused. + // wait for the stream task get ready for scan history data + while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { + tqDebug( + "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", + id, pTask->info.taskLevel, pStreamTask->id.idStr, streamGetTaskStatusStr(pStreamTask->status.taskStatus)); + taosMsleep(100); + } + + // now we can stop the stream task execution + streamTaskHalt(pStreamTask); + tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, + pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); + + // if it's an source task, extract the last version in wal. + pRange = &pTask->dataRange.range; + int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); + if (done) { pTask->tsInfo.step2Start = taosGetTimestampMs(); streamTaskEndScanWAL(pTask); streamMetaReleaseTask(pMeta, pTask); } else { - if (!streamTaskRecoverScanStep1Finished(pTask)) { - STimeWindow* pWindow = &pTask->dataRange.window; - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 - ", do secondary scan-history from WAL after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); - ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); + STimeWindow* pWindow = &pTask->dataRange.window; + tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + ", do secondary scan-history from WAL after halt the related stream task:%s", + id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, + pStreamTask->id.idStr); + ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - pTask->tsInfo.step2Start = taosGetTimestampMs(); - streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); - } + pTask->tsInfo.step2Start = taosGetTimestampMs(); + streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); - if (!streamTaskRecoverScanStep2Finished(pTask)) { - pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL; - if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); - streamMetaReleaseTask(pMeta, pTask); - return 0; - } + int64_t dstVer = pTask->dataRange.range.minVer - 1; - int64_t dstVer = pTask->dataRange.range.minVer - 1; - - pTask->chkInfo.currentVer = dstVer; - walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); - tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer, - TASK_SCHED_STATUS__INACTIVE); - } + pTask->chkInfo.currentVer = dstVer; + walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); + tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, + pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + // set the fill-history task to be normal + if (pTask->info.fillHistory == 1) { + streamSetStatusNormal(pTask); + } + // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. // 5. resume the related stream task. streamMetaReleaseTask(pMeta, pTask); @@ -1226,7 +1211,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; tqDebug( - "s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time " + "s-task:%s scan-history in stream time window completed, no related fill-history task, reset the time " "window:%" PRId64 " - %" PRId64, id, pWindow->skey, pWindow->ekey); qResetStreamInfoTimeWindow(pTask->exec.pExecutor); @@ -1422,7 +1407,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { if (pTask != NULL) { // even in halt status, the data in inputQ must be processed int8_t st = pTask->status.taskStatus; - if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) { + if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); @@ -1559,7 +1544,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } - if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory) { + if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { streamStartRecoverTask(pTask, igUntreated); } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { tqStartStreamTasks(pTq); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 67ae160d6d..f4d82e456e 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -211,7 +211,7 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { - qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 "reach the maximum ver:%" PRId64 + qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal anymore, set the transfer state flag", pTask->id.idStr, ver, pTask->dataRange.range.maxVer); pTask->status.transferState = true; @@ -256,14 +256,15 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) { + if (status != TASK_STATUS__NORMAL/* && status != TASK_STATUS__SCAN_HISTORY_WAL*/) { tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); streamMetaReleaseTask(pStreamMeta, pTask); continue; } if ((pTask->info.fillHistory == 1) && pTask->status.transferState) { - ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL); +// ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL); + ASSERT(status == TASK_STATUS__NORMAL); // the maximum version of data in the WAL has reached already, the step2 is done tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, pTask->dataRange.range.maxVer); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c546b36191..e52b8f54a5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -615,7 +615,7 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here if (taosQueueEmpty(pTask->inputQueue->queue)) { // fill-history WAL scan has completed - if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) { streamTaskRecoverSetAllStepFinished(pTask); streamTaskEndScanWAL(pTask); } else { From 936afeb5ac108b3ae53db79ff9fe48e5bfe4e21f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jul 2023 19:13:03 +0800 Subject: [PATCH 2/4] refactor: do internal refactor to simple the pause/resume in case of fill history exists. --- include/libs/stream/tstream.h | 1 - source/dnode/vnode/src/tq/tq.c | 22 +++++++++++++--------- source/libs/stream/src/streamExec.c | 3 +++ source/libs/stream/src/streamRecover.c | 13 ++++++++++++- 4 files changed, 28 insertions(+), 11 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cffd8d0673..066f83fbcb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -45,7 +45,6 @@ enum { TASK_STATUS__FAIL, TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner -// TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause }; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a7e912c1f7..105ad73362 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1092,7 +1092,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus); if (pTask->tsInfo.step1Start == 0) { + ASSERT(pTask->status.pauseAllowed == false); pTask->tsInfo.step1Start = taosGetTimestampMs(); + if (pTask->info.fillHistory == 1) { + streamTaskEnablePause(pTask); + } + } else { + tqDebug("s-task:%s resume from paused, start ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.step1Start); } // we have to continue retrying to successfully execute the scan history task. @@ -1106,14 +1112,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } - ASSERT(pTask->status.pauseAllowed == false); if (pTask->info.fillHistory == 1) { - streamTaskEnablePause(pTask); + ASSERT(pTask->status.pauseAllowed == true); } streamSourceScanHistoryData(pTask); if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - double el = taosGetTimestampMs() - pTask->tsInfo.step1Start; + double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); @@ -1129,26 +1134,24 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pStreamTask = NULL; bool done = false; - // if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { - // 1. stop the related stream task, get the current scan wal version of stream task, ver. + // 1. get the related stream task pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { // todo delete this task, if the related stream task is dropped qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", pTask->streamTaskId.taskId, pTask->id.idStr); - pTask->status.taskStatus = TASK_STATUS__DROPPING; tqDebug("s-task:%s fill-history task set status to be dropping", id); - streamMetaSaveTask(pMeta, pTask); + streamMetaUnregisterTask(pMeta, pTask->id.taskId); streamMetaReleaseTask(pMeta, pTask); return -1; } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); - // stream task in TASK_STATUS__SCAN_HISTORY can not be paused. - // wait for the stream task get ready for scan history data + // 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the + // stream task get ready for scan history data while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { tqDebug( "s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms", @@ -1158,6 +1161,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // now we can stop the stream task execution streamTaskHalt(pStreamTask); + tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e52b8f54a5..0036d72ece 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -404,6 +404,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); + // clear the link between fill-history task and stream task info + pStreamTask->historyTaskId.taskId = 0; streamTaskResumeFromHalt(pStreamTask); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); @@ -414,6 +416,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { // save to disk taosWLockLatch(&pMeta->lock); + streamMetaSaveTask(pMeta, pStreamTask); if (streamMetaCommit(pMeta) < 0) { // persist to disk diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index ecf874a1ac..f9bbe96d97 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -832,7 +832,7 @@ void streamTaskPause(SStreamTask* pTask) { return; } - while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { + while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { status = pTask->status.taskStatus; if (status == TASK_STATUS__DROPPING) { qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); @@ -849,8 +849,19 @@ void streamTaskPause(SStreamTask* pTask) { taosMsleep(100); } + // todo: use the lock of the task. + taosWLockLatch(&pMeta->lock); + + status = pTask->status.taskStatus; + if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) { + taosWUnLockLatch(&pMeta->lock); + qDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr); + return; + } + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + taosWUnLockLatch(&pMeta->lock); int64_t el = taosGetTimestampMs() - st; qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, From 6c9fa4d7c8ee128d71d0ad7b0b23749e4d09804a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jul 2023 19:58:52 +0800 Subject: [PATCH 3/4] fix(stream): check for pause when handling the fill history data. --- source/libs/stream/src/streamExec.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0036d72ece..4ef7d6084d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -172,6 +172,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { bool finished = false; while (1) { + if (streamTaskShouldPause(&pTask->status)) { + double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; + qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el); + return 0; + } + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; From 92d1dbd9ad2a950afb2a5f256f44c871668c85ee Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Jul 2023 01:23:06 +0800 Subject: [PATCH 4/4] fix(stream): set the time window filter before generating create table result. --- source/libs/executor/src/scanoperator.c | 28 ++++++++++++------------- source/libs/stream/src/streamRecover.c | 1 + 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b1c93104fa..fcd5476771 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2120,8 +2120,7 @@ FETCH_NEXT_BLOCK: return pInfo->pUpdateRes; } - SSDataBlock* pBlock = pInfo->pRes; - SDataBlockInfo* pBlockInfo = &pBlock->info; + SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists); NEXT_SUBMIT_BLK: @@ -2145,35 +2144,36 @@ FETCH_NEXT_BLOCK: } } - blockDataCleanup(pBlock); + blockDataCleanup(pInfo->pRes); while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) { SSDataBlock* pRes = NULL; int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id); - qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, - id); + qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, id); if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) { qDebug("retrieve data failed, try next block in submit block, %s", id); continue; } - setBlockIntoRes(pInfo, pRes, false); + // filter the block extracted from WAL files, according to the time window + // apply additional time window filter + doBlockDataWindowFilter(pRes, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); + blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); + if (pRes->info.rows == 0) { + continue; + } + setBlockIntoRes(pInfo, pRes, false); if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id); return pInfo->pCreateTbRes; } - // apply additional time window filter - doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); - pBlock->info.dataLoad = 1; - blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); - - doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); - doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); @@ -2195,7 +2195,7 @@ FETCH_NEXT_BLOCK: qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id); if (pBlockInfo->rows > 0) { - return pBlock; + return pInfo->pRes; } if (pInfo->pUpdateDataRes->info.rows > 0) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f9bbe96d97..223e185800 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -80,6 +80,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { walReaderGetCurrentVer(pTask->exec.pWalReader)); } } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + streamTaskEnablePause(pTask); streamSetParamForScanHistory(pTask); streamTaskScanHistoryPrepare(pTask); } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {