feat: support multiway sort merge

This commit is contained in:
shenglian zhou 2022-06-06 15:16:20 +08:00
parent c438b699c9
commit 72c71141b2
1 changed files with 55 additions and 56 deletions

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tdatablock.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "tdatablock.h"
static SSDataBlock* doSort(SOperatorInfo* pOperator); static SSDataBlock* doSort(SOperatorInfo* pOperator);
static int32_t doOpenSortOperator(SOperatorInfo* pOperator); static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
@ -22,8 +22,9 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uin
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo,
SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) { SExprInfo* pExprInfo, int32_t numOfCols, SArray* pColMatchColInfo,
SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
int32_t rowSize = pResBlock->info.rowSize; int32_t rowSize = pResBlock->info.rowSize;
@ -40,7 +41,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = pSortInfo; pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo= pColMatchColInfo; pInfo->pColMatchInfo = pColMatchColInfo;
pOperator->name = "SortOperator"; pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true; pOperator->blocking = true;
@ -48,17 +49,17 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
pOperator->info = pInfo; pOperator->info = pInfo;
// lazy evaluation for the following parameter since the input datablock is not known till now. // lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize +
// pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer // header pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL,
createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, getExplainExecInfo); getExplainExecInfo);
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
_error: _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
@ -80,7 +81,8 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo) { SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SArray* pColMatchInfo) {
blockDataCleanup(pDataBlock); blockDataCleanup(pDataBlock);
ASSERT(taosArrayGetSize(pColMatchInfo) == pDataBlock->info.numOfCols); ASSERT(taosArrayGetSize(pColMatchInfo) == pDataBlock->info.numOfCols);
@ -132,7 +134,8 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
SOperatorInfo* pOperator = param; SOperatorInfo* pOperator = param;
SSortOperatorInfo* pSort = pOperator->info; SSortOperatorInfo* pSort = pOperator->info;
if (pOperator->pExpr != NULL) { if (pOperator->pExpr != NULL) {
int32_t code = projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL); int32_t code =
projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code); longjmp(pOperator->pTaskInfo->env, code);
} }
@ -150,8 +153,8 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
pInfo->startTs = taosGetTimestampUs(); pInfo->startTs = taosGetTimestampUs();
// pInfo->binfo.pRes is not equalled to the input datablock. // pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_SINGLESOURCE_SORT, -1, -1,
-1, -1, NULL, pTaskInfo->id.str); NULL, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
@ -166,7 +169,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, terrno); longjmp(pTaskInfo->env, terrno);
} }
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs)/1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
@ -186,7 +189,8 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo); SSDataBlock* pBlock =
getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
if (pBlock != NULL) { if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
@ -208,7 +212,7 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t*
ASSERT(pOptr != NULL); ASSERT(pOptr != NULL);
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
SSortOperatorInfo *pOperatorInfo = (SSortOperatorInfo*)pOptr->info; SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info;
*pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle); *pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
*pOptrExplain = pInfo; *pOptrExplain = pInfo;
@ -229,9 +233,8 @@ typedef struct SMultiwaySortMergeOperatorInfo {
int64_t startTs; // sort start time int64_t startTs; // sort start time
} SMultiwaySortMergeOperatorInfo; } SMultiwaySortMergeOperatorInfo;
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
SMultiwaySortMergeOperatorInfo * pInfo = pOperator->info; SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (OPTR_IS_OPENED(pOperator)) { if (OPTR_IS_OPENED(pOperator)) {
@ -259,7 +262,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
longjmp(pTaskInfo->env, terrno); longjmp(pTaskInfo->env, terrno);
} }
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs)/1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
@ -272,17 +275,15 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
} }
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortOperatorInfo* pInfo = pOperator->info; SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
int32_t code = pOperator->fpSet._openFn(pOperator); int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, SSDataBlock* pBlock =
pInfo->binfo.pRes, getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
pOperator->resultInfo.capacity,
pInfo->pColMatchInfo);
if (pBlock != NULL) { if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
@ -293,17 +294,18 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
} }
void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) { void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->pColMatchInfo);
} }
int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
ASSERT(pOptr != NULL); ASSERT(pOptr != NULL);
SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
SMultiwaySortMergeOperatorInfo *pOperatorInfo = (SMultiwaySortMergeOperatorInfo*)pOptr->info; SMultiwaySortMergeOperatorInfo* pOperatorInfo = (SMultiwaySortMergeOperatorInfo*)pOptr->info;
*pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle); *pInfo = tsortGetSortExecInfo(pOperatorInfo->pSortHandle);
*pOptrExplain = pInfo; *pOptrExplain = pInfo;
@ -327,7 +329,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = pSortInfo; pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo= pColMatchColInfo; pInfo->pColMatchInfo = pColMatchColInfo;
pOperator->name = "MultiwaySortMerge"; pOperator->name = "MultiwaySortMerge";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
pOperator->blocking = true; pOperator->blocking = true;
@ -339,11 +341,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
NULL, destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo);
NULL, destroyMultiwaySortMergeOperatorInfo,
NULL, NULL,
getMultiwaySortMergeExplainExecInfo);
int32_t code = appendDownstream(pOperator, downStreams, numStreams); int32_t code = appendDownstream(pOperator, downStreams, numStreams);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {