From a2694b9ce9a1b6e68c0a45b8a10f772188f46609 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 24 Jul 2023 19:44:28 +0800 Subject: [PATCH] fix(stream): fix the invalid check of step2 . --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 13 ++++++------- source/libs/stream/src/streamRecover.c | 3 +-- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9d2c0a4f6a..e34b27e9b8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -598,7 +598,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); -void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask); +void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 02509d994d..1f6c162b9d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1154,16 +1154,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { taosMsleep(100); } - streamTaskHalt(pTask); - // now we can stop the stream task execution - // todo upgrade the statu to be HALT from PAUSE or NORMAL - pStreamTask->status.taskStatus = TASK_STATUS__HALT; - tqDebug("s-task:%s level:%d status is set to halt by fill-history task:%s", pStreamTask->id.idStr, - pStreamTask->info.taskLevel, id); + streamTaskHalt(pStreamTask); + tqDebug("s-task:%s level:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, + id); // if it's an source task, extract the last version in wal. - streamHistoryTaskSetVerRangeStep2(pTask); + pRange = &pTask->dataRange.range; + int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + streamHistoryTaskSetVerRangeStep2(pTask, latestVer); } if (!streamTaskRecoverScanStep1Finished(pTask)) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 42eb27bb8f..1c9e2672d1 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -654,9 +654,8 @@ int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask) { return qStreamRecoverSetAllStepFinished(exec); } -void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask) { +void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer) { SVersionRange* pRange = &pTask->dataRange.range; - int64_t latestVer = walReaderGetCurrentVer(pTask->exec.pWalReader); ASSERT(latestVer >= pRange->maxVer); int64_t nextStartVer = pRange->maxVer + 1;