From d0307e58768193d15c49256cd4fb16c012dd3b13 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 15:30:21 +0800 Subject: [PATCH] refactor: --- source/dnode/mnode/impl/src/mndStream.c | 11 +++-- source/libs/stream/src/streamMeta.c | 56 ++++++++++++++----------- source/libs/stream/src/streamStart.c | 2 +- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 18a276dea9..143c061d0f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2610,16 +2610,18 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) { int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { int32_t num = taosArrayGetSize(pNodeList); + mInfo("set node expired for %d nodes", num); for (int k = 0; k < num; ++k) { int32_t* pVgId = taosArrayGet(pNodeList, k); + mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num); int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); for (int i = 0; i < numOfNodes; ++i) { SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); if (pNodeEntry->nodeId == *pVgId) { - mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId); + mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId); pNodeEntry->stageUpdated = true; break; } @@ -2670,8 +2672,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { doExtractTasksFromStream(pMnode); } - mDebug("%d stream nodes needs updated", (int32_t) taosArrayGetSize(req.pUpdateNodes)); - setNodeEpsetExpiredFlag(req.pUpdateNodes); + int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); + if (numOfUpdated > 0) { + mDebug("%d stream nodes needs updated from tasks' report", (int32_t)taosArrayGetSize(req.pUpdateNodes)); + setNodeEpsetExpiredFlag(req.pUpdateNodes); + } for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index fe157aaa24..dfe5729b29 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -854,6 +854,37 @@ static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) { taosArrayDestroy(pIdList); } +static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { + int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes); + for (int k = 0; k < numOfExisted; ++k) { + if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) { + return true; + } + } + return false; +} + +static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { + SStreamMeta* pMeta = pTask->pMeta; + + taosThreadMutexLock(&pTask->lock); + + int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); + for (int j = 0; j < num; ++j) { + SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j); + + bool exist = existInHbMsg(pMsg, pTaskEpset); + if (!exist) { + taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId); + stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, + (int32_t)taosArrayGetSize(pMsg->pUpdateNodes)); + } + } + + taosArrayClear(pTask->outputInfo.pDownstreamUpdateList); + taosThreadMutexUnlock(&pTask->lock); +} + void metaHbToMnode(void* param, void* tmrId) { int64_t rid = *(int64_t*)param; @@ -949,30 +980,7 @@ void metaHbToMnode(void* param, void* tmrId) { walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd); } - taosThreadMutexLock(&(*pTask)->lock); - int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList); - for (int j = 0; j < num; ++j) { - SDownstreamTaskEpset* pTaskEpset = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); - - bool exist = false; - int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes); - for (int k = 0; k < numOfExisted; ++k) { - if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) { - exist = true; - break; - } - } - - if (!exist) { - taosArrayPush(hbMsg.pUpdateNodes, &pTaskEpset->nodeId); - stDebug("vgId:%d nodeId:%d added into the update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, - (int32_t)taosArrayGetSize(hbMsg.pUpdateNodes)); - } - } - - taosArrayClear((*pTask)->outputInfo.pDownstreamUpdateList); - taosThreadMutexUnlock(&(*pTask)->lock); - + addUpdateNodeIntoHbMsg(*pTask, &hbMsg); taosArrayPush(hbMsg.pTaskStatus, &entry); if (!hasMnodeEpset) { epsetAssign(&epset, &(*pTask)->info.mnodeEpset); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 32d6294de8..97eb7b79a2 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1083,7 +1083,7 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) { - pStartInfo->readyTs = pTask->execInfo.start; + pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64