Merge pull request #26938 from taosdata/fix/create_tb

fix(stream): fix memory leak.
This commit is contained in:
Haojun Liao 2024-08-02 12:55:17 +08:00 committed by GitHub
commit 9b81f536a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 26 additions and 15 deletions

View File

@ -1847,6 +1847,7 @@ typedef struct SMStreamDropOrphanMsg {
int32_t tSerializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg); int32_t tSerializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg);
int32_t tDeserializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg); int32_t tDeserializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg);
void tDestroyDropOrphanTaskMsg(SMStreamDropOrphanMsg* pMsg);
typedef struct { typedef struct {
int32_t id; int32_t id;

View File

@ -5360,6 +5360,14 @@ int32_t tDeserializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrp
return 0; return 0;
} }
void tDestroyDropOrphanTaskMsg(SMStreamDropOrphanMsg *pMsg) {
if (pMsg == NULL) {
return;
}
taosArrayDestroy(pMsg->pList);
}
int32_t tEncodeSReplica(SEncoder *pEncoder, SReplica *pReplica) { int32_t tEncodeSReplica(SEncoder *pEncoder, SReplica *pReplica) {
if (tEncodeI32(pEncoder, pReplica->id) < 0) return -1; if (tEncodeI32(pEncoder, pReplica->id) < 0) return -1;
if (tEncodeU16(pEncoder, pReplica->port) < 0) return -1; if (tEncodeU16(pEncoder, pReplica->port) < 0) return -1;

View File

@ -2800,6 +2800,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
int32_t code = 0; int32_t code = 0;
SOrphanTask *pTask = NULL; SOrphanTask *pTask = NULL;
int32_t i = 0; int32_t i = 0;
STrans *pTrans = NULL;
int32_t numOfTasks = 0;
SMStreamDropOrphanMsg msg = {0}; SMStreamDropOrphanMsg msg = {0};
code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg); code = tDeserializeDropOrphanTaskMsg(pReq->pCont, pReq->contLen, &msg);
@ -2807,10 +2809,10 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
return code; return code;
} }
int32_t numOfTasks = taosArrayGetSize(msg.pList); numOfTasks = taosArrayGetSize(msg.pList);
if (numOfTasks == 0) { if (numOfTasks == 0) {
mDebug("no orphan tasks to drop, no need to create trans"); mDebug("no orphan tasks to drop, no need to create trans");
return code; goto _err;
} }
mDebug("create trans to drop %d orphan tasks", numOfTasks); mDebug("create trans to drop %d orphan tasks", numOfTasks);
@ -2822,52 +2824,52 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) {
if (pTask == NULL) { if (pTask == NULL) {
mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task"); mError("failed to extract entry in drop orphan task list, not create trans to drop orphan-task");
return TSDB_CODE_SUCCESS; goto _err;
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
if (conflict) { if (conflict) {
return -1; code = TSDB_CODE_MND_TRANS_CONFLICT;
goto _err;
} }
SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
STrans *pTrans = NULL;
code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans); code = doCreateTrans(pMnode, &dummyObj, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream", &pTrans);
if (pTrans == NULL || code != 0) { if (pTrans == NULL || code != 0) {
mError("failed to create trans to drop orphan tasks since %s", terrstr()); mError("failed to create trans to drop orphan tasks since %s", terrstr());
return code; goto _err;
} }
code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
if (code) { if (code) {
return code; goto _err;
} }
// drop all tasks // drop all tasks
if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) { if ((code = mndStreamSetDropActionFromList(pMnode, pTrans, msg.pList)) < 0) {
mError("failed to create trans to drop orphan tasks since %s", terrstr()); mError("failed to create trans to drop orphan tasks since %s", terrstr());
mndTransDrop(pTrans); goto _err;
return code;
} }
// drop stream // drop stream
if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) { if ((code = mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED)) < 0) {
mndTransDrop(pTrans); goto _err;
return code;
} }
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); goto _err;
return code;
} }
_err:
tDestroyDropOrphanTaskMsg(&msg);
mndTransDrop(pTrans);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
mDebug("create drop %d orphan tasks trans succ", numOfTasks); mDebug("create drop %d orphan tasks trans succ", numOfTasks);
} }
mndTransDrop(pTrans);
return code; return code;
} }