From 88a94919c43bef731f607926e5bf68c8c953d4ec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 21 Jan 2025 18:41:52 +0800 Subject: [PATCH] fix(stream): check for nodeupdate trans before create streams. --- source/dnode/mnode/impl/inc/mndStream.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 9 +++++++-- source/dnode/mnode/impl/src/mndStreamTrans.c | 18 ++++++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 857fd5c99c..fc1c95a3b3 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -159,6 +159,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo); int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList, SVgroupChangeInfo *pInfo); void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); +bool isNodeUpdateTransActive(); int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter); void destroyStreamTaskIter(SStreamTaskIter *pIter); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4f75aae42e..c1ae5273c2 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -65,8 +65,6 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq); static void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo); -static void removeExpiredNodeInfo(const SArray *pNodeSnapshot); -static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); @@ -809,6 +807,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } + // check for the taskEp update trans + if (isNodeUpdateTransActive()) { + mError("stream:%s failed to create stream, node update trans is active", createReq.name); + code = TSDB_CODE_STREAM_TASK_IVLD_STATUS; + goto _OVER; + } + code = mndCheckForSnode(pMnode, pSourceDb); mndReleaseDb(pMnode, pSourceDb); if (code != 0) { diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index a1e104aeca..9ef27f53b8 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -292,6 +292,24 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg return mndTransAppendRedoAction(pTrans, &action); } +bool isNodeUpdateTransActive() { + bool exist = false; + void *pIter = NULL; + + streamMutexLock(&execInfo.lock); + + while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { + SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter; + if (strcmp(pTransInfo->name, MND_STREAM_CHKPT_UPDATE_NAME) != 0) { + mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed", + pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime) exist = true; + } + } + + streamMutexUnlock(&execInfo.lock); + return exist; +} + int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { void *pIter = NULL;