From af3a87560a431a0642c7cf1c48a34062d24b988e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Aug 2023 09:11:41 +0800 Subject: [PATCH] refactor: do some refactor. --- include/libs/stream/tstream.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 32 ++----------------------- source/libs/stream/src/streamTask.c | 28 ++++++++++++++++++++++ 3 files changed, 31 insertions(+), 30 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3e1827cb97..4c886178df 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -623,6 +623,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage); int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir); +int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet); int32_t streamTaskStop(SStreamTask* pTask); int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo, int32_t taskId); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3826853a6b..bbff43371c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1845,35 +1845,6 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, int3 return mndPersistTransLog(pStream, pTrans); } -static int32_t updateTaskEpInfo(SStreamObj* pStream, int32_t nodeId, SEpSet* pEpSet) { - int32_t numOfLevels = taosArrayGetSize(pStream->tasks); - - for (int32_t j = 0; j < numOfLevels; ++j) { - SArray *pLevel = taosArrayGetP(pStream->tasks, j); - - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t k = 0; k < numOfTasks; ++k) { - SStreamTask *pTask = taosArrayGetP(pLevel, k); - if (pTask->info.nodeId == nodeId) { - pTask->info.epSet = *pEpSet; - continue; - } - - // check for the dispath info and the upstream task info - int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SOURCE) { - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); - } else if (level == TASK_LEVEL__AGG) { - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); - streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); - } else { // TASK_LEVEL__SINK - streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); - } - } - } - return 0; -} - // todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; @@ -1944,7 +1915,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // update the related upstream and downstream tasks, todo remove this, no need this function taosWLockLatch(&pStream->lock); - updateTaskEpInfo(pStream, req.vgId, &req.epset); + streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); + streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); taosWUnLockLatch(&pStream->lock); code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 6d2c33b7d4..f96f7e9b34 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -481,3 +481,31 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) { return 0; } +int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet) { + int32_t numOfLevels = taosArrayGetSize(pTaskList); + + for (int32_t j = 0; j < numOfLevels; ++j) { + SArray *pLevel = taosArrayGetP(pTaskList, j); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t k = 0; k < numOfTasks; ++k) { + SStreamTask *pTask = taosArrayGetP(pLevel, k); + if (pTask->info.nodeId == nodeId) { + pTask->info.epSet = *pEpSet; + continue; + } + + // check for the dispath info and the upstream task info + int32_t level = pTask->info.taskLevel; + if (level == TASK_LEVEL__SOURCE) { + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + } else if (level == TASK_LEVEL__AGG) { + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + } else { // TASK_LEVEL__SINK + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); + } + } + } + return 0; +} \ No newline at end of file