fix(query): fix memory leak.

This commit is contained in:
Haojun Liao 2024-08-27 15:09:49 +08:00
parent dc7f56e7f2
commit 5f23ba709a
2 changed files with 80 additions and 96 deletions

View File

@ -1232,7 +1232,7 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
return 0; return 0;
} }
static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) { static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = &pCols[i]; SColumnInfoData* pDst = &pCols[i];
@ -1260,8 +1260,6 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
} }
} }
} }
return TSDB_CODE_SUCCESS;
} }
static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) { static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) {
@ -1448,18 +1446,16 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
} }
int64_t p2 = taosGetTimestampUs(); int64_t p2 = taosGetTimestampUs();
code = blockDataAssign(pCols, pDataBlock, index); blockDataAssign(pCols, pDataBlock, index);
if (code) {
return code;
}
int64_t p3 = taosGetTimestampUs(); int64_t p3 = taosGetTimestampUs();
copyBackToBlock(pDataBlock, pCols); copyBackToBlock(pDataBlock, pCols);
int64_t p4 = taosGetTimestampUs();
int64_t p4 = taosGetTimestampUs();
uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64
", rows:%d\n", ", rows:%d\n",
p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows); p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);
destroyTupleIndex(index); destroyTupleIndex(index);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -2212,63 +2212,57 @@ static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashOb
return code; return code;
} }
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { static void freeHelp(void* param) {
int32_t szSort = 0; SSDataBlock** ptr = param;
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); if (*ptr != NULL) {
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES); blockDataDestroy(*ptr);
if (aExtSrc == NULL) {
return terrno;
} }
}
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int32_t szSort = 0;
int32_t code = 0;
int32_t lino = 0;
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
SArray* aExtSrc = NULL;
SArray* aBlkSort = NULL;
SSHashObj* mTableNumRows = NULL;
SSHashObj* mUidBlk = NULL;
SBlockOrderInfo* pOrigTsOrder = NULL;
aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
QUERY_CHECK_NULL(aExtSrc, code, lino, _err, terrno);
mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
QUERY_CHECK_NULL(mTableNumRows, code, lino, _err, terrno);
aBlkSort = taosArrayInit(8, POINTER_BYTES);
QUERY_CHECK_NULL(aBlkSort, code, lino, _err, terrno);
mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
QUERY_CHECK_NULL(mUidBlk, code, lino, _err, terrno);
size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize); size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize);
int32_t code = createPageBuf(pHandle); code = createPageBuf(pHandle);
if (code != TSDB_CODE_SUCCESS) { QUERY_CHECK_CODE(code, lino, _err);
taosArrayDestroy(aExtSrc);
return code;
}
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0); SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
if (pSrc == NULL) { QUERY_CHECK_NULL(pSrc, code, lino, _err, terrno);
taosArrayDestroy(aExtSrc);
return TSDB_CODE_INVALID_PARA;
}
SBlockOrderInfo* pOrigTsOrder = (!pHandle->bSortByRowId) ? pOrigTsOrder =
taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
if (pOrigTsOrder == NULL) { QUERY_CHECK_NULL(pOrigTsOrder, code, lino, _err, terrno);
return terrno;
}
if (pOrigTsOrder->order == TSDB_ORDER_ASC) { pHandle->currMergeLimitTs = (pOrigTsOrder->order == TSDB_ORDER_ASC)? INT64_MAX:INT64_MIN;
pHandle->currMergeLimitTs = INT64_MAX;
} else {
pHandle->currMergeLimitTs = INT64_MIN;
}
SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
if (mTableNumRows == NULL) {
return terrno;
}
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
if (aBlkSort == NULL) {
tSimpleHashCleanup(mTableNumRows);
return terrno;
}
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
if (mUidBlk == NULL) {
tSimpleHashCleanup(mTableNumRows);
taosArrayDestroy(aBlkSort);
return terrno;
}
while (1) { while (1) {
bool bExtractedBlock = false; bool bExtractedBlock = false;
bool bSkipBlock = false; bool bSkipBlock = false;
SSDataBlock* pBlk = NULL; SSDataBlock* pBlk = NULL;
TAOS_CHECK_RETURN(pHandle->fetchfp(pSrc->param, &pBlk)); code = pHandle->fetchfp(pSrc->param, &pBlk);
QUERY_CHECK_CODE(code, lino, _err);
if (pBlk != NULL && pHandle->mergeLimit > 0) { if (pBlk != NULL && pHandle->mergeLimit > 0) {
SSDataBlock* p = NULL; SSDataBlock* p = NULL;
code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p); code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p);
@ -2281,9 +2275,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (pBlk != NULL) { if (pBlk != NULL) {
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId); SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId);
if (tsCol == NULL) { QUERY_CHECK_NULL(tsCol, code, lino, _err, terrno);
return terrno;
}
int64_t firstRowTs = *(int64_t*)tsCol->pData; int64_t firstRowTs = *(int64_t*)tsCol->pData;
if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) || if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
@ -2300,7 +2292,8 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid)); void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid));
if (ppBlk != NULL) { if (ppBlk != NULL) {
SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk); SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
TAOS_CHECK_RETURN(blockDataMerge(tBlk, pBlk)); code = blockDataMerge(tBlk, pBlk);
QUERY_CHECK_CODE(code, lino, _err);
if (bExtractedBlock) { if (bExtractedBlock) {
blockDataDestroy(pBlk); blockDataDestroy(pBlk);
@ -2310,18 +2303,15 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (bExtractedBlock) { if (bExtractedBlock) {
tBlk = pBlk; tBlk = pBlk;
} else { } else {
TAOS_CHECK_RETURN(createOneDataBlock(pBlk, true, &tBlk)); code = createOneDataBlock(pBlk, true, &tBlk);
QUERY_CHECK_CODE(code, lino, _err);
} }
code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES); code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
if (code) { QUERY_CHECK_CODE(code, lino, _err);
return code;
}
void* px = taosArrayPush(aBlkSort, &tBlk); void* px = taosArrayPush(aBlkSort, &tBlk);
if (px == NULL) { QUERY_CHECK_NULL(px, code, lino, _err, terrno);
return terrno;
}
} }
} }
@ -2330,17 +2320,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int64_t p = taosGetTimestampUs(); int64_t p = taosGetTimestampUs();
if (pHandle->bSortByRowId) { if (pHandle->bSortByRowId) {
TAOS_CHECK_RETURN(tsortOpenRegion(pHandle)); code = tsortOpenRegion(pHandle);
QUERY_CHECK_CODE(code, lino, _err);
} }
code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc); code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
if (code != TSDB_CODE_SUCCESS) { QUERY_CHECK_CODE(code, lino, _err);
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
break;
}
if (pHandle->bSortByRowId) { if (pHandle->bSortByRowId) {
code = tsortCloseRegion(pHandle); // ignore this error code code = tsortCloseRegion(pHandle); // ignore this error code
@ -2348,14 +2333,10 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int64_t el = taosGetTimestampUs() - p; int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el; pHandle->sortElapsed += el;
taosArrayClearEx(aBlkSort, freeHelp);
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
szSort = 0; szSort = 0;
qDebug("source %zu created", taosArrayGetSize(aExtSrc)); qDebug("%s source %zu created", pHandle->idStr, taosArrayGetSize(aExtSrc));
} }
if (pBlk == NULL) { if (pBlk == NULL) {
@ -2363,40 +2344,47 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
} }
if (tsortIsClosed(pHandle)) { if (tsortIsClosed(pHandle)) {
tSimpleHashClear(mUidBlk);
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
break; break;
} }
} }
tSimpleHashCleanup(mUidBlk);
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayDestroy(aBlkSort);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
if (!tsortIsClosed(pHandle)) { if (!tsortIsClosed(pHandle)) {
void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
if (px == NULL) { QUERY_CHECK_NULL(px, code, lino, _err, terrno);
code = TSDB_CODE_OUT_OF_MEMORY;
}
} }
taosArrayDestroy(aExtSrc);
tSimpleHashCleanup(mTableNumRows);
if (pHandle->bSortByRowId) { if (pHandle->bSortByRowId) {
code = tsortFinalizeRegions(pHandle); code = tsortFinalizeRegions(pHandle);
} }
pHandle->type = SORT_SINGLESOURCE_SORT; pHandle->type = SORT_SINGLESOURCE_SORT;
_err:
if (code) {
qError("%s %s failed at line %d since %s", pHandle->idStr, __func__, lino, tstrerror(code));
}
if (aExtSrc) {
taosArrayDestroy(aExtSrc);
}
if (aBlkSort) {
taosArrayDestroyEx(aBlkSort, freeHelp);
}
if (mTableNumRows) {
tSimpleHashCleanup(mTableNumRows);
}
if (mUidBlk) {
tSimpleHashCleanup(mUidBlk);
}
return code; return code;
} }
static void freeSSortSource(SSortSource* source) { static void freeSSortSource(SSortSource* source) {
if (NULL == source) return; if (NULL == source) {
return;
}
if (source->param && !source->onlyRef) { if (source->param && !source->onlyRef) {
taosMemoryFree(source->param); taosMemoryFree(source->param);
} }