Merge pull request #27490 from taosdata/fix/syntax

fix(query): fix memory leak.
This commit is contained in:
Haojun Liao 2024-08-27 17:04:04 +08:00 committed by GitHub
commit 62ad276113
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 80 additions and 96 deletions

View File

@ -1232,7 +1232,7 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
return 0;
}
static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) {
static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, const int32_t* index) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = &pCols[i];
@ -1260,8 +1260,6 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
}
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t createHelpColInfoData(const SSDataBlock* pDataBlock, SColumnInfoData** ppCols) {
@ -1448,18 +1446,16 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
}
int64_t p2 = taosGetTimestampUs();
code = blockDataAssign(pCols, pDataBlock, index);
if (code) {
return code;
}
blockDataAssign(pCols, pDataBlock, index);
int64_t p3 = taosGetTimestampUs();
copyBackToBlock(pDataBlock, pCols);
int64_t p4 = taosGetTimestampUs();
int64_t p4 = taosGetTimestampUs();
uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64
", rows:%d\n",
p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);
destroyTupleIndex(index);
return TSDB_CODE_SUCCESS;
}

View File

@ -2212,63 +2212,57 @@ static int32_t getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashOb
return code;
}
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int32_t szSort = 0;
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
SArray* aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
if (aExtSrc == NULL) {
return terrno;
static void freeHelp(void* param) {
SSDataBlock** ptr = param;
if (*ptr != NULL) {
blockDataDestroy(*ptr);
}
}
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int32_t szSort = 0;
int32_t code = 0;
int32_t lino = 0;
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
SArray* aExtSrc = NULL;
SArray* aBlkSort = NULL;
SSHashObj* mTableNumRows = NULL;
SSHashObj* mUidBlk = NULL;
SBlockOrderInfo* pOrigTsOrder = NULL;
aExtSrc = taosArrayInit(nSrc, POINTER_BYTES);
QUERY_CHECK_NULL(aExtSrc, code, lino, _err, terrno);
mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
QUERY_CHECK_NULL(mTableNumRows, code, lino, _err, terrno);
aBlkSort = taosArrayInit(8, POINTER_BYTES);
QUERY_CHECK_NULL(aBlkSort, code, lino, _err, terrno);
mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
QUERY_CHECK_NULL(mUidBlk, code, lino, _err, terrno);
size_t maxBufSize = (pHandle->bSortByRowId) ? pHandle->extRowsMemSize : (pHandle->numOfPages * pHandle->pageSize);
int32_t code = createPageBuf(pHandle);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(aExtSrc);
return code;
}
code = createPageBuf(pHandle);
QUERY_CHECK_CODE(code, lino, _err);
SSortSource* pSrc = taosArrayGetP(pHandle->pOrderedSource, 0);
if (pSrc == NULL) {
taosArrayDestroy(aExtSrc);
return TSDB_CODE_INVALID_PARA;
}
QUERY_CHECK_NULL(pSrc, code, lino, _err, terrno);
SBlockOrderInfo* pOrigTsOrder = (!pHandle->bSortByRowId) ?
taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
if (pOrigTsOrder == NULL) {
return terrno;
}
pOrigTsOrder =
(!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
QUERY_CHECK_NULL(pOrigTsOrder, code, lino, _err, terrno);
if (pOrigTsOrder->order == TSDB_ORDER_ASC) {
pHandle->currMergeLimitTs = INT64_MAX;
} else {
pHandle->currMergeLimitTs = INT64_MIN;
}
SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
if (mTableNumRows == NULL) {
return terrno;
}
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
if (aBlkSort == NULL) {
tSimpleHashCleanup(mTableNumRows);
return terrno;
}
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
if (mUidBlk == NULL) {
tSimpleHashCleanup(mTableNumRows);
taosArrayDestroy(aBlkSort);
return terrno;
}
pHandle->currMergeLimitTs = (pOrigTsOrder->order == TSDB_ORDER_ASC)? INT64_MAX:INT64_MIN;
while (1) {
bool bExtractedBlock = false;
bool bSkipBlock = false;
bool bExtractedBlock = false;
bool bSkipBlock = false;
SSDataBlock* pBlk = NULL;
TAOS_CHECK_RETURN(pHandle->fetchfp(pSrc->param, &pBlk));
code = pHandle->fetchfp(pSrc->param, &pBlk);
QUERY_CHECK_CODE(code, lino, _err);
if (pBlk != NULL && pHandle->mergeLimit > 0) {
SSDataBlock* p = NULL;
code = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock, &bSkipBlock, &p);
@ -2281,9 +2275,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (pBlk != NULL) {
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrigTsOrder->slotId);
if (tsCol == NULL) {
return terrno;
}
QUERY_CHECK_NULL(tsCol, code, lino, _err, terrno);
int64_t firstRowTs = *(int64_t*)tsCol->pData;
if ((pOrigTsOrder->order == TSDB_ORDER_ASC && firstRowTs > pHandle->currMergeLimitTs) ||
@ -2300,7 +2292,8 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
void* ppBlk = tSimpleHashGet(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid));
if (ppBlk != NULL) {
SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
TAOS_CHECK_RETURN(blockDataMerge(tBlk, pBlk));
code = blockDataMerge(tBlk, pBlk);
QUERY_CHECK_CODE(code, lino, _err);
if (bExtractedBlock) {
blockDataDestroy(pBlk);
@ -2310,18 +2303,15 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (bExtractedBlock) {
tBlk = pBlk;
} else {
TAOS_CHECK_RETURN(createOneDataBlock(pBlk, true, &tBlk));
code = createOneDataBlock(pBlk, true, &tBlk);
QUERY_CHECK_CODE(code, lino, _err);
}
code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
if (code) {
return code;
}
QUERY_CHECK_CODE(code, lino, _err);
void* px = taosArrayPush(aBlkSort, &tBlk);
if (px == NULL) {
return terrno;
}
QUERY_CHECK_NULL(px, code, lino, _err, terrno);
}
}
@ -2330,17 +2320,12 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int64_t p = taosGetTimestampUs();
if (pHandle->bSortByRowId) {
TAOS_CHECK_RETURN(tsortOpenRegion(pHandle));
code = tsortOpenRegion(pHandle);
QUERY_CHECK_CODE(code, lino, _err);
}
code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
if (code != TSDB_CODE_SUCCESS) {
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
break;
}
QUERY_CHECK_CODE(code, lino, _err);
if (pHandle->bSortByRowId) {
code = tsortCloseRegion(pHandle); // ignore this error code
@ -2348,14 +2333,10 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
taosArrayClearEx(aBlkSort, freeHelp);
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
szSort = 0;
qDebug("source %zu created", taosArrayGetSize(aExtSrc));
qDebug("%s source %zu created", pHandle->idStr, taosArrayGetSize(aExtSrc));
}
if (pBlk == NULL) {
@ -2363,40 +2344,47 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
if (tsortIsClosed(pHandle)) {
tSimpleHashClear(mUidBlk);
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
break;
}
}
tSimpleHashCleanup(mUidBlk);
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayDestroy(aBlkSort);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
if (!tsortIsClosed(pHandle)) {
void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
if (px == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
QUERY_CHECK_NULL(px, code, lino, _err, terrno);
}
taosArrayDestroy(aExtSrc);
tSimpleHashCleanup(mTableNumRows);
if (pHandle->bSortByRowId) {
code = tsortFinalizeRegions(pHandle);
}
pHandle->type = SORT_SINGLESOURCE_SORT;
_err:
if (code) {
qError("%s %s failed at line %d since %s", pHandle->idStr, __func__, lino, tstrerror(code));
}
if (aExtSrc) {
taosArrayDestroy(aExtSrc);
}
if (aBlkSort) {
taosArrayDestroyEx(aBlkSort, freeHelp);
}
if (mTableNumRows) {
tSimpleHashCleanup(mTableNumRows);
}
if (mUidBlk) {
tSimpleHashCleanup(mUidBlk);
}
return code;
}
static void freeSSortSource(SSortSource* source) {
if (NULL == source) return;
if (NULL == source) {
return;
}
if (source->param && !source->onlyRef) {
taosMemoryFree(source->param);
}