From 29648be30ded486ccba66b18df204220a48fcaca Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 4 Jun 2024 23:28:08 +0800 Subject: [PATCH] fix(stream): add the new node info when adding stream tasks. --- source/dnode/mnode/impl/inc/mndStream.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 89 ++++++++++++--------- source/dnode/mnode/impl/src/mndStreamHb.c | 11 ++- source/dnode/mnode/impl/src/mndStreamUtil.c | 24 ++++++ 4 files changed, 83 insertions(+), 42 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 2800aecdfa..6d2a89ddc9 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -120,6 +120,7 @@ void destroyStreamTaskIter(SStreamTaskIter *pIter); bool streamTaskIterNextTask(SStreamTaskIter *pIter); SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); void mndInitExecInfo(); +void removeExpiredNodeInfo(const SArray *pNodeSnapshot); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e108ba557a..bbf2ad63ce 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -62,7 +62,7 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); +static int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); @@ -692,6 +692,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SStreamObj streamObj = {0}; char *sql = NULL; int32_t sqlLen = 0; + const char* pMsg = "create stream tasks on dnodes"; + terrno = TSDB_CODE_SUCCESS; SCMCreateStreamReq createReq = {0}; @@ -704,8 +706,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { terrno = TSDB_CODE_MND_INVALID_PLATFORM; goto _OVER; #endif - mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql); + mInfo("stream:%s, start to create stream, sql:%s", createReq.name, createReq.sql); if (mndCheckCreateStreamReq(&createReq) != 0) { mError("stream:%s, failed to create since %s", createReq.name, terrstr()); goto _OVER; @@ -745,8 +747,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - STrans *pTrans = - doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes"); + STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, pMsg); if (pTrans == NULL) { goto _OVER; } @@ -789,7 +790,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { // add into buffer firstly // to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already. taosThreadMutexLock(&execInfo.lock); - mDebug("stream stream:%s start to register tasks into task_node_list", createReq.name); + mDebug("stream stream:%s start to register tasks into task nodeList", createReq.name); saveStreamTasksInfo(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); @@ -1030,7 +1031,7 @@ _ERR: } int32_t initStreamNodeList(SMnode *pMnode) { - if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) { + if (taosArrayGetSize(execInfo.pNodeList) == 0) { execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList); execInfo.pNodeList = extractNodeListFromStream(pMnode); } @@ -2203,8 +2204,8 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { epsetToStr(&pEntry->epset, buf, tListLen(buf)); mDebug("extract nodeInfo from stream obj, nodeId:%d, %s", pEntry->nodeId, buf); } - taosHashCleanup(pHash); + taosHashCleanup(pHash); return plist; } @@ -2242,15 +2243,17 @@ static bool taskNodeExists(SArray *pList, int32_t nodeId) { return false; } -int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { +int32_t removeExpiredNodeEntryAndTask(SArray *pNodeSnapshot) { SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId)); int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList); for (int32_t i = 0; i < numOfTask; ++i) { - STaskId *pId = taosArrayGet(execInfo.pTaskList, i); - STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + STaskId *pId = taosArrayGet(execInfo.pTaskList, i); - if (pEntry->nodeId == SNODE_HANDLE) continue; + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId)); + if (pEntry->nodeId == SNODE_HANDLE) { + continue; + } bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId); if (!existed) { @@ -2266,24 +2269,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks), (int32_t)taosArrayGetSize(execInfo.pTaskList)); - int32_t size = taosArrayGetSize(pNodeSnapshot); - SArray *pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry)); - for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) { - SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i); + removeExpiredNodeInfo(pNodeSnapshot); - for (int32_t j = 0; j < size; ++j) { - SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j); - if (pEntry->nodeId == p->nodeId) { - taosArrayPush(pValidNodeEntryList, p); - break; - } - } - } - - taosArrayDestroy(execInfo.pNodeList); - execInfo.pNodeList = pValidNodeEntryList; - - mDebug("remain %d valid node entries", (int32_t)taosArrayGetSize(pValidNodeEntryList)); taosArrayDestroy(pRemovedTasks); return 0; } @@ -2314,9 +2301,9 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } - bool allVgroupsReady = true; - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVgroupsReady); - if (!allVgroupsReady) { + bool allReady = true; + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); + if (!allReady) { taosArrayDestroy(pNodeSnapshot); atomic_store_32(&mndNodeCheckSentinel, 0); mWarn("not all vnodes are ready, ignore the exec nodeUpdate check"); @@ -2324,31 +2311,30 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } taosThreadMutexLock(&execInfo.lock); - removeExpirednodeEntryAndTask(pNodeSnapshot); + removeExpiredNodeEntryAndTask(pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { // kill current active checkpoint transaction, since the transaction is vnode wide. killAllCheckpointTrans(pMnode, &changeInfo); - code = mndProcessVgroupChange(pMnode, &changeInfo); // keep the new vnode snapshot if success if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { - mDebug("create trans successfully, update cached node list"); taosArrayDestroy(execInfo.pNodeList); - execInfo.pNodeList = pNodeSnapshot; + execInfo.pNodeList = extractNodeListFromStream(pMnode); execInfo.ts = ts; + mDebug("create trans successfully, update cached node list, numOfNodes:%d", taosArrayGetSize(execInfo.pNodeList)); } else { mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code)); - taosArrayDestroy(pNodeSnapshot); } } else { mDebug("no update found in nodeList"); - taosArrayDestroy(pNodeSnapshot); } + taosArrayDestroy(pNodeSnapshot); taosThreadMutexUnlock(&execInfo.lock); + taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); @@ -2385,8 +2371,27 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); taosArrayPush(pExecNode->pTaskList, &id); - mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, - (int32_t)taosArrayGetSize(pExecNode->pTaskList)); + + int32_t num = (int32_t)taosArrayGetSize(pExecNode->pTaskList); + mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, num); + + // add the new vgroups if not added yet + bool exist = false; + for(int32_t j = 0; j < taosArrayGetSize(pExecNode->pNodeList); ++j) { + SNodeEntry* pEntry = taosArrayGet(pExecNode->pNodeList, j); + if (pEntry->nodeId == pTask->info.nodeId) { + exist = true; + break; + } + } + + if (!exist) { + SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; + epsetAssign(&nodeEntry.epset, &pTask->info.epSet); + + taosArrayPush(pExecNode->pNodeList, &nodeEntry); + mInfo("vgId:%d added into nodeList, total:%d", nodeEntry.nodeId, (int)taosArrayGetSize(pExecNode->pNodeList)); + } } } @@ -2394,6 +2399,8 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { + taosThreadMutexLock(&pExecNode->lock); + SStreamTaskIter *pIter = createStreamTaskIter(pStream); while (streamTaskIterNextTask(pIter)) { SStreamTask *pTask = streamTaskIterGetCurrent(pIter); @@ -2416,8 +2423,10 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } } - destroyStreamTaskIter(pIter); ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); + taosThreadMutexUnlock(&pExecNode->lock); + + destroyStreamTaskIter(pIter); } static void doAddTaskId(SArray *pList, int32_t taskId, int64_t uid, int32_t numOfTotal) { diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 9bd7b3b18f..778fd295f7 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -131,18 +131,26 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { int32_t *pVgId = taosArrayGet(pNodeList, k); mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num); + bool setFlag = false; int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList); + for (int i = 0; i < numOfNodes; ++i) { SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i); if (pNodeEntry->nodeId == *pVgId) { mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId); pNodeEntry->stageUpdated = true; + setFlag = true; break; } } - } + if (!setFlag) { + mError("failed to set nodeUpdate flag, nodeId:%d not exists in nodelist, update it", *pVgId); + ASSERT(0); + return TSDB_CODE_FAILED; + } + } return TSDB_CODE_SUCCESS; } @@ -361,7 +369,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { pHead->vgId = htonl(req.vgId); tmsgSendRsp(&rsp); - pReq->info.handle = NULL; // disable auto rsp } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index e53908eeed..54279161ab 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -135,6 +135,7 @@ SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { char buf[256] = {0}; epsetToStr(&entry.epset, buf, tListLen(buf)); mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); + taosArrayPush(pVgroupListSnapshot, &entry); sdbRelease(pSdb, pObj); } @@ -571,6 +572,29 @@ void mndInitExecInfo() { execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK); + execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry)); taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); } + +void removeExpiredNodeInfo(const SArray *pNodeSnapshot) { + SArray *pValidList = taosArrayInit(4, sizeof(SNodeEntry)); + int32_t size = taosArrayGetSize(pNodeSnapshot); + + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) { + SNodeEntry *p = taosArrayGet(execInfo.pNodeList, i); + + for (int32_t j = 0; j < size; ++j) { + SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j); + if (pEntry->nodeId == p->nodeId) { + taosArrayPush(pValidList, p); + break; + } + } + } + + taosArrayDestroy(execInfo.pNodeList); + execInfo.pNodeList = pValidList; + + mDebug("remain %d valid node entries after clean expired nodes info", (int32_t)taosArrayGetSize(pValidList)); +} \ No newline at end of file