From 6ff50d4eaf2f2bb43f39e9d57c0addaef1db6ed6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 27 Jul 2023 16:10:55 +0800 Subject: [PATCH] fix(stream): update the step2 scan wal files. --- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/tq/tqRestore.c | 44 ++++++++++++------------- source/libs/executor/src/executor.c | 4 +-- source/libs/executor/src/scanoperator.c | 2 +- source/libs/stream/src/streamExec.c | 10 +----- source/libs/stream/src/streamRecover.c | 3 +- 6 files changed, 29 insertions(+), 36 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 518596a47c..389a23aa91 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -310,7 +310,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con int64_t ver = pReader->pHead->head.version; if (ver > maxVer) { - tqDebug("maxVer in WAL:%"PRId64" reached, do not scan wal anymore, %s", maxVer, id); + tqDebug("maxVer in WAL:%"PRId64" reached current:%"PRId64", do not scan wal anymore, %s", maxVer, ver, id); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 614adfeded..67ae160d6d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -209,6 +209,17 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { return TSDB_CODE_SUCCESS; } +static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { + if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { + qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 "reach the maximum ver:%" PRId64 + ", not scan wal anymore, set the transfer state flag", + pTask->id.idStr, ver, pTask->dataRange.range.maxVer); + pTask->status.transferState = true; + + /*int32_t code = */streamSchedExec(pTask); + } +} + int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = true; bool noDataInWal = true; @@ -251,6 +262,15 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } + if ((pTask->info.fillHistory == 1) && pTask->status.transferState) { + ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL); + // the maximum version of data in the WAL has reached already, the step2 is done + tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, + pTask->dataRange.range.maxVer); + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + if (tInputQueueIsFull(pTask)) { tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); @@ -259,17 +279,6 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { *pScanIdle = false; - if (pTask->info.fillHistory == 1) { - ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY_WAL); - // the maximum version of data in the WAL has reached already, the step2 is done - if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { - qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore", - pTask->id.idStr, pTask->chkInfo.currentVer); - streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - } - // seek the stored version and extract data from WAL int32_t code = doSetOffsetForWalReader(pTask, vgId); if (code != TSDB_CODE_SUCCESS) { @@ -284,6 +293,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, maxVer, pTask->id.idStr); if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue + checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -294,18 +304,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { if (code == TSDB_CODE_SUCCESS) { int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); pTask->chkInfo.currentVer = ver; + checkForFillHistoryVerRange(pTask, ver); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); - - { - if (pTask->info.fillHistory == 1) { - // the maximum version of data in the WAL has reached already, the step2 is done - if (pTask->chkInfo.currentVer > pTask->dataRange.range.maxVer) { - qWarn("s-task:%s fill-history scan WAL, reach the maximum ver:%" PRId64 ", not scan wal anymore, set the transfer state flag", - pTask->id.idStr, pTask->chkInfo.currentVer); - pTask->status.transferState = true; - } - } - } } else { tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4c06b34df4..d14b79f4bc 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -892,7 +892,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRan pStreamInfo->recoverStep1Finished = false; pStreamInfo->recoverStep2Finished = false; - qDebug("%s step 1. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 + qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); @@ -911,7 +911,7 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan pStreamInfo->recoverStep1Finished = true; pStreamInfo->recoverStep2Finished = false; - qDebug("%s step 2. set param for stream scanner for scan history data, verRange:%" PRId64 " - %" PRId64 + qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 " - %" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5f39204974..72c6dec1e0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1793,7 +1793,7 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW } } } else if (pWindow->ekey != INT64_MAX) { - qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->skey); + qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey); for (int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t* ts = (int64_t*)colDataGetData(pCol, i); p[i] = (*ts <= pWindow->ekey); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 4db3494a3f..c546b36191 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -355,8 +355,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - pTask->status.transferState = false; // reset this value, to avoid transfer state again - + // todo: destroy this task here qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, pTask->streamTaskId.taskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -510,13 +509,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); if (pInput == NULL) { ASSERT(batchSize == 0); -// if (pTask->info.fillHistory && pTask->status.transferState) { -// int32_t code = streamTransferStateToStreamTask(pTask); -// if (code != TSDB_CODE_SUCCESS) { // todo handle this -// return 0; -// } -// } - break; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 41f28f375b..ecf874a1ac 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -844,7 +844,8 @@ void streamTaskPause(SStreamTask* pTask) { return; } - qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId); + const char* pStatus = streamGetTaskStatusStr(status); + qDebug("s-task:%s wait for the task can be paused, status:%s, vgId:%d", pTask->id.idStr, pStatus, pMeta->vgId); taosMsleep(100); }