From 1f9a58361dac698e991a5d50f51ea5b56ac9c15a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 13:36:35 +0800 Subject: [PATCH 1/5] fix(stream): set the correct updated nodeId. --- source/libs/stream/src/streamMeta.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e6bbd89f02..042ff1d1d8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -952,19 +952,20 @@ void metaHbToMnode(void* param, void* tmrId) { taosThreadMutexLock(&(*pTask)->lock); int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList); for (int j = 0; j < num; ++j) { - int32_t* pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); + SDownstreamTaskEpset* pTaskEpset = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); bool exist = false; int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes); for (int k = 0; k < numOfExisted; ++k) { - if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) { + if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) { exist = true; break; } } if (!exist) { - taosArrayPush(hbMsg.pUpdateNodes, pNodeId); + taosArrayPush(hbMsg.pUpdateNodes, &pTaskEpset->nodeId); + stDebug("vgId:%d nodeId:%d added into the update list", pMeta->vgId, pTaskEpset->nodeId); } } From 0b36081158df866e16f7b3a7715333c693e94bdc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 14:36:18 +0800 Subject: [PATCH 2/5] refactor:add some logs. --- include/libs/stream/tstream.h | 2 +- source/dnode/mnode/impl/src/mndStream.c | 1 + source/libs/stream/src/streamMeta.c | 3 ++- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9d32912ece..4b760f3f4e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -461,7 +461,7 @@ typedef struct STaskStartInfo { int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed - int32_t elapsedTime; + int64_t elapsedTime; } STaskStartInfo; typedef struct STaskUpdateInfo { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e26b8e36a3..18a276dea9 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2670,6 +2670,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { doExtractTasksFromStream(pMnode); } + mDebug("%d stream nodes needs updated", (int32_t) taosArrayGetSize(req.pUpdateNodes)); setNodeEpsetExpiredFlag(req.pUpdateNodes); for (int32_t i = 0; i < req.numOfTasks; ++i) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 042ff1d1d8..fe157aaa24 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -965,7 +965,8 @@ void metaHbToMnode(void* param, void* tmrId) { if (!exist) { taosArrayPush(hbMsg.pUpdateNodes, &pTaskEpset->nodeId); - stDebug("vgId:%d nodeId:%d added into the update list", pMeta->vgId, pTaskEpset->nodeId); + stDebug("vgId:%d nodeId:%d added into the update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, + (int32_t)taosArrayGetSize(hbMsg.pUpdateNodes)); } } From d0307e58768193d15c49256cd4fb16c012dd3b13 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 15:30:21 +0800 Subject: [PATCH 3/5] refactor: --- source/dnode/mnode/impl/src/mndStream.c | 11 +++-- source/libs/stream/src/streamMeta.c | 56 ++++++++++++++----------- source/libs/stream/src/streamStart.c | 2 +- 3 files changed, 41 insertions(+), 28 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 18a276dea9..143c061d0f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2610,16 +2610,18 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) { int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { int32_t num = taosArrayGetSize(pNodeList); + mInfo("set node expired for %d nodes", num); for (int k = 0; k < num; ++k) { int32_t* pVgId = taosArrayGet(pNodeList, k); + mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num); int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); for (int i = 0; i < numOfNodes; ++i) { SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); if (pNodeEntry->nodeId == *pVgId) { - mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId); + mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId); pNodeEntry->stageUpdated = true; break; } @@ -2670,8 +2672,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { doExtractTasksFromStream(pMnode); } - mDebug("%d stream nodes needs updated", (int32_t) taosArrayGetSize(req.pUpdateNodes)); - setNodeEpsetExpiredFlag(req.pUpdateNodes); + int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); + if (numOfUpdated > 0) { + mDebug("%d stream nodes needs updated from tasks' report", (int32_t)taosArrayGetSize(req.pUpdateNodes)); + setNodeEpsetExpiredFlag(req.pUpdateNodes); + } for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index fe157aaa24..dfe5729b29 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -854,6 +854,37 @@ static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) { taosArrayDestroy(pIdList); } +static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) { + int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes); + for (int k = 0; k < numOfExisted; ++k) { + if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) { + return true; + } + } + return false; +} + +static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) { + SStreamMeta* pMeta = pTask->pMeta; + + taosThreadMutexLock(&pTask->lock); + + int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList); + for (int j = 0; j < num; ++j) { + SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j); + + bool exist = existInHbMsg(pMsg, pTaskEpset); + if (!exist) { + taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId); + stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, + (int32_t)taosArrayGetSize(pMsg->pUpdateNodes)); + } + } + + taosArrayClear(pTask->outputInfo.pDownstreamUpdateList); + taosThreadMutexUnlock(&pTask->lock); +} + void metaHbToMnode(void* param, void* tmrId) { int64_t rid = *(int64_t*)param; @@ -949,30 +980,7 @@ void metaHbToMnode(void* param, void* tmrId) { walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd); } - taosThreadMutexLock(&(*pTask)->lock); - int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList); - for (int j = 0; j < num; ++j) { - SDownstreamTaskEpset* pTaskEpset = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j); - - bool exist = false; - int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes); - for (int k = 0; k < numOfExisted; ++k) { - if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) { - exist = true; - break; - } - } - - if (!exist) { - taosArrayPush(hbMsg.pUpdateNodes, &pTaskEpset->nodeId); - stDebug("vgId:%d nodeId:%d added into the update list, total:%d", pMeta->vgId, pTaskEpset->nodeId, - (int32_t)taosArrayGetSize(hbMsg.pUpdateNodes)); - } - } - - taosArrayClear((*pTask)->outputInfo.pDownstreamUpdateList); - taosThreadMutexUnlock(&(*pTask)->lock); - + addUpdateNodeIntoHbMsg(*pTask, &hbMsg); taosArrayPush(hbMsg.pTaskStatus, &entry); if (!hasMnodeEpset) { epsetAssign(&epset, &(*pTask)->info.mnodeEpset); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 32d6294de8..97eb7b79a2 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -1083,7 +1083,7 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) { - pStartInfo->readyTs = pTask->execInfo.start; + pStartInfo->readyTs = taosGetTimestampMs(); pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0; stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64 From 5cc6cd0bfccb7492830717083f71966f77f77c01 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 15:39:55 +0800 Subject: [PATCH 4/5] refactor: update the node change check duration. --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 9adabdc8e2..cefbb4011e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -247,7 +247,7 @@ int32_t tsTransPullupInterval = 2; int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointInterval = 60; float tsSinkDataRate = 2.0; -int32_t tsStreamNodeCheckInterval = 30; +int32_t tsStreamNodeCheckInterval = 15; int32_t tsTtlUnit = 86400; int32_t tsTtlPushIntervalSec = 10; int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups From 08a9244230b2dabf82f530bb150ea61ce68b8a8a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 10 Nov 2023 16:24:20 +0800 Subject: [PATCH 5/5] fix(stream): extact stream nodes list if not initialized. --- source/dnode/mnode/impl/src/mndStream.c | 85 ++++++++++++++----------- source/libs/stream/src/streamMeta.c | 2 +- 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 143c061d0f..465bb62f77 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -43,7 +43,7 @@ typedef struct SNodeEntry { } SNodeEntry; typedef struct SStreamExecInfo { - SArray *pNodeEntryList; + SArray *pNodeList; int64_t ts; // snapshot ts int64_t activeCheckpoint; // active check point id SHashObj *pTaskMap; @@ -850,7 +850,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); taosThreadMutexLock(&execInfo.lock); - mDebug("register to stream task node list"); + mDebug("stream tasks register into node list"); keepStreamTasksInBuf(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); @@ -1125,6 +1125,15 @@ static const char *mndGetStreamDB(SMnode *pMnode) { return p; } +static int32_t initStreamNodeList(SMnode* pMnode) { + if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) { + execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList); + execInfo.pNodeList = extractNodeListFromStream(pMnode); + } + + return taosArrayGetSize(execInfo.pNodeList); +} + static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1135,22 +1144,18 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { { // check if the node update happens or not int64_t ts = taosGetTimestampSec(); - if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) { - if (execInfo.pNodeEntryList != NULL) { - execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); - } + taosThreadMutexLock(&execInfo.lock); + int32_t numOfNodes = initStreamNodeList(pMnode); + taosThreadMutexUnlock(&execInfo.lock); - execInfo.pNodeEntryList = extractNodeListFromStream(pMnode); - } - - if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) { + if (numOfNodes == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); execInfo.ts = ts; return 0; } - for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); + for(int32_t i = 0; i < numOfNodes; ++i) { + SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i); if (pNodeEntry->stageUpdated) { mDebug("stream task not ready due to node update detected, checkpoint not issued"); return 0; @@ -1165,7 +1170,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { return 0; } - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); + SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); @@ -2080,20 +2085,21 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) { break; } - SNodeEntry entry = {0}; + SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime}; entry.epset = mndGetVgroupEpset(pMnode, pVgroup); - entry.nodeId = pVgroup->vgId; - entry.hbTimestamp = pVgroup->updateTime; + // if not all ready till now, no need to check the remaining vgroups. if (*allReady) { for (int32_t i = 0; i < pVgroup->replica; ++i) { if (!pVgroup->vnodeGid[i].syncRestore) { + mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId); *allReady = false; break; } ESyncState state = pVgroup->vnodeGid[i].syncState; if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) { + mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId); *allReady = false; break; } @@ -2300,8 +2306,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { int32_t size = taosArrayGetSize(pNodeSnapshot); SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry)); - for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) { - SNodeEntry* p = taosArrayGet(execInfo.pNodeEntryList, i); + for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) { + SNodeEntry* p = taosArrayGet(execInfo.pNodeList, i); for(int32_t j = 0; j < size; ++j) { SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j); @@ -2312,8 +2318,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { } } - execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); - execInfo.pNodeEntryList = pValidNodeEntryList; + taosArrayDestroy(execInfo.pNodeList); + execInfo.pNodeList = pValidNodeEntryList; mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList)); taosArrayDestroy(pRemovedTasks); @@ -2323,6 +2329,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t code = 0; + int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); if (old != 0) { mDebug("still in checking node change"); @@ -2333,23 +2340,21 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int64_t ts = taosGetTimestampSec(); SMnode *pMnode = pMsg->info.node; - if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) { - if (execInfo.pNodeEntryList != NULL) { - execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); - } - execInfo.pNodeEntryList = extractNodeListFromStream(pMnode); - } - if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) { + taosThreadMutexLock(&execInfo.lock); + int32_t numOfNodes = initStreamNodeList(pMnode); + taosThreadMutexUnlock(&execInfo.lock); + + if (numOfNodes == 0) { mDebug("end to do stream task node change checking, no vgroup exists, do nothing"); execInfo.ts = ts; atomic_store_32(&mndNodeCheckSentinel, 0); return 0; } - bool allVnodeReady = true; - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady); - if (!allVnodeReady) { + bool allVgroupsReady = true; + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVgroupsReady); + if (!allVgroupsReady) { taosArrayDestroy(pNodeSnapshot); atomic_store_32(&mndNodeCheckSentinel, 0); mWarn("not all vnodes are ready, ignore the exec nodeUpdate check"); @@ -2359,7 +2364,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { taosThreadMutexLock(&execInfo.lock); removeExpirednodeEntryAndTask(pNodeSnapshot); - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot); + SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { // kill current active checkpoint transaction, since the transaction is vnode wide. @@ -2369,8 +2374,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { // keep the new vnode snapshot if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("create trans successfully, update cached node list"); - taosArrayDestroy(execInfo.pNodeEntryList); - execInfo.pNodeEntryList = pNodeSnapshot; + taosArrayDestroy(execInfo.pNodeList); + execInfo.pNodeList = pNodeSnapshot; execInfo.ts = ts; } else { mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code)); @@ -2616,9 +2621,9 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { int32_t* pVgId = taosArrayGet(pNodeList, k); mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num); - int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); + int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for (int i = 0; i < numOfNodes; ++i) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i); + SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i); if (pNodeEntry->nodeId == *pVgId) { mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId); @@ -2632,9 +2637,9 @@ int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) { } static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) { - int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList); + int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); for(int32_t j = 0; j < numOfNodes; ++j) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j); + SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, j); if (pNodeEntry->nodeId == pTaskEntry->nodeId) { mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId, @@ -2667,14 +2672,18 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); taosThreadMutexLock(&execInfo.lock); + + // extract stream task list int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap); if (numOfExisted == 0) { doExtractTasksFromStream(pMnode); } + initStreamNodeList(pMnode); + int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { - mDebug("%d stream nodes needs updated from tasks' report", (int32_t)taosArrayGetSize(req.pUpdateNodes)); + mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId); setNodeEpsetExpiredFlag(req.pUpdateNodes); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index dfe5729b29..f364ed889d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1020,7 +1020,7 @@ void metaHbToMnode(void* param, void* tmrId) { pMeta->pHbInfo->hbCount += 1; - stDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, + stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks, pMeta->pHbInfo->hbCount); tmsgSendReq(&epset, &msg); } else {