diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 1e45f578f6..0bb33b1215 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -109,6 +109,15 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { // seek the stored version and extract data from WAL int32_t code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + SWal *pWal = pTask->exec.pWalReader->pWal; + if (pTask->chkInfo.currentVer < pWal->vers.firstVer ) { + pTask->chkInfo.currentVer = pWal->vers.firstVer; + code = walReadSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); + if (code != TSDB_CODE_SUCCESS) { + streamMetaReleaseTask(pStreamMeta, pTask); + continue; + } + } streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index aefe7885f9..af54904c43 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -188,6 +188,11 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* return -1; } + if (streamMetaCommit(pMeta) < 0) { + tFreeStreamTask(pTask); + return -1; + } + taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); return 0;