From eb0553b737513b4474e1de652957560cab0448a9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Mar 2025 18:45:29 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 6 ------ source/dnode/vnode/src/tq/tqStreamTask.c | 21 ++++++++++----------- source/dnode/vnode/src/tqCommon/tqCommon.c | 5 ----- 3 files changed, 10 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e1298b11ea..407a1e88c8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -920,12 +920,6 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask // now the fill-history task starts to scan data from wal files. code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); -// if (code == TSDB_CODE_SUCCESS) { -// code = tqScanWalAsync(pTq, false); -// if (code) { -// tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code)); -// } -// } } } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 2052e4f3bf..4d2449bb37 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -148,6 +148,7 @@ static void doStartScanWal(void* param, void* tmrId) { return; } + // failed to lock, try 500ms later code = streamMetaTryRlock(pMeta); if (code == 0) { numOfTasks = taosArrayGetSize(pMeta->pTaskList); @@ -156,20 +157,18 @@ static void doStartScanWal(void* param, void* tmrId) { numOfTasks = 0; } - if (numOfTasks == 0) { - goto _end; - } + if (numOfTasks > 0) { + tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); - tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); - - #if 0 +#if 0 // wait for the vnode is freed, and invalid read may occur. taosMsleep(10000); - #endif +#endif - code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); - if (code) { - tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); + code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); + if (code) { + tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); + } } _end: @@ -192,7 +191,7 @@ void tqScanWalAsync(STQ* pTq) { // 1. the vnode should be the leader. // 2. the stream isn't disabled - if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) { + if ((pMeta->role != NODE_ROLE_LEADER) || tsDisableStream) { tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId); return; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index fb3582c5ff..d5edfe4b35 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -952,11 +952,6 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { streamMetaWUnLock(pMeta); -// if (scanWal && (vgId != SNODE_HANDLE)) { -// tqDebug("vgId:%d start scan wal for executing tasks", vgId); -// code = tqScanWalAsync(pMeta->ahandle, true); -// } - return code; }