Merge pull request #27566 from taosdata/fix/TD-31802

fix(stream):fix stream memory leak
This commit is contained in:
Pan Wei 2024-09-02 09:33:52 +08:00 committed by GitHub
commit 645b264cad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 11 additions and 2 deletions

View File

@ -523,6 +523,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
int32_t code = tEncodeStreamTask(&encoder, pTask); int32_t code = tEncodeStreamTask(&encoder, pTask);
if (code == -1) { if (code == -1) {
tEncoderClear(&encoder);
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }
@ -1009,6 +1010,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req); int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req);
if (pos == -1) { if (pos == -1) {
tEncoderClear(&encoder);
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }

View File

@ -91,6 +91,7 @@ SRpcMsg buildHbReq() {
tEncoderInit(&encoder, (uint8_t*)buf, tlen); tEncoderInit(&encoder, (uint8_t*)buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, &msg)) < 0) { if ((code = tEncodeStreamHbMsg(&encoder, &msg)) < 0) {
rpcFreeCont(buf); rpcFreeCont(buf);
tEncoderClear(&encoder);
goto _end; goto _end;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);

View File

@ -214,6 +214,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamTaskCheckReq(&encoder, pReq)) < 0) { if ((code = tEncodeStreamTaskCheckReq(&encoder, pReq)) < 0) {
rpcFreeCont(buf); rpcFreeCont(buf);
tEncoderClear(&encoder);
return code; return code;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
@ -845,6 +846,7 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamCheckpointReadyMsg(&encoder, &req)) < 0) { if ((code = tEncodeStreamCheckpointReadyMsg(&encoder, &req)) < 0) {
rpcFreeCont(buf); rpcFreeCont(buf);
tEncoderClear(&encoder);
return code; return code;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
@ -1168,6 +1170,7 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) { if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) {
tEncoderClear(&encoder);
goto FAIL; goto FAIL;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);

View File

@ -95,6 +95,7 @@ static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* p
tEncoderInit(&encoder, buf, tlen); tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, pMsg)) < 0) { if ((code = tEncodeStreamHbMsg(&encoder, pMsg)) < 0) {
rpcFreeCont(buf); rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }

View File

@ -870,6 +870,7 @@ int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpoin
tEncoderInit(&encoder, buf, tlen); tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) { if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
rpcFreeCont(buf); rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code)); stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
return -1; return -1;
} }
@ -1021,6 +1022,7 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
tEncoderInit(&encoder, buf, tlen); tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) { if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
rpcFreeCont(buf); rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code)); stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
return -1; return -1;
} }

View File

@ -455,7 +455,7 @@ int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo,
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) { if (tStartEncode(&encoder) != 0) {
code = TSDB_CODE_FAILED; code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} }
@ -551,10 +551,10 @@ int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo,
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
*pLen = tlen; *pLen = tlen;
_end: _end:
tEncoderClear(&encoder);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
} }