diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 160756d981..46d1c51c64 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1216,7 +1216,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pBlock->info.numOfCols = numOfCols; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; - pBlock->info.rowSize = pDataBlock->info.rows; + pBlock->info.rowSize = pDataBlock->info.rowSize; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index 100e35bc3c..a3755a174e 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -47,6 +47,9 @@ extern "C" { #define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c" #define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64 #define EXPLAIN_RATIO_TIME_FORMAT "Ratio: %f" +#define EXPLAIN_MERGE_FORMAT "Merge" +#define EXPLAIN_MERGE_KEYS_FORMAT "Merge Key: " + #define EXPLAIN_PLANNING_TIME_FORMAT "Planning Time: %.3f ms" #define EXPLAIN_EXEC_TIME_FORMAT "Execution Time: %.3f ms" diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 99ff4d406c..347fb326bb 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -173,6 +173,11 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo pPhysiChildren = partitionPhysiNode->node.pChildren; break; } + case QUERY_NODE_PHYSICAL_PLAN_MERGE: { + SMergePhysiNode *mergePhysiNode = (SMergePhysiNode *)pNode; + pPhysiChildren = mergePhysiNode->node.pChildren; + break; + } default: qError("not supported physical node type %d", pNode->type); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -857,6 +862,50 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } break; } + case QUERY_NODE_PHYSICAL_PLAN_MERGE: { + SMergePhysiNode *pMergeNode = (SMergePhysiNode *)pNode; + EXPLAIN_ROW_NEW(level, EXPLAIN_MERGE_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + } + + SDataBlockDescNode *pDescNode = pMergeNode->node.pOutputDataBlockDesc; + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pDescNode->pSlots)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDescNode->totalRowSize); + EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); + + if (verbose) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, + nodesGetOutputNumFromSlotList(pMergeNode->node.pOutputDataBlockDesc->pSlots)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pMergeNode->node.pOutputDataBlockDesc->outputRowSize); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT); + for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) { + SOrderByExprNode *ptn = nodesListGetNode(pMergeNode->pMergeKeys, i); + EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr)); + } + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + if (pMergeNode->node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pMergeNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, + TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } + } + break; + } default: qError("not supported physical node type %d", pNode->type); return TSDB_CODE_QRY_APP_ERROR; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9b8b9dca26..3dcf2a7c0c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -787,7 +787,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, +SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index c8b1b3ee51..fd3581e2bf 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -130,6 +130,12 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId); */ void* tsortGetValue(STupleHandle* pVHandle, int32_t colId); +/** + * + * @param pVHandle + * @return + */ +uint64_t tsortGetGroupId(STupleHandle* pVHandle); /** * * @param pSortHandle diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 464fb5c957..ea54ae472d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4666,8 +4666,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t numOfOutputCols = 0; SArray* pColList = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID); - - pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo); + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); + SSDataBlock* pInputDataBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); + pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pInputDataBlock, pResBlock, sortInfo, pColList, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index c84d4491af..95f9514b07 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -84,7 +84,6 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo) { blockDataCleanup(pDataBlock); - ASSERT(taosArrayGetSize(pColMatchInfo) == pDataBlock->info.numOfCols); SSDataBlock* p = tsortGetSortedDataBlock(pHandle); if (p == NULL) { @@ -230,7 +229,12 @@ typedef struct SMultiwaySortMergeOperatorInfo { SSortHandle* pSortHandle; SArray* pColMatchInfo; // for index map from table scan output + SSDataBlock* pInputBlock; int64_t startTs; // sort start time + + bool hasGroupId; + uint64_t groupId; + STupleHandle *prefetchedTuple; } SMultiwaySortMergeOperatorInfo; int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { @@ -246,14 +250,14 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE, - pInfo->bufPageSize, numOfBufPage, NULL, pTaskInfo->id.str); + pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - SSortSource ps = {0}; - ps.param = pOperator->pDownstream[i]; - tsortAddSource(pInfo->pSortHandle, &ps); + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + ps->param = pOperator->pDownstream[i]; + tsortAddSource(pInfo->pSortHandle, ps); } int32_t code = tsortOpen(pInfo->pSortHandle); @@ -269,6 +273,70 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } +SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, + SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) { + blockDataCleanup(pDataBlock); + + SSDataBlock* p = tsortGetSortedDataBlock(pHandle); + if (p == NULL) { + return NULL; + } + + blockDataEnsureCapacity(p, capacity); + + while (1) { + + STupleHandle* pTupleHandle = NULL; + if (pInfo->prefetchedTuple == NULL) { + pTupleHandle = tsortNextTuple(pHandle); + } else { + pTupleHandle = pInfo->prefetchedTuple; + pInfo->prefetchedTuple = NULL; + } + + if (pTupleHandle == NULL) { + break; + } + + uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); + if (!pInfo->hasGroupId) { + pInfo->groupId = tupleGroupId; + pInfo->hasGroupId = true; + appendOneRowToDataBlock(p, pTupleHandle); + } else if (pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + } else { + pInfo->prefetchedTuple = pTupleHandle; + pInfo->groupId = tupleGroupId; + break; + } + + if (p->info.rows >= capacity) { + break; + } + + } + + if (p->info.rows > 0) { + int32_t numOfCols = taosArrayGetSize(pColMatchInfo); + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); + ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + + SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); + colDataAssign(pDst, pSrc, p->info.rows); + } + + pDataBlock->info.rows = p->info.rows; + pDataBlock->info.capacity = p->info.rows; + } + + blockDataDestroy(p); + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; +} + + SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -283,7 +351,11 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { } SSDataBlock* pBlock = - getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo); + getMultiwaySortedBlockData(pInfo->pSortHandle, + pInfo->binfo.pRes, + pOperator->resultInfo.capacity, + pInfo->pColMatchInfo, + pInfo); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -296,6 +368,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) { SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock); taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pColMatchInfo); @@ -313,7 +386,7 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx return TSDB_CODE_SUCCESS; } -SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, +SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) { SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo)); @@ -330,6 +403,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; + pInfo->pInputBlock = pInputBlock; pOperator->name = "MultiwaySortMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->blocking = true; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 7581836d59..846da4a32a 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -225,6 +225,10 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; pSource->src.pBlock = pHandle->fetchfp(pSource->param); + if (pSource->src.pBlock == NULL) { + pSource->src.rowIndex = -1; + ++pHandle->numOfCompletedSources; + } } } @@ -361,13 +365,21 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { bool leftNull = false; if (pLeftColInfoData->hasNull) { - leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]); + if (pLeftBlock->pBlockAgg == NULL) { + leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex); + } else { + leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]); + } } SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); bool rightNull = false; if (pRightColInfoData->hasNull) { - rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[pOrder->slotId]); + if (pLeftBlock->pBlockAgg == NULL) { + rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex); + } else { + rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[i]); + } } if (leftNull && rightNull) { @@ -408,7 +420,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs; qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64, - pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed); + pHandle->idStr, (int32_t) (sortPass + 1), pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0, pHandle->sortElapsed, pHandle->totalElapsed); int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize); blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); @@ -697,6 +709,10 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) { return colDataGetData(pColInfo, pVHandle->rowIndex); } +uint64_t tsortGetGroupId(STupleHandle* pVHandle) { + return pVHandle->pBlock->info.groupId; +} + SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) { SSortExecInfo info = {0}; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index e5c81eda4b..a7f53bb752 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -166,8 +166,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { } return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); } - // case QUERY_NODE_LOGIC_PLAN_SORT: - // return stbSplHasMultiTbScan(streamQuery, pNode); + case QUERY_NODE_LOGIC_PLAN_SORT: + return stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_SCAN: return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); default: diff --git a/tests/script/tsim/query/scalarNull.sim b/tests/script/tsim/query/scalarNull.sim index b08ac1d3d9..07bd5e57cd 100644 --- a/tests/script/tsim/query/scalarNull.sim +++ b/tests/script/tsim/query/scalarNull.sim @@ -66,6 +66,7 @@ if $rows != 0 then return -1 endi sql select * from tb1 where null; +print $rows if $rows != 0 then return -1 endi