diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8fe55c2598..14bdb73b4f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2038,8 +2038,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP char buf[256] = {0}; EPSET_TO_STR(&pCurrent->epset, buf); - mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s", pCurrent->nodeId, pPrevEp->fqdn, - pPrevEp->port, buf); + mDebug("nodeId:%d restart/epset changed detected, old:%s:%d -> new:%s, stageUpdate:%d", pCurrent->nodeId, + pPrevEp->fqdn, pPrevEp->port, buf, pPrevEntry->stageUpdated); SNodeUpdateInfo updateInfo = {.nodeId = pPrevEntry->nodeId}; epsetAssign(&updateInfo.prevEp, &pPrevEntry->epset); @@ -2216,7 +2216,7 @@ static void doExtractTasksFromStream(SMnode *pMnode) { } } -static int32_t doRemoveFromTask(SStreamExecInfo* pExecNode, STaskId* pRemovedId) { +static int32_t doRemoveTasks(SStreamExecInfo* pExecNode, STaskId* pRemovedId) { void *p = taosHashGet(pExecNode->pTaskMap, pRemovedId, sizeof(*pRemovedId)); if (p != NULL) { @@ -2249,7 +2249,7 @@ static bool taskNodeExists(SArray* pList, int32_t nodeId) { } int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { - SArray* pRemoveTaskList = taosArrayInit(4, sizeof(STaskId)); + SArray* pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); for(int32_t i = 0; i < numOfTask; ++i) { @@ -2258,16 +2258,16 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); if (!existed) { - taosArrayPush(pRemoveTaskList, pId); + taosArrayPush(pRemovedTasks, pId); } } - for(int32_t i = 0; i < taosArrayGetSize(pRemoveTaskList); ++i) { - STaskId* pId = taosArrayGet(pRemoveTaskList, i); - doRemoveFromTask(&execInfo, pId); + for(int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) { + STaskId* pId = taosArrayGet(pRemovedTasks, i); + doRemoveTasks(&execInfo, pId); } - mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemoveTaskList), + mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), (int32_t) taosArrayGetSize(execInfo.pTaskList)); int32_t size = taosArrayGetSize(pNodeSnapshot); @@ -2287,7 +2287,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList); execInfo.pNodeEntryList = pValidNodeEntryList; - taosArrayDestroy(pRemoveTaskList); + mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList)); + taosArrayDestroy(pRemovedTasks); return 0; } @@ -2336,6 +2337,9 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { taosArrayDestroy(execInfo.pNodeEntryList); execInfo.pNodeEntryList = pNodeSnapshot; execInfo.ts = ts; + } else { + mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code)); + taosArrayDestroy(pNodeSnapshot); } } else { mDebug("no update found in nodeList");