diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 8971ee33d3..cc5db32666 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -34,16 +34,17 @@ typedef struct SMultiMergeSource { SSDataBlock *pBlock; } SMultiMergeSource; -typedef struct SExternalMemSource { +typedef struct SSortSource { SMultiMergeSource src; - SArray* pageIdList; - int32_t pageIndex; -} SExternalMemSource; + union{ + struct{ + SArray* pageIdList; + int32_t pageIndex; + }; + void *param; + }; -typedef struct SGenericSource { - SMultiMergeSource src; - void *param; -} SGenericSource; +} SSortSource; typedef struct SMsortComparParam { void **pSources; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b66107b4a7..c418d78540 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5982,7 +5982,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) { tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = pOperator->pDownstream[i]; tsortAddSource(pInfo->pSortHandle, ps); } @@ -6128,7 +6128,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) { tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); - SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); ps->param = pOperator->pDownstream[0]; tsortAddSource(pInfo->pSortHandle, ps); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7e01fe0ba8..2aa8f6a09d 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -113,7 +113,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page static int32_t sortComparClearup(SMsortComparParam* cmpParam) { for(int32_t i = 0; i < cmpParam->numOfSources; ++i) { - SExternalMemSource* pSource = cmpParam->pSources[i]; + SSortSource* pSource = cmpParam->pSources[i]; // NOTICE: pSource may be SGenericSource *, if it is SORT_MULTISOURCE_MERGE blockDataDestroy(pSource->src.pBlock); taosMemoryFreeClear(pSource); } @@ -132,7 +132,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { taosMemoryFreeClear(pSortHandle->idStr); blockDataDestroy(pSortHandle->pDataBlock); for (size_t i = 0; i < taosArrayGetSize(pSortHandle->pOrderedSource); i++){ - SExternalMemSource** pSource = taosArrayGet(pSortHandle->pOrderedSource, i); + SSortSource** pSource = taosArrayGet(pSortHandle->pOrderedSource, i); blockDataDestroy((*pSource)->src.pBlock); taosMemoryFreeClear(*pSource); } @@ -146,7 +146,7 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) { } static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSources, SSDataBlock* pBlock, int32_t* sourceId) { - SExternalMemSource* pSource = taosMemoryCalloc(1, sizeof(SExternalMemSource)); + SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); if (pSource == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -216,7 +216,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int if (pHandle->type == SORT_SINGLESOURCE_SORT) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { - SExternalMemSource* pSource = cmpParam->pSources[i]; + SSortSource* pSource = cmpParam->pSources[i]; SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo)); @@ -238,7 +238,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int } for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { - SGenericSource* pSource = cmpParam->pSources[i]; + SSortSource* pSource = cmpParam->pSources[i]; pSource->src.pBlock = pHandle->fetchfp(pSource->param); } } @@ -265,7 +265,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSou *rowIndex += 1; } -static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwayMergeTreeInfo *pTree, SSortHandle *pHandle, int32_t* numOfCompleted) { +static int32_t adjustMergeTreeForNextTuple(SSortSource *pSource, SMultiwayMergeTreeInfo *pTree, SSortHandle *pHandle, int32_t* numOfCompleted) { /* * load a new SDataBlock into memory of a given intermediate data-set source, * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree @@ -292,7 +292,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa releaseBufPage(pHandle->pBuf, pPage); } } else { - pSource->src.pBlock = pHandle->fetchfp(((SGenericSource*)pSource)->param); + pSource->src.pBlock = pHandle->fetchfp(((SSortSource*)pSource)->param); if (pSource->src.pBlock == NULL) { (*numOfCompleted) += 1; pSource->src.rowIndex = -1; @@ -330,7 +330,7 @@ static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparPa int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree); - SExternalMemSource *pSource = (*cmpParam).pSources[index]; + SSortSource *pSource = (*cmpParam).pSources[index]; appendOneRowToDataBlock(pHandle->pDataBlock, pSource->src.pBlock, &pSource->src.rowIndex); int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources); @@ -355,8 +355,8 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { SArray *pInfo = pParam->orderInfo; - SExternalMemSource* pLeftSource = pParam->pSources[pLeftIdx]; - SExternalMemSource* pRightSource = pParam->pSources[pRightIdx]; + SSortSource* pLeftSource = pParam->pSources[pLeftIdx]; + SSortSource* pRightSource = pParam->pSources[pRightIdx]; // this input is exhausted, set the special value to denote this if (pLeftSource->src.rowIndex == -1) { @@ -484,6 +484,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { blockDataCleanup(pDataBlock); } + sortComparClearup(&pHandle->cmpParam); tMergeTreeDestroy(pHandle->pMergeTree); pHandle->numOfCompletedSources = 0; @@ -494,8 +495,6 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } } - sortComparClearup(&pHandle->cmpParam); - taosArrayClear(pHandle->pOrderedSource); taosArrayAddAll(pHandle->pOrderedSource, pResList); taosArrayDestroy(pResList); @@ -523,7 +522,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; if (pHandle->type == SORT_SINGLESOURCE_SORT) { - SGenericSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); + SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); taosArrayClear(pHandle->pOrderedSource); while (1) { SSDataBlock* pBlock = pHandle->fetchfp(source->param); @@ -652,7 +651,7 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) { } int32_t index = tMergeTreeGetChosenIndex(pHandle->pMergeTree); - SExternalMemSource *pSource = pHandle->cmpParam.pSources[index]; + SSortSource *pSource = pHandle->cmpParam.pSources[index]; if (pHandle->needAdjust) { int32_t code = adjustMergeTreeForNextTuple(pSource, pHandle->pMergeTree, pHandle, &pHandle->numOfCompletedSources); diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index e774b8d2e3..bf77e30eaa 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -138,12 +138,12 @@ int32_t docomp(const void* p1, const void* p2, void* param) { int32_t pRightIdx = *(int32_t *)p2; SMsortComparParam *pParam = (SMsortComparParam *)param; - SGenericSource** px = reinterpret_cast(pParam->pSources); + SSortSource** px = reinterpret_cast(pParam->pSources); SArray *pInfo = pParam->orderInfo; - SGenericSource* pLeftSource = px[pLeftIdx]; - SGenericSource* pRightSource = px[pRightIdx]; + SSortSource* pLeftSource = px[pLeftIdx]; + SSortSource* pRightSource = px[pRightIdx]; // this input is exhausted, set the special value to denote this if (pLeftSource->src.rowIndex == -1) { @@ -218,7 +218,7 @@ TEST(testCase, inMem_sort_Test) { pInfo->count = 6; pInfo->type = TSDB_DATA_TYPE_USMALLINT; - SGenericSource* ps = static_cast(taosMemoryCalloc(1, sizeof(SGenericSource))); + SSortSource* ps = static_cast(taosMemoryCalloc(1, sizeof(SSortSource))); ps->param = pInfo; tsortAddSource(phandle, ps); @@ -301,7 +301,7 @@ TEST(testCase, external_mem_sort_Test) { SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc"); tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); - SGenericSource* ps = static_cast(taosMemoryCalloc(1, sizeof(SGenericSource))); + SSortSource* ps = static_cast(taosMemoryCalloc(1, sizeof(SSortSource))); ps->param = &pInfo[i]; tsortAddSource(phandle, ps); @@ -369,10 +369,10 @@ TEST(testCase, ordered_merge_sort_Test) { tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); tsortSetComparFp(phandle, docomp); - SGenericSource* p[10] = {0}; + SSortSource* p[10] = {0}; _info c[10] = {0}; for(int32_t i = 0; i < 10; ++i) { - p[i] = static_cast(taosMemoryCalloc(1, sizeof(SGenericSource))); + p[i] = static_cast(taosMemoryCalloc(1, sizeof(SSortSource))); c[i].count = 1; c[i].pageRows = 1000; c[i].startVal = i*1000; @@ -396,9 +396,8 @@ TEST(testCase, ordered_merge_sort_Test) { ASSERT_EQ(row++, *(int32_t*) v); } - for(int32_t i = 0; i < 10; ++i) { - taosMemoryFree(p[i]); - } + + taosArrayDestroy(orderInfo); tsortDestroySortHandle(phandle); blockDataDestroy(pBlock); }