From 701a148ad1be97343cd863d497853c8d6aa28df3 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 31 Oct 2023 19:59:36 +0800 Subject: [PATCH] feat: init merge operator --- source/libs/executor/src/mergeoperator.c | 207 +++++++++++++++++++++++ source/libs/executor/src/sortoperator.c | 108 +----------- 2 files changed, 214 insertions(+), 101 deletions(-) create mode 100755 source/libs/executor/src/mergeoperator.c diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c new file mode 100755 index 0000000000..9650ac4cb5 --- /dev/null +++ b/source/libs/executor/src/mergeoperator.c @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executorInt.h" +#include "filter.h" +#include "operator.h" +#include "querytask.h" +#include "tdatablock.h" + +typedef struct SSortMergeInfo { + SArray* pSortInfo; + SSortHandle* pSortHandle; + STupleHandle* prefetchedTuple; +} SSortMergeInfo; + +typedef struct SNonSortMergeInfo { + +} SNonSortMergeInfo; + +typedef struct SColumnMergeInfo { + +} SColumnMergeInfo; + +typedef struct SMultiwayMergeOperatorInfo { + SOptrBasicInfo binfo; + union { + SSortMergeInfo sortMergeInfo; + SNonSortMergeInfo nsortMergeInfo; + SColumnMergeInfo colMergeInfo; + }; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SLimitInfo limitInfo; + SColMatchInfo matchInfo; + SSDataBlock* pInputBlock; + SSDataBlock* pIntermediateBlock; // to hold the intermediate result + int64_t startTs; // sort start time + bool groupMerge; + bool ignoreGroupId; + uint64_t groupId; + bool inputWithGroupId; +} SMultiwayMergeOperatorInfo; + +int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { + SMultiwayMergeOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; + } + + pInfo->startTs = taosGetTimestampUs(); + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, + pInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); + tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupMerge); + + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { + SOperatorInfo* pDownstream = pOperator->pDownstream[i]; + if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { + pDownstream->fpSet._openFn(pDownstream); + } + + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + ps->param = pDownstream; + ps->onlyRef = true; + + tsortAddSource(pInfo->pSortHandle, ps); + } + + int32_t code = tsortOpen(pInfo->pSortHandle); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, terrno); + } + + pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0; + pOperator->status = OP_RES_TO_RETURN; + + OPTR_SET_OPENED(pOperator); + return TSDB_CODE_SUCCESS; +} + +SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SMultiwayMergeOperatorInfo* pInfo = pOperator->info; + + int32_t code = pOperator->fpSet._openFn(pOperator); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + + qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo)); + SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator); + if (pBlock != NULL) { + pOperator->resultInfo.totalRows += pBlock->info.rows; + } else { + setOperatorCompleted(pOperator); + } + + return pBlock; +} + +void destroyMultiwayMergeOperatorInfo(void* param) { + SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param; + pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock); + pInfo->pIntermediateBlock = blockDataDestroy(pInfo->pIntermediateBlock); + + tsortDestroySortHandle(pInfo->pSortHandle); + taosArrayDestroy(pInfo->pSortInfo); + taosArrayDestroy(pInfo->matchInfo.pList); + + taosMemoryFreeClear(param); +} + +int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); + + SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info; + + *pSortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); + *pOptrExplain = pSortExecInfo; + + *len = sizeof(SSortExecInfo); + return TSDB_CODE_SUCCESS; +} + +SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, + SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo) { + SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode; + + SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; + + int32_t code = TSDB_CODE_SUCCESS; + if (pInfo == NULL || pOperator == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + + initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); + pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + + int32_t rowSize = pInfo->binfo.pRes->info.rowSize; + int32_t numOfOutputCols = 0; + code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, + &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); + + initResultSizeInfo(&pOperator->resultInfo, 1024); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + + pInfo->groupMerge = pMergePhyNode->groupSort; + pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId; + pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); + pInfo->pInputBlock = pInputBlock; + size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock); + pInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols); + pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. + pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder; + pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder; + pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId; + + setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); + pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, + destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo, optrDefaultGetNextExtFn, NULL); + + code = appendDownstream(pOperator, downStreams, numStreams); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + return pOperator; + +_error: + if (pInfo != NULL) { + destroyMultiwayMergeOperatorInfo(pInfo); + } + + pTaskInfo->code = code; + taosMemoryFree(pOperator); + return NULL; +} diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index ccef6640be..0ccdb2dd2b 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -675,27 +675,7 @@ _error: return NULL; } -//===================================================================================== -// Multiway Sort Merge operator -typedef struct SMultiwayMergeOperatorInfo { - SOptrBasicInfo binfo; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - SLimitInfo limitInfo; - SArray* pSortInfo; - SSortHandle* pSortHandle; - SColMatchInfo matchInfo; - SSDataBlock* pInputBlock; - SSDataBlock* pIntermediateBlock; // to hold the intermediate result - int64_t startTs; // sort start time - bool groupSort; - bool ignoreGroupId; - uint64_t groupId; - STupleHandle* prefetchedTuple; - bool inputWithGroupId; -} SMultiwayMergeOperatorInfo; - -int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { +int32_t openSortMergeOperator(SOperatorInfo* pOperator) { SMultiwayMergeOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -710,7 +690,7 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { pInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); - tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort); + tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupMerge); for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { SOperatorInfo* pDownstream = pOperator->pDownstream[i]; @@ -743,7 +723,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* while (1) { STupleHandle* pTupleHandle = NULL; - if (pInfo->groupSort || pInfo->inputWithGroupId) { + if (pInfo->groupMerge || pInfo->inputWithGroupId) { if (pInfo->prefetchedTuple == NULL) { pTupleHandle = tsortNextTuple(pHandle); } else { @@ -764,7 +744,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* break; } - if (pInfo->groupSort || pInfo->inputWithGroupId) { + if (pInfo->groupMerge || pInfo->inputWithGroupId) { uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) { appendOneRowToDataBlock(p, pTupleHandle); @@ -789,7 +769,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* } } -SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo, +SSDataBlock* getSortMergeSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo, SOperatorInfo* pOperator) { SMultiwayMergeOperatorInfo* pInfo = pOperator->info; @@ -855,7 +835,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; } -SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { +SSDataBlock* doSortMerge(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -879,20 +859,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { return pBlock; } -void destroyMultiwayMergeOperatorInfo(void* param) { - SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param; - pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); - pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock); - pInfo->pIntermediateBlock = blockDataDestroy(pInfo->pIntermediateBlock); - - tsortDestroySortHandle(pInfo->pSortHandle); - taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->matchInfo.pList); - - taosMemoryFreeClear(param); -} - -int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { +int32_t getSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info; @@ -904,64 +871,3 @@ int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplai return TSDB_CODE_SUCCESS; } -SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, - SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo) { - SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode; - - SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; - - int32_t code = TSDB_CODE_SUCCESS; - if (pInfo == NULL || pOperator == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - - initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); - pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); - - int32_t rowSize = pInfo->binfo.pRes->info.rowSize; - int32_t numOfOutputCols = 0; - code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, - &pInfo->matchInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); - - initResultSizeInfo(&pOperator->resultInfo, 1024); - blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - - pInfo->groupSort = pMergePhyNode->groupSort; - pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId; - pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); - pInfo->pInputBlock = pInputBlock; - size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock); - pInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols); - pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. - pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder; - pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder; - pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId; - - setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, - destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo, optrDefaultGetNextExtFn, NULL); - - code = appendDownstream(pOperator, downStreams, numStreams); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - return pOperator; - -_error: - if (pInfo != NULL) { - destroyMultiwayMergeOperatorInfo(pInfo); - } - - pTaskInfo->code = code; - taosMemoryFree(pOperator); - return NULL; -}