From 5f23ba709a71f50caec15a740abbf554eed701cf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 27 Aug 2024 15:09:49 +0800 Subject: [PATCH] fix(query): fix memory leak. --- source/common/src/tdatablock.c | 12 +-- source/libs/executor/src/tsort.c | 164 ++++++++++++++----------------- 2 files changed, 80 insertions(+), 96 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index d0bae5d6a7..16710d9555 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1232,7 +1232,7 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { return 0; } -static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) { +static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) { size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = &pCols[i]; @@ -1260,8 +1260,6 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB } } } - - return TSDB_CODE_SUCCESS; } static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) { @@ -1448,18 +1446,16 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { } int64_t p2 = taosGetTimestampUs(); - code = blockDataAssign(pCols, pDataBlock, index); - if (code) { - return code; - } + blockDataAssign(pCols, pDataBlock, index); int64_t p3 = taosGetTimestampUs(); copyBackToBlock(pDataBlock, pCols); - int64_t p4 = taosGetTimestampUs(); + int64_t p4 = taosGetTimestampUs(); uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows); + destroyTupleIndex(index); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index c1edf700d1..621d643361 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -2212,63 +2212,57 @@ static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashOb return code; } -static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { - int32_t szSort = 0; - size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); - SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); - if (aExtSrc == NULL) { - return terrno; +static void freeHelp(void* param) { + SSDataBlock** ptr = param; + if (*ptr != NULL) { + blockDataDestroy(*ptr); } +} + +static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { + int32_t szSort = 0; + int32_t code = 0; + int32_t lino = 0; + size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); + SArray* aExtSrc = NULL; + SArray* aBlkSort = NULL; + SSHashObj* mTableNumRows = NULL; + SSHashObj* mUidBlk = NULL; + SBlockOrderInfo* pOrigTsOrder = NULL; + + aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); + QUERY_CHECK_NULL(aExtSrc, code, lino, _err, terrno); + + mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); + QUERY_CHECK_NULL(mTableNumRows, code, lino, _err, terrno); + + aBlkSort = taosArrayInit(8, POINTER_BYTES); + QUERY_CHECK_NULL(aBlkSort, code, lino, _err, terrno); + + mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); + QUERY_CHECK_NULL(mUidBlk, code, lino, _err, terrno); size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize); - int32_t code = createPageBuf(pHandle); - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(aExtSrc); - return code; - } + code = createPageBuf(pHandle); + QUERY_CHECK_CODE(code, lino, _err); SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); - if (pSrc == NULL) { - taosArrayDestroy(aExtSrc); - return TSDB_CODE_INVALID_PARA; - } + QUERY_CHECK_NULL(pSrc, code, lino, _err, terrno); - SBlockOrderInfo* pOrigTsOrder = (!pHandle->bSortByRowId) ? - taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); - if (pOrigTsOrder == NULL) { - return terrno; - } + pOrigTsOrder = + (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); + QUERY_CHECK_NULL(pOrigTsOrder, code, lino, _err, terrno); - if (pOrigTsOrder->order == TSDB_ORDER_ASC) { - pHandle->currMergeLimitTs = INT64_MAX; - } else { - pHandle->currMergeLimitTs = INT64_MIN; - } - - SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); - if (mTableNumRows == NULL) { - return terrno; - } - - SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); - if (aBlkSort == NULL) { - tSimpleHashCleanup(mTableNumRows); - return terrno; - } - - SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); - if (mUidBlk == NULL) { - tSimpleHashCleanup(mTableNumRows); - taosArrayDestroy(aBlkSort); - return terrno; - } + pHandle->currMergeLimitTs = (pOrigTsOrder->order == TSDB_ORDER_ASC)? INT64_MAX:INT64_MIN; while (1) { - bool bExtractedBlock = false; - bool bSkipBlock = false; + bool bExtractedBlock = false; + bool bSkipBlock = false; SSDataBlock* pBlk = NULL; - - TAOS_CHECK_RETURN(pHandle->fetchfp(pSrc->param, &pBlk)); + + code = pHandle->fetchfp(pSrc->param, &pBlk); + QUERY_CHECK_CODE(code, lino, _err); + if (pBlk != NULL && pHandle->mergeLimit > 0) { SSDataBlock* p = NULL; code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p); @@ -2281,9 +2275,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId); - if (tsCol == NULL) { - return terrno; - } + QUERY_CHECK_NULL(tsCol, code, lino, _err, terrno); int64_t firstRowTs = *(int64_t*)tsCol->pData; if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || @@ -2300,7 +2292,8 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid)); if (ppBlk != NULL) { SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk); - TAOS_CHECK_RETURN(blockDataMerge(tBlk, pBlk)); + code = blockDataMerge(tBlk, pBlk); + QUERY_CHECK_CODE(code, lino, _err); if (bExtractedBlock) { blockDataDestroy(pBlk); @@ -2310,18 +2303,15 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (bExtractedBlock) { tBlk = pBlk; } else { - TAOS_CHECK_RETURN(createOneDataBlock(pBlk, true, &tBlk)); + code = createOneDataBlock(pBlk, true, &tBlk); + QUERY_CHECK_CODE(code, lino, _err); } code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES); - if (code) { - return code; - } + QUERY_CHECK_CODE(code, lino, _err); void* px = taosArrayPush(aBlkSort, &tBlk); - if (px == NULL) { - return terrno; - } + QUERY_CHECK_NULL(px, code, lino, _err, terrno); } } @@ -2330,17 +2320,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { int64_t p = taosGetTimestampUs(); if (pHandle->bSortByRowId) { - TAOS_CHECK_RETURN(tsortOpenRegion(pHandle)); + code = tsortOpenRegion(pHandle); + QUERY_CHECK_CODE(code, lino, _err); } code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc); - if (code != TSDB_CODE_SUCCESS) { - for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { - blockDataDestroy(taosArrayGetP(aBlkSort, i)); - } - taosArrayClear(aBlkSort); - break; - } + QUERY_CHECK_CODE(code, lino, _err); if (pHandle->bSortByRowId) { code = tsortCloseRegion(pHandle); // ignore this error code @@ -2348,14 +2333,10 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; + taosArrayClearEx(aBlkSort, freeHelp); - for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { - blockDataDestroy(taosArrayGetP(aBlkSort, i)); - } - - taosArrayClear(aBlkSort); szSort = 0; - qDebug("source %zu created", taosArrayGetSize(aExtSrc)); + qDebug("%s source %zu created", pHandle->idStr, taosArrayGetSize(aExtSrc)); } if (pBlk == NULL) { @@ -2363,40 +2344,47 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } if (tsortIsClosed(pHandle)) { - tSimpleHashClear(mUidBlk); - for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { - blockDataDestroy(taosArrayGetP(aBlkSort, i)); - } - taosArrayClear(aBlkSort); break; } } - tSimpleHashCleanup(mUidBlk); - for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) { - blockDataDestroy(taosArrayGetP(aBlkSort, i)); - } - taosArrayDestroy(aBlkSort); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); if (!tsortIsClosed(pHandle)) { void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); - if (px == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - } + QUERY_CHECK_NULL(px, code, lino, _err, terrno); } - taosArrayDestroy(aExtSrc); - tSimpleHashCleanup(mTableNumRows); if (pHandle->bSortByRowId) { code = tsortFinalizeRegions(pHandle); } pHandle->type = SORT_SINGLESOURCE_SORT; + + _err: + if (code) { + qError("%s %s failed at line %d since %s", pHandle->idStr, __func__, lino, tstrerror(code)); + } + + if (aExtSrc) { + taosArrayDestroy(aExtSrc); + } + if (aBlkSort) { + taosArrayDestroyEx(aBlkSort, freeHelp); + } + if (mTableNumRows) { + tSimpleHashCleanup(mTableNumRows); + } + if (mUidBlk) { + tSimpleHashCleanup(mUidBlk); + } return code; } static void freeSSortSource(SSortSource* source) { - if (NULL == source) return; + if (NULL == source) { + return; + } + if (source->param && !source->onlyRef) { taosMemoryFree(source->param); }