diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b06adf3f2d..41e423ff6e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1847,6 +1847,7 @@ typedef struct SMStreamDropOrphanMsg { int32_t tSerializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg); int32_t tDeserializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg); +void tDestroyDropOrphanTaskMsg(SMStreamDropOrphanMsg* pMsg); typedef struct { int32_t id; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 32915ba884..9529a75ba8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5360,6 +5360,14 @@ int32_t tDeserializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrp return 0; } +void tDestroyDropOrphanTaskMsg(SMStreamDropOrphanMsg *pMsg) { + if (pMsg == NULL) { + return; + } + + taosArrayDestroy(pMsg->pList); +} + int32_t tEncodeSReplica(SEncoder *pEncoder, SReplica *pReplica) { if (tEncodeI32(pEncoder, pReplica->id) < 0) return -1; if (tEncodeU16(pEncoder, pReplica->port) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b7ab76984a..2235b88c90 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2749,6 +2749,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { int32_t code = 0; SOrphanTask *pTask = NULL; int32_t i = 0; + STrans *pTrans = NULL; + int32_t numOfTasks = 0; SMStreamDropOrphanMsg msg = {0}; code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg); @@ -2756,10 +2758,10 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { return code; } - int32_t numOfTasks = taosArrayGetSize(msg.pList); + numOfTasks = taosArrayGetSize(msg.pList); if (numOfTasks == 0) { mDebug("no orphan tasks to drop, no need to create trans"); - return code; + goto _err; } mDebug("create trans to drop %d orphan tasks", numOfTasks); @@ -2771,52 +2773,52 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { if (pTask == NULL) { mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task"); - return TSDB_CODE_SUCCESS; + goto _err; } // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); if (conflict) { - return -1; + code = TSDB_CODE_MND_TRANS_CONFLICT; + goto _err; } SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; - STrans *pTrans = NULL; + code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); if (pTrans == NULL || code != 0) { mError("failed to create trans to drop orphan tasks since %s", terrstr()); - return code; + goto _err; } code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); if (code) { - return code; + goto _err; } // drop all tasks if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) { mError("failed to create trans to drop orphan tasks since %s", terrstr()); - mndTransDrop(pTrans); - return code; + goto _err; } // drop stream if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) { - mndTransDrop(pTrans); - return code; + goto _err; } code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return code; + goto _err; } +_err: + tDestroyDropOrphanTaskMsg(&msg); + mndTransDrop(pTrans); + if (code == TSDB_CODE_SUCCESS) { mDebug("create drop %d orphan tasks trans succ", numOfTasks); } - - mndTransDrop(pTrans); return code; } \ No newline at end of file