fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2023-09-14 22:59:11 +08:00
parent 39e0c57323
commit 5f1146d415
2 changed files with 7 additions and 0 deletions

View File

@ -1886,6 +1886,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
@ -1894,6 +1895,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
@ -1911,6 +1913,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
*pBuf = buf;
*pLen = tlen;
taosArrayDestroy(req.pNodeList);
return TSDB_CODE_SUCCESS;
}

View File

@ -1692,6 +1692,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
req.taskId);
rsp.code = TSDB_CODE_SUCCESS;
taosWUnLockLatch(&pMeta->lock);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
@ -1764,12 +1765,14 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
taosWUnLockLatch(&pMeta->lock);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
taosWUnLockLatch(&pMeta->lock);
taosArrayDestroy(req.pNodeList);
return -1;
}
@ -1786,5 +1789,6 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
}
}
taosArrayDestroy(req.pNodeList);
return rsp.code;
}