fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2023-11-02 08:11:51 +08:00
parent e1b3e00b3d
commit dc8a90c864
2 changed files with 15 additions and 17 deletions

View File

@ -41,9 +41,9 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
int32_t numOfBlocks, int64_t dstTaskId, int32_t type); int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->msgType = msgType; pMsg->msgType = msgType;
pMsg->pCont = pCont; pMsg->pCont = pCont;
pMsg->contLen = contLen; pMsg->contLen = contLen;
} }
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {

View File

@ -844,6 +844,12 @@ static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
return false; return false;
} }
static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) {
taosArrayDestroy(pMsg->pTaskStatus);
taosArrayDestroy(pMsg->pUpdateNodes);
taosArrayDestroy(pIdList);
}
void metaHbToMnode(void* param, void* tmrId) { void metaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param; int64_t rid = *(int64_t*)param;
@ -977,17 +983,13 @@ void metaHbToMnode(void* param, void* tmrId) {
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code); tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
if (code < 0) { if (code < 0) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus); goto _end;
taosReleaseRef(streamMetaId, rid);
return;
} }
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosArrayDestroy(hbMsg.pTaskStatus); goto _end;
taosReleaseRef(streamMetaId, rid);
return;
} }
SEncoder encoder; SEncoder encoder;
@ -995,15 +997,12 @@ void metaHbToMnode(void* param, void* tmrId) {
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
rpcFreeCont(buf); rpcFreeCont(buf);
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
taosArrayDestroy(hbMsg.pTaskStatus); goto _end;
taosReleaseRef(streamMetaId, rid);
return;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
SRpcMsg msg = {0}; SRpcMsg msg = {.info.noResp = 1,};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen); initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
msg.info.noResp = 1;
pMeta->pHbInfo->hbCount += 1; pMeta->pHbInfo->hbCount += 1;
@ -1014,9 +1013,8 @@ void metaHbToMnode(void* param, void* tmrId) {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId); stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
} }
taosArrayDestroy(hbMsg.pTaskStatus); _end:
taosArrayDestroy(hbMsg.pUpdateNodes); clearHbMsg(&hbMsg, pIdList);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
} }