fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2024-09-18 16:43:18 +08:00
parent 586826c41f
commit 720d1c2486
1 changed files with 7 additions and 2 deletions

View File

@ -628,6 +628,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
code = tEncodeStreamTaskUpdateMsg(&encoder, &req); code = tEncodeStreamTaskUpdateMsg(&encoder, &req);
if (code == -1) { if (code == -1) {
tEncoderClear(&encoder); tEncoderClear(&encoder);
taosMemoryFree(buf);
taosArrayDestroy(req.pNodeList); taosArrayDestroy(req.pNodeList);
return code; return code;
} }
@ -648,21 +649,25 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) {
void *pBuf = NULL; void *pBuf = NULL;
int32_t len = 0; int32_t len = 0;
SEpSet epset = {0};
bool hasEpset = false;
bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
if (code) { if (code) {
mError("failed to build stream task epset update msg, code:%s", tstrerror(code));
return code; return code;
} }
SEpSet epset = {0};
bool hasEpset = false;
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) { if (code != TSDB_CODE_SUCCESS || !hasEpset) {
mError("failed to extract epset during create update epset, code:%s", tstrerror(code));
return code; return code;
} }
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mError("failed to create update task epset trans, code:%s", tstrerror(code));
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
} }