diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c new file mode 100644 index 0000000000..0f973b0cf0 --- /dev/null +++ b/source/libs/executor/src/sortoperator.c @@ -0,0 +1,139 @@ +#include "tdatablock.h" +#include "executorimpl.h" + +static SSDataBlock* doSort(SOperatorInfo* pOperator); +static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); + +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, + SArray* pIndexMap, SExecTaskInfo* pTaskInfo) { + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + int32_t rowSize = pResBlock->info.rowSize; + + if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { + goto _error; + } + + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = numOfCols; + pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pRes = pResBlock; + + initResultSizeInfo(pOperator, 1024); + pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header + + pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer + pInfo->pSortInfo = pSortInfo; + pInfo->inputSlotMap = pIndexMap; + pOperator->name = "SortOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + + pOperator->pTaskInfo = pTaskInfo; + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL); + + int32_t code = appendDownstream(pOperator, &downstream, 1); + return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +} + +// TODO merge aggregate super table +void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + + bool isNull = tsortIsNullVal(pTupleHandle, i); + if (isNull) { + colDataAppend(pColInfo, pBlock->info.rows, NULL, true); + } else { + char* pData = tsortGetValue(pTupleHandle, i); + colDataAppend(pColInfo, pBlock->info.rows, pData, false); + } + } + + pBlock->info.rows += 1; +} + +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { + blockDataCleanup(pDataBlock); + blockDataEnsureCapacity(pDataBlock, capacity); + + blockDataEnsureCapacity(pDataBlock, capacity); + + while (1) { + STupleHandle* pTupleHandle = tsortNextTuple(pHandle); + if (pTupleHandle == NULL) { + break; + } + + appendOneRowToDataBlock(pDataBlock, pTupleHandle); + if (pDataBlock->info.rows >= capacity) { + return pDataBlock; + } + } + + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; +} + +SSDataBlock* loadNextDataBlock(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*)param; + return pOperator->fpSet.getNextFn(pOperator); +} + +// todo refactor: merged with fetch fp +void applyScalarFunction(SSDataBlock* pBlock, void* param) { + SOperatorInfo* pOperator = param; + SSortOperatorInfo* pSort = pOperator->info; + if (pOperator->pExpr != NULL) { + projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL); + } +} + +SSDataBlock* doSort(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSortOperatorInfo* pInfo = pOperator->info; + + if (pOperator->status == OP_RES_TO_RETURN) { + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); + } + + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT, + pInfo->bufPageSize, numOfBufPage, pInfo->binfo.pRes, pTaskInfo->id.str); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); + + + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + ps->param = pOperator->pDownstream[0]; + tsortAddSource(pInfo->pSortHandle, ps); + + int32_t code = tsortOpen(pInfo->pSortHandle); + taosMemoryFreeClear(ps); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); + } + + pOperator->status = OP_RES_TO_RETURN; + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); +} + +void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { + SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; + pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + + taosArrayDestroy(pInfo->pSortInfo); + taosArrayDestroy(pInfo->inputSlotMap); +} diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index e646387856..c037fae75f 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -210,7 +210,7 @@ TEST(testCase, inMem_sort_Test) { taosArrayPush(orderInfo, &oi); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 1024, 5, NULL, "test_abc"); - tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); + tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); _info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info)); pInfo->startVal = 0; @@ -299,7 +299,7 @@ TEST(testCase, external_mem_sort_Test) { taosArrayPush(orderInfo, &oi); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc"); - tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); + tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); SSortSource* ps = static_cast(taosMemoryCalloc(1, sizeof(SSortSource))); ps->param = &pInfo[i]; @@ -366,7 +366,7 @@ TEST(testCase, ordered_merge_sort_Test) { } SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc"); - tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); + tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); tsortSetComparFp(phandle, docomp); SSortSource* p[10] = {0};