fix(stream): check for nodeupdate trans before create streams.

This commit is contained in:
Haojun Liao 2025-01-21 18:41:52 +08:00
parent 3206c688ed
commit 88a94919c4
3 changed files with 26 additions and 2 deletions

View File

@ -159,6 +159,7 @@ void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList,
SVgroupChangeInfo *pInfo);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
bool isNodeUpdateTransActive();
int32_t createStreamTaskIter(SStreamObj *pStream, SStreamTaskIter **pIter);
void destroyStreamTaskIter(SStreamTaskIter *pIter);

View File

@ -65,8 +65,6 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq);
static void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo);
static void removeExpiredNodeInfo(const SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
@ -809,6 +807,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
// check for the taskEp update trans
if (isNodeUpdateTransActive()) {
mError("stream:%s failed to create stream, node update trans is active", createReq.name);
code = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
goto _OVER;
}
code = mndCheckForSnode(pMnode, pSourceDb);
mndReleaseDb(pMnode, pSourceDb);
if (code != 0) {

View File

@ -292,6 +292,24 @@ int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msg
return mndTransAppendRedoAction(pTrans, &action);
}
bool isNodeUpdateTransActive() {
bool exist = false;
void *pIter = NULL;
streamMutexLock(&execInfo.lock);
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
SStreamTransInfo *pTransInfo = (SStreamTransInfo *)pIter;
if (strcmp(pTransInfo->name, MND_STREAM_CHKPT_UPDATE_NAME) != 0) {
mDebug("stream:0x%" PRIx64 " %s st:%" PRId64 " is in task nodeEp update, create new stream not allowed",
pTransInfo->streamId, pTransInfo->name, pTransInfo->startTime) exist = true;
}
}
streamMutexUnlock(&execInfo.lock);
return exist;
}
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
void *pIter = NULL;