refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2025-03-04 18:45:29 +08:00
parent b82d60036b
commit eb0553b737
3 changed files with 10 additions and 22 deletions

View File

@ -920,12 +920,6 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
// now the fill-history task starts to scan data from wal files. // now the fill-history task starts to scan data from wal files.
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
// if (code == TSDB_CODE_SUCCESS) {
// code = tqScanWalAsync(pTq, false);
// if (code) {
// tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
// }
// }
} }
} }

View File

@ -148,6 +148,7 @@ static void doStartScanWal(void* param, void* tmrId) {
return; return;
} }
// failed to lock, try 500ms later
code = streamMetaTryRlock(pMeta); code = streamMetaTryRlock(pMeta);
if (code == 0) { if (code == 0) {
numOfTasks = taosArrayGetSize(pMeta->pTaskList); numOfTasks = taosArrayGetSize(pMeta->pTaskList);
@ -156,20 +157,18 @@ static void doStartScanWal(void* param, void* tmrId) {
numOfTasks = 0; numOfTasks = 0;
} }
if (numOfTasks == 0) { if (numOfTasks > 0) {
goto _end; tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
}
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); #if 0
#if 0
// wait for the vnode is freed, and invalid read may occur. // wait for the vnode is freed, and invalid read may occur.
taosMsleep(10000); taosMsleep(10000);
#endif #endif
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
if (code) { if (code) {
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
}
} }
_end: _end:
@ -192,7 +191,7 @@ void tqScanWalAsync(STQ* pTq) {
// 1. the vnode should be the leader. // 1. the vnode should be the leader.
// 2. the stream isn't disabled // 2. the stream isn't disabled
if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) { if ((pMeta->role != NODE_ROLE_LEADER) || tsDisableStream) {
tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId); tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId);
return; return;
} }

View File

@ -952,11 +952,6 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
// if (scanWal && (vgId != SNODE_HANDLE)) {
// tqDebug("vgId:%d start scan wal for executing tasks", vgId);
// code = tqScanWalAsync(pMeta->ahandle, true);
// }
return code; return code;
} }