fix(stream):fix stream memory leak

This commit is contained in:
54liuyao 2024-08-30 10:53:17 +08:00
parent a22025ea00
commit d94f3c5c41
7 changed files with 24 additions and 15 deletions

View File

@ -523,6 +523,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
int32_t code = tEncodeStreamTask(&encoder, pTask);
if (code == -1) {
tEncoderClear(&encoder);
return TSDB_CODE_INVALID_MSG;
}
@ -1009,6 +1010,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
tEncoderInit(&encoder, abuf, tlen);
int32_t pos = tEncodeStreamCheckpointSourceReq(&encoder, &req);
if (pos == -1) {
tEncoderClear(&encoder);
return TSDB_CODE_INVALID_MSG;
}

View File

@ -91,6 +91,7 @@ SRpcMsg buildHbReq() {
tEncoderInit(&encoder, (uint8_t*)buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, &msg)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
goto _end;
}
tEncoderClear(&encoder);

View File

@ -214,6 +214,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamTaskCheckReq(&encoder, pReq)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
return code;
}
tEncoderClear(&encoder);
@ -830,6 +831,7 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamCheckpointReadyMsg(&encoder, &req)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
return code;
}
tEncoderClear(&encoder);
@ -1131,6 +1133,7 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamDispatchReq(&encoder, pReq)) < 0) {
tEncoderClear(&encoder);
goto FAIL;
}
tEncoderClear(&encoder);

View File

@ -95,6 +95,7 @@ static int32_t doSendHbMsgInfo(SStreamHbMsg* pMsg, SStreamMeta* pMeta, SEpSet* p
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, pMsg)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
return TSDB_CODE_FAILED;
}

View File

@ -37,7 +37,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
}
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
@ -67,7 +67,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo
}
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
@ -79,7 +79,7 @@ int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckp
}
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
@ -132,7 +132,7 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
}
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
@ -160,7 +160,7 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq)
}
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
@ -190,7 +190,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
}
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
@ -216,7 +216,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
}
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
@ -302,7 +302,7 @@ void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
}
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1;
@ -332,7 +332,7 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
@ -350,7 +350,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint
}
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1;
@ -508,7 +508,7 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
}
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
@ -667,7 +667,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
@ -695,7 +695,7 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq
}
int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tStartEncode(pEncoder) != 0) return -1;
if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;

View File

@ -863,6 +863,7 @@ int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpoin
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamTaskChkptReport(&encoder, &req)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
@ -1016,6 +1017,7 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
return -1;
}

View File

@ -455,7 +455,7 @@ int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo,
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) {
if (tStartEncode(&encoder) != 0) {
code = TSDB_CODE_FAILED;
QUERY_CHECK_CODE(code, lino, _end);
}
@ -551,10 +551,10 @@ int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo,
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
*pLen = tlen;
_end:
tEncoderClear(&encoder);
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}