fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2023-06-21 12:03:26 +08:00
parent fbed0acace
commit ea7b8e91d3
1 changed files with 2 additions and 0 deletions

View File

@ -1183,12 +1183,14 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
int32_t code = tDecodeStreamRecoverFinishReq(&decoder, &req); int32_t code = tDecodeStreamRecoverFinishReq(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("failed to find task:0x%x", req.taskId); tqError("failed to find task:0x%x", req.taskId);
return -1; return -1;
} }
// transfer the ownership of executor state // transfer the ownership of executor state
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId);