From f23a8a37bc3f157b85870066f5f6f2e8ee2299eb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 10 Apr 2024 16:43:02 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 1 - source/dnode/snode/inc/sndInt.h | 4 ++-- source/dnode/vnode/src/tqCommon/tqCommon.c | 9 +++++++++ source/libs/stream/src/streamMeta.c | 2 -- source/libs/stream/src/streamStart.c | 3 +-- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 138fad0ddb..c12bb146b4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -516,7 +516,6 @@ typedef struct SStreamMeta { TdThreadMutex backendMutex; SMetaHbInfo* pHbInfo; STaskUpdateInfo updateInfo; - SHashObj* pUpdateTaskSet; int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta int32_t numOfPausedTasks; int64_t rid; diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 68f7f756d5..024c3c6bae 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -30,11 +30,11 @@ extern "C" { #endif -typedef struct SSnode { +struct SSnode { char* path; SStreamMeta* pMeta; SMsgCb msgCb; -} SSnode; +}; #if 0 typedef struct { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 0f7f74f78b..2fa9f9a9ff 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -807,6 +807,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { STaskStartInfo* pStartInfo = &pMeta->startInfo; int32_t vgId = pMeta->vgId; + bool scanWal = false; streamMetaWLock(pMeta); if (pStartInfo->taskStarting == 1) { @@ -831,10 +832,18 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { pStartInfo->restartCount = 0; tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId); } + + scanWal = true; } } streamMetaWUnLock(pMeta); + + if (scanWal && (vgId != SNODE_HANDLE)) { + tqDebug("vgId:%d start scan wal for executing tasks", vgId); + tqScanWalAsync(pMeta->ahandle, true); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index aae3594905..8d5e4f3c87 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -304,7 +304,6 @@ void streamMetaRemoveDB(void* arg, char* key) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage, startComplete_fn_t fn) { - int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -516,7 +515,6 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTaskDbUnique); - taosHashCleanup(pMeta->pUpdateTaskSet); taosHashCleanup(pMeta->updateInfo.pTasks); taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pFailedTaskSet); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 0161f382ba..f2a694a554 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -398,8 +398,7 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { if (pTask->status.taskStatus == TASK_STATUS__HALT) { ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0)); - // halt it self for count window stream task until the related - // fill history task completd. + // halt it self for count window stream task until the related fill history task completed. stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr, pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus)); streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);