diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 72aab9adf0..c7b5858409 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -364,6 +364,11 @@ typedef struct SSortExecInfo { int32_t readBytes; // read io bytes } SSortExecInfo; +typedef struct SNonSortExecInfo { + +} SNonSortExecInfo; + + typedef struct STUidTagInfo { char* name; uint64_t uid; diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index dbbe1d92dc..b1f2c4390c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -40,6 +40,13 @@ typedef enum EGroupAction { GROUP_ACTION_CLEAR } EGroupAction; +typedef enum EMergeType { + MERGE_TYPE_SORT = 1, + MERGE_TYPE_NON_SORT, + MERGE_TYPE_COLUMNS, + MERGE_TYPE_MAX_VALUE +} EMergeType; + typedef struct SLogicNode { ENodeType type; bool dynamicOp; @@ -221,6 +228,8 @@ typedef struct SMergeLogicNode { SNodeList* pInputs; int32_t numOfChannels; int32_t srcGroupId; + bool colsMerge; + bool needSort; bool groupSort; bool ignoreGroupId; bool inputWithGroupId; @@ -531,6 +540,7 @@ typedef struct SExchangePhysiNode { typedef struct SMergePhysiNode { SPhysiNode node; + EMergeType type; SNodeList* pMergeKeys; SNodeList* pTargets; int32_t numOfChannels; diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index c704eb3951..1171e386d1 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -37,7 +37,7 @@ extern "C" { #define EXPLAIN_TABLE_COUNT_SCAN_FORMAT "Table Count Row Scan on %s" #define EXPLAIN_PROJECTION_FORMAT "Projection" #define EXPLAIN_JOIN_FORMAT "%s" -#define EXPLAIN_AGG_FORMAT "Aggragate" +#define EXPLAIN_AGG_FORMAT "%s" #define EXPLAIN_INDEF_ROWS_FORMAT "Indefinite Rows Function" #define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1" #define EXPLAIN_SORT_FORMAT "Sort" @@ -85,7 +85,8 @@ extern "C" { #define EXPLAIN_COLUMNS_FORMAT "columns=%d" #define EXPLAIN_PSEUDO_COLUMNS_FORMAT "pseudo_columns=%d" #define EXPLAIN_WIDTH_FORMAT "width=%d" -#define EXPLAIN_TABLE_SCAN_FORMAT "order=[asc|%d desc|%d]" +#define EXPLAIN_SCAN_ORDER_FORMAT "order=[asc|%d desc|%d]" +#define EXPLAIN_SCAN_MODE_FORMAT "mode=%s" #define EXPLAIN_GROUPS_FORMAT "groups=%d" #define EXPLAIN_WIDTH_FORMAT "width=%d" #define EXPLAIN_INTERVAL_VALUE_FORMAT "interval=%" PRId64 "%c" @@ -105,6 +106,7 @@ extern "C" { #define EXPLAIN_UID_SLOT_FORMAT "uid_slot=%d,%d" #define EXPLAIN_SRC_SCAN_FORMAT "src_scan=%d,%d" #define EXPLAIN_PLAN_BLOCKING "blocking=%d" +#define EXPLAIN_MERGE_MODE_FORMAT "mode=%s" #define COMMAND_RESET_LOG "resetLog" #define COMMAND_SCHEDULE_POLICY "schedulePolicy" @@ -157,6 +159,7 @@ typedef struct SExplainCtx { #define EXPLAIN_ORDER_STRING(_order) ((ORDER_ASC == _order) ? "asc" : ORDER_DESC == _order ? "desc" : "unknown") #define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join") +#define EXPLAIN_MERGE_MODE_STRING(_mode) ((_mode) == MERGE_TYPE_SORT ? "sort" : ((_mode) == MERGE_TYPE_NON_SORT ? "merge" : "column")) #define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u))) diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 27cfaab3cf..185e23590a 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -284,10 +284,34 @@ int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t return TSDB_CODE_SUCCESS; } -static uint8_t getIntervalPrecision(SIntervalPhysiNode *pIntNode) { +static uint8_t qExplainGetIntervalPrecision(SIntervalPhysiNode *pIntNode) { return ((SColumnNode *)pIntNode->window.pTspk)->node.resType.precision; } +static char* qExplainGetScanMode(STableScanPhysiNode* pScan) { + bool isGroupByTbname = false; + bool isGroupByTag = false; + bool seq = false; + bool groupOrder = false; + if (pScan->pGroupTags && LIST_LENGTH(pScan->pGroupTags) == 1) { + SNode* p = nodesListGetNode(pScan->pGroupTags, 0); + if (QUERY_NODE_FUNCTION == nodeType(p) && (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0)) { + isGroupByTbname = true; + } + } + + isGroupByTag = (NULL != pScan->pGroupTags) && !isGroupByTbname; + if ((((!isGroupByTag) || isGroupByTbname) && pScan->groupSort) || (isGroupByTag && (pScan->groupSort || pScan->scan.groupOrderScan))) { + return "seq_grp_order"; + } + + if ((isGroupByTbname && (pScan->groupSort || pScan->scan.groupOrderScan)) || (isGroupByTag && (pScan->groupSort || pScan->scan.groupOrderScan))) { + return "grp_order"; + } + + return "ts_order"; +} + int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, int32_t level) { int32_t tlen = 0; bool isVerboseLine = false; @@ -360,7 +384,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND(EXPLAIN_TABLE_SCAN_FORMAT, pTblScanNode->scanSeq[0], pTblScanNode->scanSeq[1]); + EXPLAIN_ROW_APPEND(EXPLAIN_SCAN_ORDER_FORMAT, pTblScanNode->scanSeq[0], pTblScanNode->scanSeq[1]); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_SCAN_MODE_FORMAT, qExplainGetScanMode(pTblScanNode)); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); @@ -599,7 +625,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT); + EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, (pAggNode->pGroupKeys ? "Group" : "Aggragate")); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); @@ -841,7 +867,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND_SLIMIT(pIntNode->window.node.pSlimit); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - uint8_t precision = getIntervalPrecision(pIntNode); + uint8_t precision = qExplainGetIntervalPrecision(pIntNode); EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIME_WINDOWS_FORMAT, INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, precision), pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(precision), @@ -893,7 +919,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND_SLIMIT(pIntNode->window.node.pSlimit); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - uint8_t precision = getIntervalPrecision(pIntNode); + uint8_t precision = qExplainGetIntervalPrecision(pIntNode); EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIME_WINDOWS_FORMAT, INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, precision), pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(precision), @@ -1119,23 +1145,13 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pMergeNode->node.inputTsOrder)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT, EXPLAIN_ORDER_STRING(pMergeNode->node.outputTsOrder)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_MERGE_MODE_FORMAT, EXPLAIN_MERGE_MODE_STRING(pMergeNode->type)); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); if (EXPLAIN_MODE_ANALYZE == ctx->mode) { - // sort key - EXPLAIN_ROW_NEW(level + 1, "Merge Key: "); - if (pResNode->pExecInfo) { - for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) { - SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pMergeNode->pMergeKeys, i); - EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr)); - } - } - - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - // sort method EXPLAIN_ROW_NEW(level + 1, "Sort Method: "); @@ -1419,7 +1435,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND_SLIMIT(pIntNode->window.node.pSlimit); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - uint8_t precision = getIntervalPrecision(pIntNode); + uint8_t precision = qExplainGetIntervalPrecision(pIntNode); EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIME_WINDOWS_FORMAT, INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, precision), pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(precision), diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 2348a3c97b..b4461f20b1 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -239,7 +239,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream = 2; } else { pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0); - pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 1); + pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 0); } int32_t numOfCols = 0; diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 9650ac4cb5..204a9458b8 100755 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -23,52 +23,57 @@ typedef struct SSortMergeInfo { SArray* pSortInfo; SSortHandle* pSortHandle; STupleHandle* prefetchedTuple; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SSDataBlock* pIntermediateBlock; // to hold the intermediate result + SSDataBlock* pInputBlock; + SColMatchInfo matchInfo; } SSortMergeInfo; typedef struct SNonSortMergeInfo { - + int32_t lastSourceIdx; + int32_t sourceWorkIdx; + int32_t sourceNum; + int32_t* pSourceStatus; } SNonSortMergeInfo; -typedef struct SColumnMergeInfo { - -} SColumnMergeInfo; +typedef struct SColsMergeInfo { + uint64_t srcBlkIds[2]; +} SColsMergeInfo; typedef struct SMultiwayMergeOperatorInfo { SOptrBasicInfo binfo; + EMergeType type; union { SSortMergeInfo sortMergeInfo; SNonSortMergeInfo nsortMergeInfo; - SColumnMergeInfo colMergeInfo; + SColsMergeInfo colsMergeInfo; }; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort SLimitInfo limitInfo; - SColMatchInfo matchInfo; - SSDataBlock* pInputBlock; - SSDataBlock* pIntermediateBlock; // to hold the intermediate result - int64_t startTs; // sort start time bool groupMerge; bool ignoreGroupId; uint64_t groupId; bool inputWithGroupId; } SMultiwayMergeOperatorInfo; -int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { +SSDataBlock* sortMergeloadNextDataBlock(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*)param; + SSDataBlock* pBlock = pOperator->fpSet.getNextFn(pOperator); + return pBlock; +} + +int32_t openSortMergeOperator(SOperatorInfo* pOperator) { SMultiwayMergeOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo; - if (OPTR_IS_OPENED(pOperator)) { - return TSDB_CODE_SUCCESS; - } + int32_t numOfBufPage = pSortMergeInfo->sortBufSize / pSortMergeInfo->bufPageSize; - pInfo->startTs = taosGetTimestampUs(); - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + pSortMergeInfo->pSortHandle = tsortCreateSortHandle(pSortMergeInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pSortMergeInfo->bufPageSize, numOfBufPage, + pSortMergeInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0); - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, - pInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0); - - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); - tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupMerge); + tsortSetFetchRawDataFp(pSortMergeInfo->pSortHandle, sortMergeloadNextDataBlock, NULL, NULL); + tsortSetCompareGroupId(pSortMergeInfo->pSortHandle, pInfo->groupMerge); for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { SOperatorInfo* pDownstream = pOperator->pDownstream[i]; @@ -80,19 +85,280 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { ps->param = pDownstream; ps->onlyRef = true; - tsortAddSource(pInfo->pSortHandle, ps); + tsortAddSource(pSortMergeInfo->pSortHandle, ps); } - int32_t code = tsortOpen(pInfo->pSortHandle); + return tsortOpen(pSortMergeInfo->pSortHandle); +} + +static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, + SSDataBlock* p, bool* newgroup) { + SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo; + *newgroup = false; + + while (1) { + STupleHandle* pTupleHandle = NULL; + if (pInfo->groupMerge || pInfo->inputWithGroupId) { + if (pSortMergeInfo->prefetchedTuple == NULL) { + pTupleHandle = tsortNextTuple(pHandle); + } else { + pTupleHandle = pSortMergeInfo->prefetchedTuple; + pSortMergeInfo->prefetchedTuple = NULL; + uint64_t gid = tsortGetGroupId(pTupleHandle); + if (gid != pInfo->groupId) { + *newgroup = true; + pInfo->groupId = gid; + } + } + } else { + pTupleHandle = tsortNextTuple(pHandle); + pInfo->groupId = 0; + } + + if (pTupleHandle == NULL) { + break; + } + + if (pInfo->groupMerge || pInfo->inputWithGroupId) { + uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); + if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + p->info.id.groupId = tupleGroupId; + pInfo->groupId = tupleGroupId; + } else { + if (p->info.rows == 0) { + appendOneRowToDataBlock(p, pTupleHandle); + p->info.id.groupId = pInfo->groupId = tupleGroupId; + } else { + pSortMergeInfo->prefetchedTuple = pTupleHandle; + break; + } + } + } else { + appendOneRowToDataBlock(p, pTupleHandle); + } + + if (p->info.rows >= capacity) { + break; + } + } +} + +SSDataBlock* doSortMerge(SOperatorInfo* pOperator) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SMultiwayMergeOperatorInfo* pInfo = pOperator->info; + SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo; + SSortHandle* pHandle = pSortMergeInfo->pSortHandle; + SSDataBlock* pDataBlock = pInfo->binfo.pRes; + SArray* pColMatchInfo = pInfo->matchInfo.pList; + int32_t capacity = pOperator->resultInfo.capacity; + + qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo)); + + blockDataCleanup(pDataBlock); + + if (pSortMergeInfo->pIntermediateBlock == NULL) { + pSortMergeInfo->pIntermediateBlock = tsortGetSortedDataBlock(pHandle); + if (pSortMergeInfo->pIntermediateBlock == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + blockDataEnsureCapacity(pSortMergeInfo->pIntermediateBlock, capacity); + } else { + blockDataCleanup(pSortMergeInfo->pIntermediateBlock); + } + + SSDataBlock* p = pSortMergeInfo->pIntermediateBlock; + bool newgroup = false; + + while (1) { + doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup); + if (p->info.rows == 0) { + break; + } + + if (newgroup) { + resetLimitInfoForNextGroup(&pInfo->limitInfo); + } + + applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); + + if (p->info.rows > 0) { + break; + } + } + + if (p->info.rows > 0) { + int32_t numOfCols = taosArrayGetSize(pColMatchInfo); + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); + + SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); + colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); + } + + pDataBlock->info.rows = p->info.rows; + pDataBlock->info.scanFlag = p->info.scanFlag; + if (pInfo->ignoreGroupId) { + pDataBlock->info.id.groupId = 0; + } else { + pDataBlock->info.id.groupId = pInfo->groupId; + } + pDataBlock->info.dataLoad = 1; + } + + qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId, + pDataBlock->info.rows); + + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; +} + + +int32_t getSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); + + SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info; + SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo; + + *pSortExecInfo = tsortGetSortExecInfo(pSortMergeInfo->pSortHandle); + *pOptrExplain = pSortExecInfo; + + *len = sizeof(SSortExecInfo); + return TSDB_CODE_SUCCESS; +} + + +void destroySortMergeOperatorInfo(void* param) { + SSortMergeInfo* pSortMergeInfo = param; + pSortMergeInfo->pInputBlock = blockDataDestroy(pSortMergeInfo->pInputBlock); + pSortMergeInfo->pIntermediateBlock = blockDataDestroy(pSortMergeInfo->pIntermediateBlock); + + tsortDestroySortHandle(pSortMergeInfo->pSortHandle); + taosArrayDestroy(pSortMergeInfo->pSortInfo); +} + +#define NON_SORT_NEXT_SRC(_info, _idx) ((++(_idx) >= (_info)->sourceNum) ? ((_info)->sourceWorkIdx) : (_idx)) + +int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) { + SMultiwayMergeOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SNonSortMergeInfo* pNonSortMergeInfo = &pInfo->nsortMergeInfo; + + pNonSortMergeInfo->sourceWorkIdx = 0; + pNonSortMergeInfo->sourceNum = pOperator->numOfDownstream; + pNonSortMergeInfo->lastSourceIdx = -1; + pNonSortMergeInfo->pSourceStatus = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pNonSortMergeInfo->pSourceStatus)); + if (NULL == pNonSortMergeInfo->pSourceStatus) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { + pNonSortMergeInfo->pSourceStatus[i] = i; + } + + return TSDB_CODE_SUCCESS; +} + +SSDataBlock* doNonSortMerge(SOperatorInfo* pOperator) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SMultiwayMergeOperatorInfo* pInfo = pOperator->info; + SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo; + SSDataBlock* pBlock = NULL; + + qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo)); + + int32_t idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx); + while (idx < pNonSortMerge->sourceNum) { + pBlock = getNextBlockFromDownstream(pOperator, pNonSortMerge->pSourceStatus[idx]); + if (NULL == pBlock) { + TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]); + pNonSortMerge->sourceWorkIdx++; + idx = NON_SORT_NEXT_SRC(pNonSortMerge, idx); + continue; + } + break; + } + + return pBlock; +} + +void destroyNonSortMergeOperatorInfo(void* param) { + SNonSortMergeInfo* pNonSortMerge = param; + taosMemoryFree(pNonSortMerge->pSourceStatus); +} + +int32_t getNonSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + return TSDB_CODE_SUCCESS; +} + + +int32_t openColsMergeOperator(SOperatorInfo* pOperator) { + return TSDB_CODE_SUCCESS; +} + +SSDataBlock* doColsMerge(SOperatorInfo* pOperator) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SMultiwayMergeOperatorInfo* pInfo = pOperator->info; + SSDataBlock* pBlock = NULL; + + qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo)); + + for (int32_t i = 0; i < 2; ++i) { + pBlock = getNextBlockFromDownstream(pOperator, i); + if (NULL == pBlock) { + TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]); + pNonSortMerge->sourceWorkIdx++; + idx = NON_SORT_NEXT_SRC(pNonSortMerge, idx); + continue; + } + break; + } + + return pBlock; +} + +void destroyColsMergeOperatorInfo(void* param) { +} + +int32_t getColsMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + return TSDB_CODE_SUCCESS; +} + + +SOperatorFpSet gMultiwayMergeFps[MERGE_TYPE_MAX_VALUE] = { + {0}, + {._openFn = openSortMergeOperator, .getNextFn = doSortMerge, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo}, + {._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo}, + {._openFn = openColsMergeOperator, .getNextFn = doColsMerge, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo}, +}; + + +int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { + int32_t code = 0; + SMultiwayMergeOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; + } + + int64_t startTs = taosGetTimestampUs(); + + if (NULL != gMultiwayMergeFps[pInfo->type]._openFn) { + code = (*gMultiwayMergeFps[pInfo->type]._openFn)(pOperator); + } + + pOperator->cost.openCost = (taosGetTimestampUs() - startTs) / 1000.0; + pOperator->status = OP_RES_TO_RETURN; + if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, terrno); } - pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0; - pOperator->status = OP_RES_TO_RETURN; - OPTR_SET_OPENED(pOperator); - return TSDB_CODE_SUCCESS; + return code; } SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { @@ -100,7 +366,8 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSDataBlock* pBlock = NULL; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SMultiwayMergeOperatorInfo* pInfo = pOperator->info; int32_t code = pOperator->fpSet._openFn(pOperator); @@ -108,8 +375,9 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo)); - SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator); + if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) { + pBlock = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator); + } if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; } else { @@ -122,26 +390,24 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { void destroyMultiwayMergeOperatorInfo(void* param) { SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); - pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock); - pInfo->pIntermediateBlock = blockDataDestroy(pInfo->pIntermediateBlock); - - tsortDestroySortHandle(pInfo->pSortHandle); - taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->matchInfo.pList); + if (NULL != gMultiwayMergeFps[pInfo->type].closeFn) { + (*gMultiwayMergeFps[pInfo->type].closeFn)(&pInfo->sortMergeInfo); + } + taosMemoryFreeClear(param); } int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { - SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); - + int32_t code = 0; SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info; - *pSortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); - *pOptrExplain = pSortExecInfo; + if (NULL != gMultiwayMergeFps[pInfo->type].getExplainFn) { + code = (*gMultiwayMergeFps[pInfo->type].getExplainFn)(pOptr, pOptrExplain, len); + } - *len = sizeof(SSortExecInfo); - return TSDB_CODE_SUCCESS; + return code; } SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, @@ -158,34 +424,59 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size goto _error; } - initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); - pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); - - int32_t rowSize = pInfo->binfo.pRes->info.rowSize; - int32_t numOfOutputCols = 0; - code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, - &pInfo->matchInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); - - initResultSizeInfo(&pOperator->resultInfo, 1024); - blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - pInfo->groupMerge = pMergePhyNode->groupSort; pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId; - pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); - pInfo->pInputBlock = pInputBlock; - size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock); - pInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols); - pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder; pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId; + pInfo->type = pMergePhyNode->type; + switch (pInfo->type) { + case MERGE_TYPE_SORT: { + SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo; + initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); + pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); + + initResultSizeInfo(&pOperator->resultInfo, 1024); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + + size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock); + int32_t rowSize = pInfo->binfo.pRes->info.rowSize; + int32_t numOfOutputCols = 0; + pSortMergeInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); + pSortMergeInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols); + pSortMergeInfo->sortBufSize = pSortMergeInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. + pSortMergeInfo->pInputBlock = pInputBlock; + code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, + &pSortMergeInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + break; + } + case MERGE_TYPE_NON_SORT: { + SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo; + break; + } + case MERGE_TYPE_COLUMNS: { + SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo; + pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); + initResultSizeInfo(&pOperator->resultInfo, 1); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + + pColsMerge->srcBlkIds[0] = getOperatorResultBlockId(downStreams[0], 0); + pColsMerge->srcBlkIds[1] = getOperatorResultBlockId(downStreams[1], 0); + break; + } + default: + qError("Invalid merge type: %d", pInfo->type); + code = TSDB_CODE_INVALID_PARA; + goto _error; + } + setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo, optrDefaultGetNextExtFn, NULL); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 0ccdb2dd2b..507dbe7ee2 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -675,199 +675,5 @@ _error: return NULL; } -int32_t openSortMergeOperator(SOperatorInfo* pOperator) { - SMultiwayMergeOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - if (OPTR_IS_OPENED(pOperator)) { - return TSDB_CODE_SUCCESS; - } - - pInfo->startTs = taosGetTimestampUs(); - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, - pInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0); - - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); - tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupMerge); - - for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - SOperatorInfo* pDownstream = pOperator->pDownstream[i]; - if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { - pDownstream->fpSet._openFn(pDownstream); - } - - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - ps->param = pDownstream; - ps->onlyRef = true; - - tsortAddSource(pInfo->pSortHandle, ps); - } - - int32_t code = tsortOpen(pInfo->pSortHandle); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, terrno); - } - - pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0; - pOperator->status = OP_RES_TO_RETURN; - - OPTR_SET_OPENED(pOperator); - return TSDB_CODE_SUCCESS; -} - -static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity, - SSDataBlock* p, bool* newgroup) { - *newgroup = false; - - while (1) { - STupleHandle* pTupleHandle = NULL; - if (pInfo->groupMerge || pInfo->inputWithGroupId) { - if (pInfo->prefetchedTuple == NULL) { - pTupleHandle = tsortNextTuple(pHandle); - } else { - pTupleHandle = pInfo->prefetchedTuple; - pInfo->prefetchedTuple = NULL; - uint64_t gid = tsortGetGroupId(pTupleHandle); - if (gid != pInfo->groupId) { - *newgroup = true; - pInfo->groupId = gid; - } - } - } else { - pTupleHandle = tsortNextTuple(pHandle); - pInfo->groupId = 0; - } - - if (pTupleHandle == NULL) { - break; - } - - if (pInfo->groupMerge || pInfo->inputWithGroupId) { - uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); - if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) { - appendOneRowToDataBlock(p, pTupleHandle); - p->info.id.groupId = tupleGroupId; - pInfo->groupId = tupleGroupId; - } else { - if (p->info.rows == 0) { - appendOneRowToDataBlock(p, pTupleHandle); - p->info.id.groupId = pInfo->groupId = tupleGroupId; - } else { - pInfo->prefetchedTuple = pTupleHandle; - break; - } - } - } else { - appendOneRowToDataBlock(p, pTupleHandle); - } - - if (p->info.rows >= capacity) { - break; - } - } -} - -SSDataBlock* getSortMergeSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo, - SOperatorInfo* pOperator) { - SMultiwayMergeOperatorInfo* pInfo = pOperator->info; - - int32_t capacity = pOperator->resultInfo.capacity; - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - blockDataCleanup(pDataBlock); - - if (pInfo->pIntermediateBlock == NULL) { - pInfo->pIntermediateBlock = tsortGetSortedDataBlock(pHandle); - if (pInfo->pIntermediateBlock == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - blockDataEnsureCapacity(pInfo->pIntermediateBlock, capacity); - } else { - blockDataCleanup(pInfo->pIntermediateBlock); - } - - SSDataBlock* p = pInfo->pIntermediateBlock; - bool newgroup = false; - - while (1) { - doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup); - if (p->info.rows == 0) { - break; - } - - if (newgroup) { - resetLimitInfoForNextGroup(&pInfo->limitInfo); - } - - applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); - - if (p->info.rows > 0) { - break; - } - } - - if (p->info.rows > 0) { - int32_t numOfCols = taosArrayGetSize(pColMatchInfo); - for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); - - SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); - SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); - colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); - } - - pDataBlock->info.rows = p->info.rows; - pDataBlock->info.scanFlag = p->info.scanFlag; - if (pInfo->ignoreGroupId) { - pDataBlock->info.id.groupId = 0; - } else { - pDataBlock->info.id.groupId = pInfo->groupId; - } - pDataBlock->info.dataLoad = 1; - } - - qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId, - pDataBlock->info.rows); - - return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; -} - -SSDataBlock* doSortMerge(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SMultiwayMergeOperatorInfo* pInfo = pOperator->info; - - int32_t code = pOperator->fpSet._openFn(pOperator); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - - qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo)); - SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator); - if (pBlock != NULL) { - pOperator->resultInfo.totalRows += pBlock->info.rows; - } else { - setOperatorCompleted(pOperator); - } - - return pBlock; -} - -int32_t getSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { - SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); - - SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info; - - *pSortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); - *pOptrExplain = pSortExecInfo; - - *len = sizeof(SSortExecInfo); - return TSDB_CODE_SUCCESS; -} diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index bd73b02c80..91f40da00e 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -488,6 +488,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst CLONE_NODE_LIST_FIELD(pInputs); COPY_SCALAR_FIELD(numOfChannels); COPY_SCALAR_FIELD(srcGroupId); + COPY_SCALAR_FIELD(needSort); COPY_SCALAR_FIELD(groupSort); COPY_SCALAR_FIELD(ignoreGroupId); COPY_SCALAR_FIELD(inputWithGroupId); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c2acf0dbdf..cf7ade1b64 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2272,6 +2272,7 @@ static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId"; static const char* jkMergePhysiPlanGroupSort = "GroupSort"; static const char* jkMergePhysiPlanIgnoreGroupID = "IgnoreGroupID"; static const char* jkMergePhysiPlanInputWithGroupId = "InputWithGroupId"; +static const char* jkMergePhysiPlanType = "Type"; static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj; @@ -2298,6 +2299,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanInputWithGroupId, pNode->inputWithGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanType, pNode->type); + } return code; } @@ -2324,6 +2328,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkMergePhysiPlanIgnoreGroupID, &pNode->ignoreGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkMergePhysiPlanType, (int32_t*)&pNode->type); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 99100b2a1d..28a7edd541 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2683,6 +2683,7 @@ enum { PHY_MERGE_CODE_GROUP_SORT, PHY_MERGE_CODE_IGNORE_GROUP_ID, PHY_MERGE_CODE_INPUT_WITH_GROUP_ID, + PHY_MERGE_CODE_TYPE, }; static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2710,6 +2711,9 @@ static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_INPUT_WITH_GROUP_ID, pNode->inputWithGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_MERGE_CODE_TYPE, pNode->type); + } return code; } @@ -2745,6 +2749,9 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) { case PHY_MERGE_CODE_INPUT_WITH_GROUP_ID: code = tlvDecodeBool(pTlv, &pNode->inputWithGroupId); break; + case PHY_MERGE_CODE_TYPE: + code = tlvDecodeI32(pTlv, (int32_t*)&pNode->type); + break; default: break; } diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index 83a4e9ced8..e2a4ded5a9 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -43,6 +43,7 @@ int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan); int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList); +int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan); bool getBatchScanOptionFromHint(SNodeList* pList); bool getSortForGroupOptHint(SNodeList* pList); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index d6799a25a7..0e80f5bcec 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1950,41 +1950,57 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) { return nodesListMakeStrictAppend(&pMerge->node.pChildren, (SNode*)pExchange); } -static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) { +static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) { + int32_t code = TSDB_CODE_SUCCESS; SMergePhysiNode* pMerge = (SMergePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE); if (NULL == pMerge) { return TSDB_CODE_OUT_OF_MEMORY; } + if (pMergeLogicNode->colsMerge) { + pMerge->type = MERGE_TYPE_COLUMNS; + } else if (pMergeLogicNode->needSort) { + pMerge->type = MERGE_TYPE_SORT; + } else { + pMerge->type = MERGE_TYPE_NON_SORT; + } + pMerge->numOfChannels = pMergeLogicNode->numOfChannels; pMerge->srcGroupId = pMergeLogicNode->srcGroupId; pMerge->groupSort = pMergeLogicNode->groupSort; pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId; pMerge->inputWithGroupId = pMergeLogicNode->inputWithGroupId; - int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc); + if (!pMergeLogicNode->colsMerge) { + code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc); - if (TSDB_CODE_SUCCESS == code) { - for (int32_t i = 0; i < pMerge->numOfChannels; ++i) { - code = createExchangePhysiNodeByMerge(pMerge); - if (TSDB_CODE_SUCCESS != code) { - break; + if (TSDB_CODE_SUCCESS == code) { + for (int32_t i = 0; i < pMerge->numOfChannels; ++i) { + code = createExchangePhysiNodeByMerge(pMerge); + if (TSDB_CODE_SUCCESS != code) { + break; + } } } - } - if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) { - code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys, - &pMerge->pMergeKeys); - } + if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) { + code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys, + &pMerge->pMergeKeys); + } - if (TSDB_CODE_SUCCESS == code) { - code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets, - &pMerge->pTargets); - } - if (TSDB_CODE_SUCCESS == code) { - code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc); + if (TSDB_CODE_SUCCESS == code) { + code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets, + &pMerge->pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc); + } + } else { + SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; + SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc; + + code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pMergeLogicNode->node.pTargets, &pMerge->pTargets); } if (TSDB_CODE_SUCCESS == code) { @@ -2022,7 +2038,7 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_MERGE: - return createMergePhysiNode(pCxt, (SMergeLogicNode*)pLogicNode, pPhyNode); + return createMergePhysiNode(pCxt, pChildren, (SMergeLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE: return createGroupCachePhysiNode(pCxt, pChildren, (SGroupCacheLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL: diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index d7b3f51961..bf5fe901a6 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -248,8 +248,6 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { } if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)) { return true; - } else if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) { - return stbSplHasMultiTbScan(streamQuery, (SLogicNode*)pChild); } return false; } @@ -540,11 +538,12 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p } static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode, - SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort) { + SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort, bool needSort) { SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); if (NULL == pMerge) { return TSDB_CODE_OUT_OF_MEMORY; } + pMerge->needSort = needSort; pMerge->numOfChannels = stbSplGetNumOfVgroups(pPartChild); pMerge->srcGroupId = pCxt->groupId; pMerge->node.precision = pPartChild->precision; @@ -621,7 +620,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, ((SWindowLogicNode*)pInfo->pSplitNode)->node.outputTsOrder, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true); + code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true, true); } if (TSDB_CODE_SUCCESS != code) { nodesDestroyList(pMergeKeys); @@ -712,7 +711,7 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl ((SWindowLogicNode*)pWindow)->node.inputTsOrder, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true); + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true, true); } if (TSDB_CODE_SUCCESS == code) { @@ -982,7 +981,7 @@ static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* p } } } - code = stbSplCreateMergeNode(pCtx, NULL, pInfo->pSplitNode, pMergeKeys, pChildAgg, groupSort); + code = stbSplCreateMergeNode(pCtx, NULL, pInfo->pSplitNode, pMergeKeys, pChildAgg, groupSort, true); if (TSDB_CODE_SUCCESS == code && sortForGroup) { SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren, LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1); @@ -1145,7 +1144,7 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) bool groupSort = ((SSortLogicNode*)pInfo->pSplitNode)->groupSort; int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort); + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort, true); } if (TSDB_CODE_SUCCESS == code) { nodesDestroyNode((SNode*)pInfo->pSplitNode); @@ -1195,7 +1194,7 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit SLogicNode* pSplitNode = NULL; int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true); + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true, pSplitNode->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP); } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, @@ -1269,7 +1268,7 @@ static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSub ((SLimitNode*)pMergeScan->pLimit)->limit += ((SLimitNode*)pMergeScan->pLimit)->offset; ((SLimitNode*)pMergeScan->pLimit)->offset = 0; } - code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort); + code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort, true); } if (TSDB_CODE_SUCCESS == code) { nodesDestroyNode((SNode*)pScan); @@ -1340,12 +1339,14 @@ static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeLis static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { int32_t code = TSDB_CODE_SUCCESS; + bool needSort = false; SNodeList* pMergeKeys = NULL; if (pInfo->pSplitNode->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP) { + needSort = true; code = stbSplCreateMergeKeysForPartitionNode(pInfo->pSplitNode, &pMergeKeys); } if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pInfo->pSplitNode, true); + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pInfo->pSplitNode, true, needSort); } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, diff --git a/source/libs/planner/src/planValidator.c b/source/libs/planner/src/planValidator.c new file mode 100755 index 0000000000..7461ee4f9a --- /dev/null +++ b/source/libs/planner/src/planValidator.c @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "planInt.h" + +#include "catalog.h" +#include "functionMgt.h" +#include "systable.h" +#include "tglobal.h" + +typedef struct SValidatePlanContext { + SPlanContext* pPlanCxt; + int32_t errCode; +} SValidatePlanContext; + +int32_t doValidatePhysiNode(SValidatePlanContext* pCxt, SNode* pNode); + +int32_t validateMergePhysiNode(SValidatePlanContext* pCxt, SMergePhysiNode* pMerge) { + if ((NULL != pMerge->node.pLimit || NULL != pMerge->node.pSlimit) && pMerge->type == MERGE_TYPE_NON_SORT) { + planError("no limit&slimit supported for non sort merge"); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t validateSubplanNode(SValidatePlanContext* pCxt, SSubplan* pSubPlan) { + if (SUBPLAN_TYPE_MODIFY == pSubPlan->subplanType) { + return TSDB_CODE_SUCCESS; + } + return doValidatePhysiNode(pCxt, (SNode*)pSubPlan->pNode); +} + +int32_t validateQueryPlanNode(SValidatePlanContext* pCxt, SQueryPlan* pPlan) { + int32_t code = TSDB_CODE_SUCCESS; + SNode* pNode = NULL; + FOREACH(pNode, pPlan->pSubplans) { + if (QUERY_NODE_NODE_LIST != nodeType(pNode)) { + code = TSDB_CODE_PLAN_INTERNAL_ERROR; + break; + } + + SNode* pSubNode = NULL; + SNodeListNode* pSubplans = (SNodeListNode*)pNode; + FOREACH(pSubNode, pSubplans->pNodeList) { + if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(pNode)) { + code = TSDB_CODE_PLAN_INTERNAL_ERROR; + break; + } + + code = doValidatePhysiNode(pCxt, pSubNode); + if (code) { + break; + } + } + } + + return code; +} + +int32_t doValidatePhysiNode(SValidatePlanContext* pCxt, SNode* pNode) { + switch (nodeType(pNode)) { + case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_PROJECT: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: + case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: + case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE: + return validateMergePhysiNode(pCxt, (SMergePhysiNode*)pNode); + case QUERY_NODE_PHYSICAL_PLAN_SORT: + case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: + case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_FILL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: + case QUERY_NODE_PHYSICAL_PLAN_PARTITION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: + case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: + case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: + case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: + case QUERY_NODE_PHYSICAL_PLAN_INSERT: + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: + case QUERY_NODE_PHYSICAL_PLAN_DELETE: + case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: + case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: + case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: + case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: + break; + case QUERY_NODE_PHYSICAL_SUBPLAN: + return validateSubplanNode(pCxt, (SSubplan*)pNode); + case QUERY_NODE_PHYSICAL_PLAN: + return validateQueryPlanNode(pCxt, (SQueryPlan *)pNode); + default: + break; + } + + return TSDB_CODE_SUCCESS; +} + +static void destoryValidatePlanContext(SValidatePlanContext* pCxt) { + +} + +int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan) { + SValidatePlanContext cxt = {.pPlanCxt = pCxt, + .errCode = TSDB_CODE_SUCCESS + }; + + int32_t code = TSDB_CODE_SUCCESS; + SNode* pNode = NULL; + FOREACH(pNode, pPlan->pSubplans) { + if (QUERY_NODE_NODE_LIST != nodeType(pNode)) { + code = TSDB_CODE_PLAN_INTERNAL_ERROR; + break; + } + + SNode* pSubNode = NULL; + SNodeListNode* pSubplans = (SNodeListNode*)pNode; + FOREACH(pSubNode, pSubplans->pNodeList) { + code = doValidatePhysiNode(&cxt, pSubNode); + if (code) { + break; + } + } + } + + destoryValidatePlanContext(&cxt); + return code; +} diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 6dd9c544cc..a4a33b30fd 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -57,6 +57,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo if (TSDB_CODE_SUCCESS == code) { code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList); } + if (TSDB_CODE_SUCCESS == code) { + code = validateQueryPlan(pCxt, *pPlan); + } if (TSDB_CODE_SUCCESS == code) { dumpQueryPlan(*pPlan); }