From c016a58623286e837583882d057d879b38367dc9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Aug 2023 19:22:59 +0800 Subject: [PATCH] enh(stream): do some internal refactor. --- source/dnode/mnode/impl/src/mndStream.c | 31 ++++--------------------- 1 file changed, 4 insertions(+), 27 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index dfbc6e92be..8f10c00c9c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -41,7 +41,7 @@ typedef struct SNodeEntry { } SNodeEntry; typedef struct SStreamVnodeRevertIndex { - SHashObj* pVnodeMap; +// SHashObj* pVnodeMap; SArray* pNodeEntryList; } SStreamVnodeRevertIndex; @@ -1787,7 +1787,7 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3 mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid); ASSERT(0); - // mndTransSetDbName(pTrans, "stream-task-update", "checkpoint"); + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { mError("failed to build stream:0x%" PRIx64 " task DAG update, code:%s", pStream->uid, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); @@ -1938,31 +1938,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // update the related upstream and downstream tasks taosWLockLatch(&pStream->lock); - int32_t numOfLevels = taosArrayGetSize(pStream->tasks); - - for (int32_t j = 0; j < numOfLevels; ++j) { - SArray *pLevel = taosArrayGetP(pStream->tasks, j); - - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t k = 0; k < numOfTasks; ++k) { - SStreamTask *pTask = taosArrayGetP(pLevel, k); - if (pTask->info.nodeId == nodeId) { - pTask->info.epSet = newEpSet; - continue; - } - - // check for the dispath info and the upstream task info - int32_t level = pTask->info.taskLevel; - if (level == TASK_LEVEL__SOURCE) { - streamTaskUpdateDownstreamInfo(pTask, nodeId, &newEpSet); - } else if (level == TASK_LEVEL__AGG) { - streamTaskUpdateUpstreamInfo(pTask, nodeId, &newEpSet); - streamTaskUpdateDownstreamInfo(pTask, nodeId, &newEpSet); - } else { // TASK_LEVEL__SINK - streamTaskUpdateUpstreamInfo(pTask, nodeId, &newEpSet); - } - } - } + updateTaskEpInfo(pStream, req.vgId, &req.epset); + // write down code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet); if (code != TSDB_CODE_SUCCESS) {