From a1aa2c9e0c2f1a5b6fc749b0a256892b973a0ac6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 16 Oct 2023 18:47:01 +0800 Subject: [PATCH] fix(stream):add more check for test cases. --- source/dnode/vnode/src/tq/tq.c | 4 ++++ source/libs/stream/src/streamExec.c | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8f3661dffa..1c90812d95 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1196,7 +1196,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pStreamTask->status.keepTaskStatus = status; pStreamTask->status.taskStatus = TASK_STATUS__HALT; + // wal scan not start yet, reset it to be the start position nextProcessedVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); + if (nextProcessedVer == -1) { + nextProcessedVer = pStreamTask->dataRange.range.maxVer + 1; + } tqDebug("s-task:%s level:%d nextProcessedVer:%" PRId64 ", sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr, pStreamTask->info.taskLevel, nextProcessedVer, pStreamTask->status.schedStatus, diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 12b51e6c93..c49c647906 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -309,7 +309,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } - ASSERT(pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true); + ASSERT(((pStreamTask->status.taskStatus == TASK_STATUS__STOP) || + (pStreamTask->hTaskInfo.id.taskId == pTask->id.taskId)) && + pTask->status.appendTranstateBlock == true); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;