diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 7b71ad873b..58a4c92d3e 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -72,7 +72,7 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid); int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId); bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock); -int32_t mndStreamGetRelCheckpointTrans(SMnode *pMnode, int64_t streamUid); +int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid); // for sma // TODO refactor diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 95150b9d6e..a69543a6d6 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -81,7 +81,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); -static void killCheckpointTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); +static void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList); static void freeCheckpointCandEntry(void *); @@ -95,9 +95,6 @@ static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream); static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream); static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream); -static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); -static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); - int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_STREAM, @@ -1455,9 +1452,10 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } // kill the related checkpoint trans - int32_t transId = mndStreamGetRelCheckpointTrans(pMnode, pStream->uid); + int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid); if (transId != 0) { - killCheckpointTransImpl(pMnode, transId, pStream->sourceDb); + mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name); + killTransImpl(pMnode, transId, pStream->sourceDb); } removeStreamTasksInBuf(pStream, &execInfo); @@ -1502,9 +1500,10 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { #endif // kill the related checkpoint trans - int32_t transId = mndStreamGetRelCheckpointTrans(pMnode, pStream->uid); + int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid); if (transId != 0) { - killCheckpointTransImpl(pMnode, transId, pStream->sourceDb); + mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name); + killTransImpl(pMnode, transId, pStream->sourceDb); } // drop the stream obj in execInfo @@ -2836,10 +2835,10 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { return TSDB_CODE_ACTION_IN_PROGRESS; } -void killCheckpointTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) { +void killTransImpl(SMnode* pMnode, int32_t transId, const char* pDbName) { STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { - mInfo("kill checkpoint transId:%d in Db:%s", transId, pDbName); + mInfo("kill active transId:%d in Db:%s", transId, pDbName); mndKillTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans); } @@ -2859,7 +2858,7 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { } char* pDupDBName = strndup(pDBName, len); - killCheckpointTransImpl(pMnode, pTransInfo->transId, pDupDBName); + killTransImpl(pMnode, pTransInfo->transId, pDupDBName); taosMemoryFree(pDupDBName); return TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index f5047acc49..bfea4349b0 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -113,7 +113,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* return false; } -int32_t mndStreamGetRelCheckpointTrans(SMnode* pMnode, int64_t streamUid) { +int32_t mndStreamGetRelTrans(SMnode* pMnode, int64_t streamUid) { taosThreadMutexLock(&execInfo.lock); int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); if (num <= 0) { @@ -127,7 +127,7 @@ int32_t mndStreamGetRelCheckpointTrans(SMnode* pMnode, int64_t streamUid) { SStreamTransInfo tInfo = *pEntry; taosThreadMutexUnlock(&execInfo.lock); - if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { + if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0 || strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { return tInfo.transId; } } else {