From e89f530db2cb666eedc56822c92b9eb15c9dfee7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 27 Jul 2023 10:55:59 +0800 Subject: [PATCH 01/11] fix(stream): scan wal in step2 --- include/libs/stream/tstream.h | 9 +++++- source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/src/tq/tq.c | 45 +++++++++++++------------- source/dnode/vnode/src/tq/tqRestore.c | 19 ++++++++--- source/libs/stream/src/stream.c | 3 -- source/libs/stream/src/streamExec.c | 43 ++++++++++++++++++++---- source/libs/stream/src/streamRecover.c | 4 +-- 7 files changed, 85 insertions(+), 40 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3c171ca510..8e286c6e1b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -45,6 +45,7 @@ enum { TASK_STATUS__FAIL, TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner + TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause }; @@ -302,6 +303,12 @@ typedef struct { SStreamQueue* queue; } STaskOutputInfo; +typedef struct { + int64_t init; + int64_t step1Start; + int64_t step2Start; +} STaskTimestamp; + struct SStreamTask { SStreamId id; SSTaskBasicInfo info; @@ -316,7 +323,7 @@ struct SStreamTask { SArray* pUpstreamEpInfoList; // SArray, // children info int32_t nextCheckId; SArray* checkpointInfo; // SArray - int64_t initTs; + STaskTimestamp tsInfo; // output union { STaskDispatcherFixedEp fixedEpDispatcher; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index e8bdf97c70..0e3ad3293b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -72,7 +72,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { return -1; } - pTask->initTs = taosGetTimestampMs(); + pTask->tsInfo.init = taosGetTimestampMs(); pTask->inputStatus = 
TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3bcc141edc..3d715afc71 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -818,7 +818,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } - pTask->initTs = taosGetTimestampMs(); + pTask->tsInfo.init = taosGetTimestampMs(); pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; @@ -1115,7 +1115,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamTaskDisablePause(pTask); } - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamMetaReleaseTask(pMeta, pTask); @@ -1123,7 +1123,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } double el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", id, el); + tqDebug("s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs", id, el); if (pTask->info.fillHistory) { SVersionRange* pRange = NULL; @@ -1173,34 +1173,35 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - st = taosGetTimestampMs(); + pTask->tsInfo.step2Start = taosGetTimestampMs(); streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); } if (!streamTaskRecoverScanStep2Finished(pTask)) { - streamSourceScanHistoryData(pTask); - - if 
(atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING || streamTaskShouldPause(&pTask->status)) { + pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL; + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); streamMetaReleaseTask(pMeta, pTask); return 0; } - streamTaskRecoverSetAllStepFinished(pTask); + int64_t dstVer = pTask->dataRange.range.minVer - 1; + walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); + tqDebug("s-task:%s seek wal reader to ver:%"PRId64, id, dstVer); } - el = (taosGetTimestampMs() - st) / 1000.0; - tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el); - - // 3. notify downstream tasks to transfer executor state after handle all history blocks. - if (!pTask->status.transferState) { - code = streamDispatchTransferStateMsg(pTask); - if (code != TSDB_CODE_SUCCESS) { - // todo handle error - } - - pTask->status.transferState = true; - } +// int64_t el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; +// tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el); +// +// // 3. notify downstream tasks to transfer executor state after handle all history blocks. +// if (!pTask->status.transferState) { +// code = streamDispatchTransferStateMsg(pTask); +// if (code != TSDB_CODE_SUCCESS) { +// // todo handle error +// } +// +// pTask->status.transferState = true; +// } // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. // 5. resume the related stream task. 
@@ -1409,8 +1410,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { // even in halt status, the data in inputQ must be processed - int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) { + int8_t st = pTask->status.taskStatus; + if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__SCAN_HISTORY_WAL) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 3f5829d3ae..921ea2cc68 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -247,7 +247,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if (status != TASK_STATUS__NORMAL) { + if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__SCAN_HISTORY_WAL) { tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); streamMetaReleaseTask(pStreamMeta, pTask); continue; @@ -261,6 +261,17 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; + if (pTask->info.fillHistory == 1) { + ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL); + // the maximum version of data in the WAL has reached already, the step2 is done + if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { + qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore", + pTask->id.idStr, pTask->chkInfo.currentVer); + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + } + // seek the stored version and extract data from WAL int32_t code = doSetOffsetForWalReader(pTask, vgId); if (code != 
TSDB_CODE_SUCCESS) { @@ -283,9 +294,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { noDataInWal = false; code = tAppendDataToInputQueue(pTask, pItem); if (code == TSDB_CODE_SUCCESS) { - pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, - pTask->chkInfo.currentVer); + int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); + pTask->chkInfo.currentVer = ver; + tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); } else { tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index fa0561a722..ba8e358f68 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -324,9 +324,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { return -1; } - /*if (pTask->dispatchType == TASK_OUTPUT__FIXED_DISPATCH || pTask->dispatchType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatchStreamBlock(pTask);*/ - /*}*/ return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index af93d95a9f..c939ea1807 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -589,6 +589,8 @@ int32_t streamTryExec(SStreamTask* pTask) { int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE); + const char* id = pTask->id.idStr; + if (schedStatus == TASK_SCHED_STATUS__WAITING) { int32_t code = streamExecForAll(pTask); if (code < 0) { // todo this status shoudl be removed @@ -597,16 +599,43 @@ int32_t streamTryExec(SStreamTask* pTask) { } // todo the task should be commit here - atomic_store_8(&pTask->status.schedStatus, 
TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->status.schedStatus); + if (taosQueueEmpty(pTask->inputQueue->queue)) { + if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && + pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { + // fill-history WAL scan has completed + streamTaskRecoverSetAllStepFinished(pTask); - if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && - (!streamTaskShouldPause(&pTask->status))) { - streamSchedExec(pTask); + double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); + + // 3. notify downstream tasks to transfer executor state after handle all history blocks. + if (!pTask->status.transferState) { + code = streamDispatchTransferStateMsg(pTask); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } + + pTask->status.transferState = true; + } + + // the last execution of fill-history task, in order to transfer task operator states. 
+ code = streamExecForAll(pTask); + + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->status.schedStatus); + } + } else { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->status.schedStatus); + + if ((!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { + streamSchedExec(pTask); + } } } else { - qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", pTask->id.idStr, + qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index dffa28e769..fd5d35808e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -23,7 +23,7 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static void streamTaskSetForReady(SStreamTask* pTask, int32_t numOfReqs) { ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; - int64_t el = (taosGetTimestampMs() - pTask->initTs); + int64_t el = (taosGetTimestampMs() - pTask->tsInfo.init); qDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%dms, task status:%s", pTask->id.idStr, numOfReqs, (int32_t) el, streamGetTaskStatusStr(pTask->status.taskStatus)); @@ -663,7 +663,7 @@ void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { // no input data yet. 
no need to execute the secondardy scan while stream task halt streamTaskRecoverSetAllStepFinished(pTask); qDebug( - "s-task:%s no need to perform secondary scan-history-data(step 2), since no data ingest during secondary scan", + "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during secondary scan", pTask->id.idStr); } else { // 2. do secondary scan of the history data, the time window remain, and the version range is updated to From 40fe3ef4f6b77bbc98c0f71b4089cec976d00cc4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 27 Jul 2023 14:49:20 +0800 Subject: [PATCH 02/11] fix(stream): refactor the step2 wal scan. --- include/libs/stream/tstream.h | 3 +- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/tq/tq.c | 82 ++++++++++++------------- source/dnode/vnode/src/tq/tqRead.c | 6 +- source/dnode/vnode/src/tq/tqRestore.c | 21 +++++-- source/libs/executor/src/scanoperator.c | 31 +++++++--- source/libs/stream/src/streamExec.c | 58 +++++++++-------- source/libs/stream/src/streamRecover.c | 13 ++-- 8 files changed, 125 insertions(+), 91 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8e286c6e1b..c4033b5482 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -588,6 +588,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); +int32_t streamTaskEndScanWAL(SStreamTask* pTask); SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); @@ -605,7 +606,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t 
streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); -void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); +bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f60cc2f406..a7ce18198d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -241,7 +241,7 @@ bool tqNextBlockImpl(STqReader *pReader, const char *idstr); SWalReader* tqGetWalReader(STqReader* pReader); SSDataBlock* tqGetResultBlock (STqReader* pReader); -int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, const char *id); +int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock** pRes, const char* idstr); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3d715afc71..15b66b8e07 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1128,6 +1128,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->info.fillHistory) { SVersionRange* pRange = NULL; SStreamTask* pStreamTask = NULL; + bool done = false; if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) { // 1. stop the related stream task, get the current scan wal version of stream task, ver. 
@@ -1157,58 +1158,55 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // now we can stop the stream task execution streamTaskHalt(pStreamTask); - tqDebug("s-task:%s level:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, - id); + tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, + pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id); // if it's an source task, extract the last version in wal. pRange = &pTask->dataRange.range; int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); - streamHistoryTaskSetVerRangeStep2(pTask, latestVer); + done = streamHistoryTaskSetVerRangeStep2(pTask, latestVer); } - if (!streamTaskRecoverScanStep1Finished(pTask)) { - STimeWindow* pWindow = &pTask->dataRange.window; - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 - ", do secondary scan-history data after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); - ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - + if (done) { pTask->tsInfo.step2Start = taosGetTimestampMs(); - streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); - } + streamTaskEndScanWAL(pTask); + } else { + if (!streamTaskRecoverScanStep1Finished(pTask)) { + STimeWindow* pWindow = &pTask->dataRange.window; + tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + ", do secondary scan-history data after halt the related stream task:%s", + id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); + ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - if (!streamTaskRecoverScanStep2Finished(pTask)) { - pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL; - if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - 
tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); - streamMetaReleaseTask(pMeta, pTask); - return 0; + pTask->tsInfo.step2Start = taosGetTimestampMs(); + streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); } - int64_t dstVer = pTask->dataRange.range.minVer - 1; - walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); - tqDebug("s-task:%s seek wal reader to ver:%"PRId64, id, dstVer); + if (!streamTaskRecoverScanStep2Finished(pTask)) { + pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY_WAL; + if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { + tqDebug("s-task:%s is dropped or paused, abort recover in step1", id); + streamMetaReleaseTask(pMeta, pTask); + return 0; + } + + int64_t dstVer = pTask->dataRange.range.minVer - 1; + + pTask->chkInfo.currentVer = dstVer; + walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); + tqDebug("s-task:%s wal reader start scan from WAL ver:%" PRId64 ", set sched-status:%d", id, dstVer, + TASK_SCHED_STATUS__INACTIVE); + } + + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + + // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. + // 5. resume the related stream task. + streamMetaReleaseTask(pMeta, pTask); + streamMetaReleaseTask(pMeta, pStreamTask); + + tqStartStreamTasks(pTq); } - -// int64_t el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; -// tqDebug("s-task:%s history data scan stage(step 2) ended, elapsed time:%.2fs", id, el); -// -// // 3. notify downstream tasks to transfer executor state after handle all history blocks. -// if (!pTask->status.transferState) { -// code = streamDispatchTransferStateMsg(pTask); -// if (code != TSDB_CODE_SUCCESS) { -// // todo handle error -// } -// -// pTask->status.transferState = true; -// } - - // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. - // 5. resume the related stream task. 
- streamTryExec(pTask); - - streamMetaReleaseTask(pMeta, pTask); - streamMetaReleaseTask(pMeta, pStreamTask); } else { // todo update the chkInfo version for current task. // this task has an associated history stream task, so we need to scan wal from the end version of @@ -1218,7 +1216,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; tqDebug( - "s-task:%s scan history in stream time window completed, no related fill history task, reset the time " + "s-task:%s scan history in stream time window completed, no related fill-history task, reset the time " "window:%" PRId64 " - %" PRId64, id, pWindow->skey, pWindow->ekey); } else { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 675cbe4549..518596a47c 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -302,13 +302,17 @@ int32_t tqReaderSeek(STqReader* pReader, int64_t ver, const char* id) { return 0; } -int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, const char* id) { +int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, const char* id) { int32_t code = walNextValidMsg(pReader); if (code != TSDB_CODE_SUCCESS) { return code; } int64_t ver = pReader->pHead->head.version; + if (ver > maxVer) { + tqDebug("maxVer in WAL:%"PRId64" reached, do not scan wal anymore, %s", maxVer, id); + return TSDB_CODE_SUCCESS; + } if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT) { void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SSubmitReq2Msg)); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 921ea2cc68..614adfeded 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -38,9 +38,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { if (shouldIdle) { taosWLockLatch(&pMeta->lock); - pMeta->walScanCounter -= 1; - times = 
pMeta->walScanCounter; - + times = (--pMeta->walScanCounter); ASSERT(pMeta->walScanCounter >= 0); if (pMeta->walScanCounter <= 0) { @@ -242,7 +240,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; // non-source or fill-history tasks don't need to response the WAL scan action. - if (pTask->info.taskLevel != TASK_LEVEL__SOURCE || pTask->info.fillHistory == 1) { + if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -280,10 +278,10 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue); + int64_t maxVer = (pTask->info.fillHistory == 1)? pTask->dataRange.range.maxVer:INT64_MAX; - // append the data for the stream SStreamQueueItem* pItem = NULL; - code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr); + code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, maxVer, pTask->id.idStr); if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); @@ -297,6 +295,17 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.currentVer = ver; tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); + + { + if (pTask->info.fillHistory == 1) { + // the maximum version of data in the WAL has reached already, the step2 is done + if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { + qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore, set the transfer state flag", + pTask->id.idStr, pTask->chkInfo.currentVer); + pTask->status.transferState = true; + } + } + } } else { tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" 
PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a15b128a99..71b9d44a0a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1775,19 +1775,32 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) } static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { - if (pWindow->skey != INT64_MIN) { - qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey); - + if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) { bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); - bool hasUnqualified = false; + bool hasUnqualified = false; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); - for(int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* ts = (int64_t*) colDataGetData(pCol, i); - p[i] = (*ts >= pWindow->skey); - if (!p[i]) { - hasUnqualified = true; + if (pWindow->skey != INT64_MIN) { + qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); + + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts >= pWindow->skey); + + if (!p[i]) { + hasUnqualified = true; + } + } + } else if (pWindow->ekey != INT64_MAX) { + qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->skey); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts <= pWindow->ekey); + + if (!p[i]) { + hasUnqualified = true; + } } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c939ea1807..4db3494a3f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -510,12 +510,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { /*int32_t code = 
*/extractMsgFromInputQ(pTask, &pInput, &batchSize, id); if (pInput == NULL) { ASSERT(batchSize == 0); - if (pTask->info.fillHistory && pTask->status.transferState) { - int32_t code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle this - return 0; - } - } +// if (pTask->info.fillHistory && pTask->status.transferState) { +// int32_t code = streamTransferStateToStreamTask(pTask); +// if (code != TSDB_CODE_SUCCESS) { // todo handle this +// return 0; +// } +// } break; } @@ -584,6 +584,28 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE); } +int32_t streamTaskEndScanWAL(SStreamTask* pTask) { + const char* id = pTask->id.idStr; + double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); + + // 3. notify downstream tasks to transfer executor state after handle all history blocks. + pTask->status.transferState = true; + + int32_t code = streamDispatchTransferStateMsg(pTask); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } + + // the last execution of fill-history task, in order to transfer task operator states. + code = streamTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle this + return code; + } + + return TSDB_CODE_SUCCESS; +} + int32_t streamTryExec(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. 
int8_t schedStatus = @@ -600,27 +622,11 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here if (taosQueueEmpty(pTask->inputQueue->queue)) { - if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && - pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { - // fill-history WAL scan has completed + // fill-history WAL scan has completed + if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL && pTask->status.transferState == true) { streamTaskRecoverSetAllStepFinished(pTask); - - double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; - qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); - - // 3. notify downstream tasks to transfer executor state after handle all history blocks. - if (!pTask->status.transferState) { - code = streamDispatchTransferStateMsg(pTask); - if (code != TSDB_CODE_SUCCESS) { - // todo handle error - } - - pTask->status.transferState = true; - } - - // the last execution of fill-history task, in order to transfer task operator states. 
- code = streamExecForAll(pTask); - + streamTaskEndScanWAL(pTask); + } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index fd5d35808e..41f28f375b 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -284,7 +284,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* // common int32_t streamSetParamForScanHistory(SStreamTask* pTask) { - qDebug("s-task:%s set operator option for scan-history-data", pTask->id.idStr); + qDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr); return qSetStreamOperatorOptionForScanHistory(pTask->exec.pExecutor); } @@ -507,7 +507,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64 + qDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 " ver range:%" PRId64 " - %" PRId64, pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey, pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer); @@ -654,7 +654,7 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { return qStreamRecoverSetAllStepFinished(exec); } -void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { +bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { SVersionRange* pRange = &pTask->dataRange.range; ASSERT(latestVer >= pRange->maxVer); @@ -663,13 +663,16 @@ void streamHistoryTaskSetVerRangeStep2(SStreamTask* 
pTask, int64_t latestVer) { // no input data yet. no need to execute the secondardy scan while stream task halt streamTaskRecoverSetAllStepFinished(pTask); qDebug( - "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during secondary scan", - pTask->id.idStr); + "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, " + "related stream task currentVer:%" PRId64, + pTask->id.idStr, latestVer); + return true; } else { // 2. do secondary scan of the history data, the time window remain, and the version range is updated to // [pTask->dataRange.range.maxVer, ver1] pRange->minVer = nextStartVer; pRange->maxVer = latestVer - 1; + return false; } } From 0ac7ebe9c77289526696cd7d2fb4d4fddeb4650e Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 27 Jul 2023 14:56:06 +0800 Subject: [PATCH 03/11] op stream scan --- source/libs/executor/src/scanoperator.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 71b9d44a0a..195ee88f2b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1878,7 +1878,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s", pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id); - pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN2; } pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader); @@ -1891,8 +1890,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pStreamInfo->recoverScanFinished = false; } - if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1 || - pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN2) { + if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { if 
(isTaskKilled(pTaskInfo)) { return NULL; } From 0b53bdbae69eb1d8ce71f1b73b1c7af6608b40f4 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 27 Jul 2023 15:09:25 +0800 Subject: [PATCH 04/11] op stream scan --- source/libs/executor/inc/executorInt.h | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 5baf0978cd..9a917adf1b 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -80,7 +80,6 @@ enum { STREAM_RECOVER_STEP__PREPARE1, STREAM_RECOVER_STEP__PREPARE2, STREAM_RECOVER_STEP__SCAN1, - STREAM_RECOVER_STEP__SCAN2, }; extern int32_t exchangeObjRefPool; From c72274504ecf3c4f546a0e51cf2dd6714aa81b97 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 27 Jul 2023 15:13:57 +0800 Subject: [PATCH 05/11] op stream scan --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 195ee88f2b..4fb07023fa 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1871,6 +1871,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { qDebug("stream recover step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id); pStreamInfo->recoverStep = STREAM_RECOVER_STEP__SCAN1; + pStreamInfo->recoverScanFinished = false; } else { pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer; pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer; @@ -1887,7 +1888,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTSInfo->scanTimes = 0; pTSInfo->currentGroupId = -1; - pStreamInfo->recoverScanFinished = false; } if (pStreamInfo->recoverStep == 
STREAM_RECOVER_STEP__SCAN1) { From c54bd55c68629e5999a81bcb6fcc30c7efdb57ef Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 27 Jul 2023 15:16:22 +0800 Subject: [PATCH 06/11] op stream scan --- source/libs/executor/src/scanoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4fb07023fa..5f39204974 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1879,6 +1879,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s", pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id); + pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE; } pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader); From 7c9898cfa51495ea49dd0556057a5911a4d5c476 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 27 Jul 2023 15:56:06 +0800 Subject: [PATCH 07/11] adj stream scan --- source/libs/executor/src/scanoperator.c | 72 ++++++++++++------------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5f39204974..85c7024932 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1902,35 +1902,35 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { printDataBlock(pInfo->pRecoverRes, "scan recover"); return pInfo->pRecoverRes; } break; - case STREAM_SCAN_FROM_UPDATERES: { - generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - printDataBlock(pInfo->pUpdateRes, "recover update"); - return pInfo->pUpdateRes; - } break; - case 
STREAM_SCAN_FROM_DELETE_DATA: { - generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); - prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; - copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); - pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - printDataBlock(pInfo->pDeleteDataRes, "recover delete"); - return pInfo->pDeleteDataRes; - } break; - case STREAM_SCAN_FROM_DATAREADER_RANGE: { - SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); - if (pSDB) { - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; - checkUpdateData(pInfo, true, pSDB, false); - printDataBlock(pSDB, "scan recover update"); - calBlockTbName(pInfo, pSDB); - return pSDB; - } - blockDataCleanup(pInfo->pUpdateDataRes); - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } break; + // case STREAM_SCAN_FROM_UPDATERES: { + // generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + // prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + // pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + // printDataBlock(pInfo->pUpdateRes, "recover update"); + // return pInfo->pUpdateRes; + // } break; + // case STREAM_SCAN_FROM_DELETE_DATA: { + // generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + // prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + // pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + // copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); + // pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; + // printDataBlock(pInfo->pDeleteDataRes, "recover delete"); + // return pInfo->pDeleteDataRes; + // } break; + // case STREAM_SCAN_FROM_DATAREADER_RANGE: { + // SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, 
&pInfo->updateResIndex); + // if (pSDB) { + // STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + // pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; + // checkUpdateData(pInfo, true, pSDB, false); + // printDataBlock(pSDB, "scan recover update"); + // calBlockTbName(pInfo, pSDB); + // return pSDB; + // } + // blockDataCleanup(pInfo->pUpdateDataRes); + // pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + // } break; default: break; } @@ -1939,13 +1939,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pInfo->pRecoverRes != NULL) { calBlockTbName(pInfo, pInfo->pRecoverRes); if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { - if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { - TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); - pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); - } else { - pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer); - doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); - } + // if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { + TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); + // } else { + // pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer); + // doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes); + // } } if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; From 6ff50d4eaf2f2bb43f39e9d57c0addaef1db6ed6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 27 Jul 2023 16:10:55 +0800 Subject: [PATCH 08/11] fix(stream): update the step2 scan wal files. 
--- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/tq/tqRestore.c | 44 ++++++++++++------------- source/libs/executor/src/executor.c | 4 +-- source/libs/executor/src/scanoperator.c | 2 +- source/libs/stream/src/streamExec.c | 10 +----- source/libs/stream/src/streamRecover.c | 3 +- 6 files changed, 29 insertions(+), 36 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 518596a47c..389a23aa91 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -310,7 +310,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con int64_t ver = pReader->pHead->head.version; if (ver > maxVer) { - tqDebug("maxVer in WAL:%"PRId64" reached, do not scan wal anymore, %s", maxVer, id); + tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 614adfeded..67ae160d6d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -209,6 +209,17 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { return TSDB_CODE_SUCCESS; } +static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { + if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { + qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 "reach the maximum ver:%" PRId64 + ", not scan wal anymore, set the transfer state flag", + pTask->id.idStr, ver, pTask->dataRange.range.maxVer); + pTask->status.transferState = true; + + /*int32_t code = */streamSchedExec(pTask); + } +} + int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; bool noDataInWal = true; @@ -251,6 +262,15 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } + if ((pTask->info.fillHistory == 1) 
&& pTask->status.transferState) { + ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL); + // the maximum version of data in the WAL has reached already, the step2 is done + tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, + pTask->dataRange.range.maxVer); + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + if (tInputQueueIsFull(pTask)) { tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); @@ -259,17 +279,6 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; - if (pTask->info.fillHistory == 1) { - ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL); - // the maximum version of data in the WAL has reached already, the step2 is done - if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { - qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore", - pTask->id.idStr, pTask->chkInfo.currentVer); - streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - } - // seek the stored version and extract data from WAL int32_t code = doSetOffsetForWalReader(pTask, vgId); if (code != TSDB_CODE_SUCCESS) { @@ -284,6 +293,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, maxVer, pTask->id.idStr); if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue + checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -294,18 +304,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { if (code == TSDB_CODE_SUCCESS) { int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.currentVer = ver; + checkForFillHistoryVerRange(pTask, ver); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader 
after extract block from WAL", pTask->id.idStr, ver); - - { - if (pTask->info.fillHistory == 1) { - // the maximum version of data in the WAL has reached already, the step2 is done - if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { - qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore, set the transfer state flag", - pTask->id.idStr, pTask->chkInfo.currentVer); - pTask->status.transferState = true; - } - } - } } else { tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4c06b34df4..d14b79f4bc 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -892,7 +892,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan pStreamInfo->recoverStep1Finished = false; pStreamInfo->recoverStep2Finished = false; - qDebug("%s step 1. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 + qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); @@ -911,7 +911,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan pStreamInfo->recoverStep1Finished = true; pStreamInfo->recoverStep2Finished = false; - qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 + qDebug("%s step 2. 
set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5f39204974..72c6dec1e0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1793,7 +1793,7 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW } } } else if (pWindow->ekey != INT64_MAX) { - qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->skey); + qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey); for (int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t* ts = (int64_t*)colDataGetData(pCol, i); p[i] = (*ts <= pWindow->ekey); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 4db3494a3f..c546b36191 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -355,8 +355,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - pTask->status.transferState = false; // reset this value, to avoid transfer state again - + // todo: destroy this task here qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, pTask->streamTaskId.taskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -510,13 +509,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); if (pInput == NULL) { ASSERT(batchSize == 0); -// if (pTask->info.fillHistory && pTask->status.transferState) { -// int32_t code = streamTransferStateToStreamTask(pTask); -// if (code != TSDB_CODE_SUCCESS) { // todo 
handle this -// return 0; -// } -// } - break; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 41f28f375b..ecf874a1ac 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -844,7 +844,8 @@ void streamTaskPause(SStreamTask* pTask) { return; } - qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId); + const char* pStatus = streamGetTaskStatusStr(status); + qDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId); taosMsleep(100); } From 22a5a18fffbddcb889fe84f141052d881dcf5b3c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 27 Jul 2023 18:57:36 +0800 Subject: [PATCH 09/11] fix(stream): 1. set the correct time window after step2. 2. handle the case where the task fails to be added to the meta store. --- include/libs/stream/tstream.h | 4 ++-- source/dnode/snode/src/snode.c | 4 +++- source/dnode/vnode/src/tq/tq.c | 34 +++++++++++++++++++---------- source/dnode/vnode/src/tq/tqPush.c | 3 +++ source/libs/executor/src/executor.c | 3 ++- source/libs/stream/src/streamMeta.c | 8 ++++--- 6 files changed, 37 insertions(+), 19 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c4033b5482..b7544a13ca 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -647,9 +647,9 @@ void streamMetaClose(SStreamMeta* streamMeta); // save to b-tree meta store int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); -int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); +int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); -int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); // todo remove it +int32_t
streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 0e3ad3293b..b0ddefce79 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -160,7 +160,9 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { // 2.save task taosWLockLatch(&pSnode->pMeta->lock); - code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask); + + bool added = false; + code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added); if (code < 0) { taosWUnLockLatch(&pSnode->pMeta->lock); return -1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 15b66b8e07..2331e690df 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1039,27 +1039,36 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SStreamMeta* pStreamMeta = pTq->pStreamMeta; // 2.save task, use the newest commit version as the initial start version of stream task. 
- int32_t taskId = 0; - taosWLockLatch(&pStreamMeta->lock); - code = streamMetaRegisterTask(pStreamMeta, sversion, pTask); + int32_t taskId = pTask->id.taskId; + bool added = false; - taskId = pTask->id.taskId; + taosWLockLatch(&pStreamMeta->lock); + code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added); int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); + if (code < 0) { - tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); + tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks); tFreeStreamTask(pTask); taosWUnLockLatch(&pStreamMeta->lock); return -1; } + // not added into meta store + if (!added) { + tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId); + tFreeStreamTask(pTask); + pTask = NULL; + } + taosWUnLockLatch(&pStreamMeta->lock); - tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); + + tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); // 3. It's an fill history task, do nothing. wait for the main task to start it SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); - if (p != NULL) { - streamTaskCheckDownstreamTasks(pTask); + if (p != NULL) { // reset the downstreamReady flag. 
+ p->status.downstreamReady = 0; + streamTaskCheckDownstreamTasks(p); } streamMetaReleaseTask(pStreamMeta, p); @@ -1174,7 +1183,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (!streamTaskRecoverScanStep1Finished(pTask)) { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 - ", do secondary scan-history data after halt the related stream task:%s", + ", do secondary scan-history from WAL after halt the related stream task:%s", id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, id); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); @@ -1216,12 +1225,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (pTask->historyTaskId.taskId == 0) { *pWindow = (STimeWindow){INT64_MIN, INT64_MAX}; tqDebug( - "s-task:%s scan history in stream time window completed, no related fill-history task, reset the time " + "s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time " "window:%" PRId64 " - %" PRId64, id, pWindow->skey, pWindow->ekey); + qResetStreamInfoTimeWindow(pTask->exec.pExecutor); } else { tqDebug( - "s-task:%s scan history in stream time window completed, now start to handle data from WAL, start " + "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start " "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 06af53d453..5ccf4c825b 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -35,7 +35,10 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v tqProcessSubmitReqForSubscribe(pTq); } + taosRLockLatch(&pTq->pStreamMeta->lock); int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); + 
taosRUnLockLatch(&pTq->pStreamMeta->lock); + tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks); // push data for stream processing: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d14b79f4bc..15b2cc4efb 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -122,8 +122,9 @@ void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo) { return; } - qDebug("%s set fill history start key:%" PRId64, GET_TASKID(pTaskInfo), INT64_MIN); + qDebug("%s set stream fill-history window:%" PRId64"-%"PRId64, GET_TASKID(pTaskInfo), INT64_MIN, INT64_MAX); pTaskInfo->streamInfo.fillHistoryWindow.skey = INT64_MIN; + pTaskInfo->streamInfo.fillHistoryWindow.ekey = INT64_MAX; } static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 758530f4fb..ae07738868 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -237,7 +237,9 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { } // add to the ready tasks hash map, not the restored tasks hash map -int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { +int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { + *pAdded = false; + void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { @@ -261,13 +263,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); + *pAdded = true; return 0; } -int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { +int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { size_t size = 
taosHashGetSize(pMeta->pTasks); ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasks)); - return (int32_t)size; } From fbb33be85d03669ec89510cd9e0c185a85ea0990 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 27 Jul 2023 18:59:38 +0800 Subject: [PATCH 10/11] other: add some comments. --- source/dnode/vnode/src/tq/tq.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2331e690df..8faa5dcc62 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1230,6 +1230,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { id, pWindow->skey, pWindow->ekey); qResetStreamInfoTimeWindow(pTask->exec.pExecutor); } else { + // when related fill-history task exists, update the fill-history time window only when the + // state transfer is completed. tqDebug( "s-task:%s scan-history in stream time window completed, now start to handle data from WAL, start " "ver:%" PRId64 ", window:%" PRId64 " - %" PRId64, From 7f3d1dc46430013dacab7e8419ce95a757dd71be Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 27 Jul 2023 23:06:53 +0800 Subject: [PATCH 11/11] fix(stream): fix memory leak. --- source/dnode/vnode/src/tq/tq.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8faa5dcc62..310cc3599b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1179,6 +1179,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (done) { pTask->tsInfo.step2Start = taosGetTimestampMs(); streamTaskEndScanWAL(pTask); + streamMetaReleaseTask(pMeta, pTask); } else { if (!streamTaskRecoverScanStep1Finished(pTask)) { STimeWindow* pWindow = &pTask->dataRange.window;