From 72c71141b2bfeabe07fc7bab64a4d2ca13234dfc Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 6 Jun 2022 15:16:20 +0800 Subject: [PATCH] feat: support multiway sort merge --- source/libs/executor/src/sortoperator.c | 111 ++++++++++++------------ 1 file changed, 55 insertions(+), 56 deletions(-) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index c219acb687..c84d4491af 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -13,17 +13,18 @@ * along with this program. If not, see . */ -#include "tdatablock.h" #include "executorimpl.h" +#include "tdatablock.h" static SSDataBlock* doSort(SOperatorInfo* pOperator); -static int32_t doOpenSortOperator(SOperatorInfo* pOperator); -static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); +static int32_t doOpenSortOperator(SOperatorInfo* pOperator); +static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, - SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, + SExprInfo* pExprInfo, int32_t numOfCols, SArray* pColMatchColInfo, + SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); int32_t rowSize = pResBlock->info.rowSize; @@ -32,33 +33,33 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR goto _error; } - pOperator->pExpr = pExprInfo; + pOperator->pExpr = pExprInfo; pOperator->numOfExprs = numOfCols; - pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); - pInfo->binfo.pRes = pResBlock; + pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pRes = pResBlock; initResultSizeInfo(pOperator, 1024); - pInfo->pSortInfo = pSortInfo; - pInfo->pColMatchInfo= pColMatchColInfo; - pOperator->name = "SortOperator"; + pInfo->pSortInfo = pSortInfo; + pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; // lazy evaluation for the following parameter since the input datablock is not known till now. -// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header -// pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer + // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + + // header pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = - createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, getExplainExecInfo); + pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, + getExplainExecInfo); int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; - _error: +_error: pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pInfo); taosMemoryFree(pOperator); @@ -68,7 +69,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - bool isNull = tsortIsNullVal(pTupleHandle, i); + bool isNull = tsortIsNullVal(pTupleHandle, i); if (isNull) { colDataAppendNULL(pColInfo, pBlock->info.rows); } else { @@ -80,7 +81,8 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { pBlock->info.rows += 1; } -SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo) { +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, + SArray* pColMatchInfo) { blockDataCleanup(pDataBlock); ASSERT(taosArrayGetSize(pColMatchInfo) == pDataBlock->info.numOfCols); @@ -129,10 +131,11 @@ SSDataBlock* loadNextDataBlock(void* param) { // todo refactor: merged with fetch fp void applyScalarFunction(SSDataBlock* pBlock, void* param) { - SOperatorInfo* pOperator = param; + SOperatorInfo* pOperator = param; SSortOperatorInfo* pSort = pOperator->info; if (pOperator->pExpr != NULL) { - int32_t code = projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL); + int32_t code = + projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL); if (code != TSDB_CODE_SUCCESS) { longjmp(pOperator->pTaskInfo->env, code); } @@ -141,7 +144,7 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) { int32_t doOpenSortOperator(SOperatorInfo* pOperator) { SSortOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (OPTR_IS_OPENED(pOperator)) { return TSDB_CODE_SUCCESS; @@ -150,8 +153,8 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { pInfo->startTs = taosGetTimestampUs(); // pInfo->binfo.pRes is not equalled to the input datablock. - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT, - -1, -1, NULL, pTaskInfo->id.str); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT, -1, -1, + NULL, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); @@ -166,7 +169,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, terrno); } - pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs)/1000.0; + pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0; pOperator->status = OP_RES_TO_RETURN; OPTR_SET_OPENED(pOperator); @@ -186,7 +189,8 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo); + SSDataBlock* pBlock = + getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -208,7 +212,7 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* ASSERT(pOptr != NULL); SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); - SSortOperatorInfo *pOperatorInfo = (SSortOperatorInfo*)pOptr->info; + SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info; *pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle); *pOptrExplain = pInfo; @@ -219,20 +223,19 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* typedef struct SMultiwaySortMergeOperatorInfo { SOptrBasicInfo binfo; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort SArray* pSortInfo; SSortHandle* pSortHandle; SArray* pColMatchInfo; // for index map from table scan output - int64_t startTs; // sort start time + int64_t startTs; // sort start time } SMultiwaySortMergeOperatorInfo; - int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { - SMultiwaySortMergeOperatorInfo * pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (OPTR_IS_OPENED(pOperator)) { return TSDB_CODE_SUCCESS; @@ -259,7 +262,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, terrno); } - pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs)/1000.0; + pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0; pOperator->status = OP_RES_TO_RETURN; OPTR_SET_OPENED(pOperator); @@ -271,18 +274,16 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSortOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info; int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } - SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, - pInfo->binfo.pRes, - pOperator->resultInfo.capacity, - pInfo->pColMatchInfo); + SSDataBlock* pBlock = + getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -293,17 +294,18 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { } void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) { - SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; + SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); taosArrayDestroy(pInfo->pSortInfo); + taosArrayDestroy(pInfo->pColMatchInfo); } int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { ASSERT(pOptr != NULL); SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); - SMultiwaySortMergeOperatorInfo *pOperatorInfo = (SMultiwaySortMergeOperatorInfo*)pOptr->info; + SMultiwaySortMergeOperatorInfo* pOperatorInfo = (SMultiwaySortMergeOperatorInfo*)pOptr->info; *pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle); *pOptrExplain = pInfo; @@ -315,35 +317,32 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) { SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - int32_t rowSize = pResBlock->info.rowSize; + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + int32_t rowSize = pResBlock->info.rowSize; if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { goto _error; } - pInfo->binfo.pRes = pResBlock; + pInfo->binfo.pRes = pResBlock; initResultSizeInfo(pOperator, 1024); - pInfo->pSortInfo = pSortInfo; - pInfo->pColMatchInfo= pColMatchColInfo; - pOperator->name = "MultiwaySortMerge"; + pInfo->pSortInfo = pSortInfo; + pInfo->pColMatchInfo = pColMatchColInfo; + pOperator->name = "MultiwaySortMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; pInfo->sortBufSize = pInfo->bufPageSize * 16; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = - createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, - NULL, - NULL, destroyMultiwaySortMergeOperatorInfo, - NULL, NULL, - getMultiwaySortMergeExplainExecInfo); + createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL, + destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo); int32_t code = appendDownstream(pOperator, downStreams, numStreams); if (code != TSDB_CODE_SUCCESS) {