diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 0715556da2..8a4f6c2195 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -557,6 +557,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, mInfo("trans:%d, used to create db:%s", pTrans->id, pCreate->db); mndTransSetDbName(pTrans, dbObj.name, NULL); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; + mndTransSetOper(pTrans, MND_OPER_CREATE_DB); if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; @@ -776,7 +778,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p int32_t code = -1; mndTransSetDbName(pTrans, pOld->name, NULL); - if (mndTrancCheckConflict(pMnode, pTrans) != 0) return -1; + if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER; @@ -1038,7 +1040,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { mInfo("trans:%d, used to drop db:%s", pTrans->id, pDb->name); mndTransSetDbName(pTrans, pDb->name, NULL); - + if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndCheckTopicExist(pMnode, pDb) < 0) goto _OVER; if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index e6a6e6ae5b..2d1f6eb505 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -595,6 +595,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-sma"); if (pTrans == NULL) goto _OVER; mndTransSetDbName(pTrans, pDb->name, NULL); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; + mndTransSetSerial(pTrans); mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name); @@ -809,6 +811,8 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p mInfo("trans:%d, used to drop sma:%s", pTrans->id, pSma->name); mndTransSetDbName(pTrans, pDb->name, NULL); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; + mndTransSetSerial(pTrans); char streamName[TSDB_TABLE_FNAME_LEN] = {0}; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index c611cfb6e1..54b3202d24 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -823,6 +823,7 @@ _OVER: int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { mndTransSetDbName(pTrans, pDb->name, pStb->name); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) return -1; if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1; @@ -1856,6 +1857,7 @@ static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbOb mInfo("trans:%d, used to alter stb:%s", pTrans->id, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (needRsp) { void *pCont = NULL; @@ -2055,6 +2057,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p mInfo("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); mndTransSetDbName(pTrans, pDb->name, pStb->name); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 05c0594339..83ed6eea78 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -640,9 +640,11 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); goto _OVER; } - mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); + mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; + // create stb for stream if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr()); @@ -790,6 +792,12 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "stream-checkpoint"); if (pTrans == NULL) return -1; mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) { + mndReleaseStream(pMnode, pStream); + mndTransDrop(pTrans); + return -1; + } + taosRLockLatch(&pStream->lock); // 1. redo action: broadcast checkpoint source msg for all source vg int32_t totLevel = taosArrayGetSize(pStream->tasks); @@ -882,11 +890,11 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { + sdbRelease(pMnode->pSdb, pStream); return -1; } STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "drop-stream"); - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -894,6 +902,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name); + mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + // drop all tasks if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 55e073a8a4..418474baa6 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -442,7 +442,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) return -1; + mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) { + mndTransDrop(pTrans); + return -1; + } // make txn: // 1. redo action: action to all vg diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index e271eedd17..071edf992f 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -706,13 +706,19 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { #endif STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "drop-topic"); - mndTransSetDbName(pTrans, pTopic->db, NULL); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); mndReleaseTopic(pMnode, pTopic); return -1; } + mndTransSetDbName(pTrans, pTopic->db, NULL); + if (mndTrancCheckConflict(pMnode, pTrans) != 0) { + mndReleaseTopic(pMnode, pTopic); + mndTransDrop(pTrans); + return -1; + } + mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); // TODO check if rebalancing