fix(stream):fix stream memory leak
This commit is contained in:
parent
d94f3c5c41
commit
dd1ab39eb0
|
@ -37,7 +37,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) {
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||
|
@ -67,7 +67,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
|
||||
|
@ -79,7 +79,7 @@ int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckp
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
|
||||
|
||||
|
@ -132,7 +132,7 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
||||
|
@ -160,7 +160,7 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq)
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
|
||||
|
@ -190,7 +190,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
|
||||
|
@ -216,7 +216,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->stage) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
|
||||
|
@ -302,7 +302,7 @@ void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) {
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->dstNodeId) < 0) return -1;
|
||||
|
@ -332,7 +332,7 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
|
|||
void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
|
||||
|
||||
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
|
||||
|
@ -350,7 +350,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1;
|
||||
|
||||
|
@ -508,7 +508,7 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
|
||||
|
@ -667,7 +667,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
|
||||
|
@ -695,7 +695,7 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq
|
|||
}
|
||||
|
||||
int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
|
||||
if (tStartEncode(pEncoder) != 0) return -1;
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
|
||||
|
|
Loading…
Reference in New Issue