fix(stream): fix memory leak.
This commit is contained in:
parent
55b06746de
commit
7f04e2cfb9
|
@ -1847,6 +1847,7 @@ typedef struct SMStreamDropOrphanMsg {
|
|||
|
||||
int32_t tSerializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg);
|
||||
int32_t tDeserializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg);
|
||||
void tDestroyDropOrphanTaskMsg(SMStreamDropOrphanMsg* pMsg);
|
||||
|
||||
typedef struct {
|
||||
int32_t id;
|
||||
|
|
|
@ -5360,6 +5360,14 @@ int32_t tDeserializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrp
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tDestroyDropOrphanTaskMsg(SMStreamDropOrphanMsg *pMsg) {
|
||||
if (pMsg == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pMsg->pList);
|
||||
}
|
||||
|
||||
int32_t tEncodeSReplica(SEncoder *pEncoder, SReplica *pReplica) {
|
||||
if (tEncodeI32(pEncoder, pReplica->id) < 0) return -1;
|
||||
if (tEncodeU16(pEncoder, pReplica->port) < 0) return -1;
|
||||
|
|
|
@ -2749,6 +2749,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
|
|||
int32_t code = 0;
|
||||
SOrphanTask *pTask = NULL;
|
||||
int32_t i = 0;
|
||||
STrans *pTrans = NULL;
|
||||
int32_t numOfTasks = 0;
|
||||
|
||||
SMStreamDropOrphanMsg msg = {0};
|
||||
code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
|
||||
|
@ -2756,10 +2758,10 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(msg.pList);
|
||||
numOfTasks = taosArrayGetSize(msg.pList);
|
||||
if (numOfTasks == 0) {
|
||||
mDebug("no orphan tasks to drop, no need to create trans");
|
||||
return code;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
mDebug("create trans to drop %d orphan tasks", numOfTasks);
|
||||
|
@ -2771,52 +2773,52 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
|
|||
|
||||
if (pTask == NULL) {
|
||||
mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
|
||||
if (conflict) {
|
||||
return -1;
|
||||
code = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
|
||||
if (pTrans == NULL || code != 0) {
|
||||
mError("failed to create trans to drop orphan tasks since %s", terrstr());
|
||||
return code;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
|
||||
if (code) {
|
||||
return code;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// drop all tasks
|
||||
if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
|
||||
mError("failed to create trans to drop orphan tasks since %s", terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// drop stream
|
||||
if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
_err:
|
||||
tDestroyDropOrphanTaskMsg(&msg);
|
||||
mndTransDrop(pTrans);
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mDebug("create drop %d orphan tasks trans succ", numOfTasks);
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
}
|
Loading…
Reference in New Issue