From 4138692cea039e3e9abb7b4bb337987a365669ce Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 3 Jan 2024 18:16:18 +0800 Subject: [PATCH] fix(stream): scan wal data files. --- source/dnode/vnode/src/tq/tq.c | 8 ++++---- source/libs/stream/src/streamExec.c | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2533c721ac..925671ae04 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1066,15 +1066,15 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; - int32_t taskId = pReq->taskId; - - if (taskId == STREAM_EXEC_T_EXTRACT_WAL_DATA) { // all tasks are extracted submit data from the wal + if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) { // all tasks are extracted submit data from the wal tqScanWal(pTq); return 0; } int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); - if(code == 0 && taskId > 0){ + + // let's continue scan data in the wal files + if(code == 0 && pReq->reqType > 0){ tqScanWalAsync(pTq, false); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 70b7ec3309..e73c5ffddb 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -728,6 +728,7 @@ static void doStreamExecTaskHelper(void* param, void* tmrId) { tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); // release the task ref count + pTask->status.schedIdleTime = 0; // clear the idle time streamMetaReleaseTask(pTask->pMeta, pTask); }