From 5ab739ae466e8fdbbe1d2f0598e6eaf45b63b4a2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 11 Sep 2023 15:29:12 +0800 Subject: [PATCH] fix(stream): fix memory leak. --- source/dnode/vnode/src/tq/tqSink.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index c84ab831bd..a5a685580d 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -24,12 +24,12 @@ typedef struct STableSinkInfo { tstr name; } STableSinkInfo; +static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, + SSubmitTbData* pTableData); static int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock, SSubmitTbData* pTableData); static int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* pDataBlock, SStreamTask* pTask, int64_t suid); -static int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, - SSubmitTbData* pTableData); static int32_t tqBuildSubmitReq(SSubmitReq2* pSubmitReq, int32_t vgId, void** pMsg, int32_t* msgLen); int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, @@ -303,6 +303,7 @@ static int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* if (pNewRow->ts == pOldRow->ts) { k += 1; + tRowDestroy(pOldRow); } } else { taosArrayPush(pFinal, &pOldRow); @@ -803,6 +804,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat pTableData->pCreateTbReq = buildAutoCreateTableReq(stbFullName, suid, pTSchema->numOfCols + 1, pDataBlock); if (pTableData->pCreateTbReq == NULL) { tqError("s-task:%s failed to build auto create table req, code:%s", id, tstrerror(terrno)); + taosMemoryFree(pTableSinkInfo); return terrno; } @@ -827,7 +829,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlock* pDataBlock, - SSubmitTbData* pTableData) { + SSubmitTbData* pTableData) { int32_t numOfRows = pDataBlock->info.rows; const char* id = pTask->id.idStr;