From 616539a70083f495140070c6c09cd8ae2fa6970e Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 23 Jul 2022 17:30:00 +0800 Subject: [PATCH 1/9] feat: super table order by primary key optimization --- source/libs/planner/src/planOptimizer.c | 24 +++++++++---------- source/libs/planner/src/planPhysiCreater.c | 1 - source/libs/planner/src/planSpliter.c | 14 ++++++++++- source/libs/planner/src/planUtil.c | 3 ++- source/libs/planner/test/planOptimizeTest.cpp | 2 ++ 5 files changed, 29 insertions(+), 15 deletions(-) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index b006ac2b0a..f107b2343c 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -997,10 +997,7 @@ static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimi switch (nodeType(pNode)) { case QUERY_NODE_LOGIC_PLAN_SCAN: - if (TSDB_SUPER_TABLE != ((SScanLogicNode*)pNode)->tableType) { - return nodesListMakeAppend(pScanNodes, (SNode*)pNode); - } - break; + return nodesListMakeAppend(pScanNodes, (SNode*)pNode); case QUERY_NODE_LOGIC_PLAN_JOIN: code = sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes); @@ -1040,13 +1037,16 @@ static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) { static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort, SNodeList* pScanNodes) { EOrder order = sortPriKeyOptGetPriKeyOrder(pSort); - if (ORDER_DESC == order) { - SNode* pScanNode = NULL; - FOREACH(pScanNode, pScanNodes) { - SScanLogicNode* pScan = (SScanLogicNode*)pScanNode; - if (pScan->scanSeq[0] > 0) { - TSWAP(pScan->scanSeq[0], pScan->scanSeq[1]); - } + SNode* pScanNode = NULL; + FOREACH(pScanNode, pScanNodes) { + SScanLogicNode* pScan = (SScanLogicNode*)pScanNode; + if (ORDER_DESC == order && pScan->scanSeq[0] > 0) { + TSWAP(pScan->scanSeq[0], pScan->scanSeq[1]); + } + if (TSDB_SUPER_TABLE == pScan->tableType) { + pScan->scanType = SCAN_TYPE_TABLE_MERGE; + pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL; + pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; } } @@ -2191,7 +2191,7 @@ static bool tagScanMayBeOptimized(SLogicNode* pNode) { !planOptNodeListHasTbname(pAgg->pGroupKeys)) { return false; } - + SNode* pGroupKey = NULL; FOREACH(pGroupKey, pAgg->pGroupKeys) { SNode* pGroup = NULL; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 0a1f8bbd0b..fd359fce45 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -415,7 +415,6 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) { int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols); if (TSDB_CODE_SUCCESS == code) { - // Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc); } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 8586234b7e..10604fac19 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -913,13 +913,25 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit } static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) { + bool find = false; SNode* pCol = NULL; 
FOREACH(pCol, pScan->pScanCols) { if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pCol)->colId) { + find = true; + break; + } + } + if (!find) { + return NULL; + } + SNode* pTarget = NULL; + FOREACH(pTarget, pScan->node.pTargets) { + if (nodesEqualNode(pTarget, pCol)) { return pCol; } } - return NULL; + nodesListStrictAppend(pScan->node.pTargets, nodesCloneNode(pCol)); + return pCol; } static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan, diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index bfa6079cb1..7aab8a7ca3 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -124,7 +124,8 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* } static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) { - if (SCAN_TYPE_TABLE != pScan->scanType && SCAN_TYPE_TABLE_MERGE != pScan->scanType) { + if ((SCAN_TYPE_TABLE != pScan->scanType && SCAN_TYPE_TABLE_MERGE != pScan->scanType) || + DATA_ORDER_LEVEL_GLOBAL == pScan->node.requireDataOrder) { return TSDB_CODE_SUCCESS; } // The lowest sort level of scan output data is DATA_ORDER_LEVEL_IN_BLOCK diff --git a/source/libs/planner/test/planOptimizeTest.cpp b/source/libs/planner/test/planOptimizeTest.cpp index 770ac94e5b..058705403b 100644 --- a/source/libs/planner/test/planOptimizeTest.cpp +++ b/source/libs/planner/test/planOptimizeTest.cpp @@ -53,6 +53,8 @@ TEST_F(PlanOptimizeTest, sortPrimaryKey) { run("SELECT c1 FROM t1 ORDER BY ts"); + run("SELECT c1 FROM st1 ORDER BY ts"); + run("SELECT c1 FROM t1 ORDER BY ts DESC"); run("SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTART DESC"); From dbd71d247c627520cefc3bf1d914ec90195eb0d6 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 23 Jul 2022 20:46:39 +0800 Subject: [PATCH 2/9] feat: super table order by primary key optimization --- source/libs/planner/src/planSpliter.c | 40 +++++++++++++++++++--- source/libs/planner/test/planBasicTest.cpp | 7 ++-- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 10604fac19..879c04743d 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -934,18 +934,48 @@ static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) { return pCol; } +static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan, + SNodeList** pOutputMergeKeys) { + SNodeList* pChildren = pScan->node.pChildren; + pScan->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SScanLogicNode* pMergeScan = (SScanLogicNode*)nodesCloneNode((SNode*)pScan); + if (NULL == pMergeScan) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + SNodeList* pMergeKeys = NULL; + if (TSDB_CODE_SUCCESS == code) { + pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE; + pMergeScan->node.pChildren = pChildren; + splSetParent((SLogicNode*)pMergeScan); + code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan), &pMergeKeys); + } + + if (TSDB_CODE_SUCCESS == code) { + *pOutputMergeScan = (SLogicNode*)pMergeScan; + *pOutputMergeKeys = pMergeKeys; + } else { + nodesDestroyNode((SNode*)pMergeScan); + nodesDestroyList(pMergeKeys); + } + + return code; +} + static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan, bool groupSort) { - SNodeList* pMergeKeys = NULL; - int32_t code = 
stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys); + SLogicNode* pMergeScan = NULL; + SNodeList* pMergeKeys = NULL; + int32_t code = stbSplCreateMergeScanNode(pScan, &pMergeScan, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan, groupSort); + code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort); } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pSubplan->pChildren, - (SNode*)splCreateScanSubplan(pCxt, (SLogicNode*)pScan, SPLIT_FLAG_STABLE_SPLIT)); + (SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT)); } - pScan->scanType = SCAN_TYPE_TABLE_MERGE; ++(pCxt->groupId); return code; } diff --git a/source/libs/planner/test/planBasicTest.cpp b/source/libs/planner/test/planBasicTest.cpp index 8f9cd94c19..9cfae68d34 100644 --- a/source/libs/planner/test/planBasicTest.cpp +++ b/source/libs/planner/test/planBasicTest.cpp @@ -24,9 +24,10 @@ TEST_F(PlanBasicTest, selectClause) { useDb("root", "test"); run("SELECT * FROM t1"); - run("SELECT 1 FROM t1"); - run("SELECT * FROM st1"); - run("SELECT 1 FROM st1"); + + run("SELECT MAX(c1) c2, c2 FROM t1"); + + run("SELECT MAX(c1) c2, c2 FROM st1"); } TEST_F(PlanBasicTest, whereClause) { From 015193e81609fae5b75699f0dd9730644ac7ad89 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 25 Jul 2022 15:17:53 +0800 Subject: [PATCH 3/9] feat: super table order by primary key optimization --- include/libs/nodes/plannodes.h | 9 ++-- source/libs/command/src/explain.c | 15 +++--- source/libs/executor/inc/executorimpl.h | 45 +++++++++++++++++- source/libs/executor/src/executorimpl.c | 22 +++++---- source/libs/executor/src/joinoperator.c | 26 +++++----- source/libs/executor/src/scanoperator.c | 55 ++++------------------ source/libs/nodes/src/nodesCodeFuncs.c | 4 +- source/libs/nodes/src/nodesTraverseFuncs.c | 2 +- source/libs/nodes/src/nodesUtilFuncs.c | 4 +- source/libs/parser/src/parInsert.c | 14 +++--- source/libs/parser/src/parUtil.c | 2 +- source/libs/planner/src/planOptimizer.c | 35 ++++++++++---- source/libs/planner/src/planPhysiCreater.c | 4 +- source/libs/planner/src/planSpliter.c | 19 ++++---- source/util/src/terror.c | 2 +- 15 files changed, 147 insertions(+), 111 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index e382fa4efd..ba16acf7b0 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -104,6 +104,7 @@ typedef struct SJoinLogicNode { SNode* pMergeCondition; SNode* pOnConditions; bool isSingleTableJoin; + EOrder inputTsOrder; } SJoinLogicNode; typedef struct SAggLogicNode { @@ -201,6 +202,7 @@ typedef struct SWindowLogicNode { int64_t watermark; int8_t igExpired; EWindowAlgorithm windowAlgo; + EOrder inputTsOrder; } SWindowLogicNode; typedef struct SFillLogicNode { @@ -356,15 +358,14 @@ typedef struct SInterpFuncPhysiNode { SNode* pTimeSeries; // SColumnNode } SInterpFuncPhysiNode; -typedef struct SJoinPhysiNode { +typedef struct SSortMergeJoinPhysiNode { SPhysiNode node; EJoinType joinType; SNode* pMergeCondition; SNode* pOnConditions; SNodeList* pTargets; -} SJoinPhysiNode; - -typedef SJoinPhysiNode SSortMergeJoinPhysiNode; + EOrder inputTsOrder; +} SSortMergeJoinPhysiNode; typedef struct SAggPhysiNode { SPhysiNode node; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 9ffdfc2289..266f96b41e 100644 --- 
a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -135,7 +135,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; + SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode; pPhysiChildren = pJoinNode->node.pChildren; break; } @@ -434,7 +434,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode; EXPLAIN_ROW_NEW(level, - QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT : EXPLAIN_TBL_SCAN_FORMAT, + QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT + : EXPLAIN_TBL_SCAN_FORMAT, pTblScanNode->scan.tableName.tname); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { @@ -551,7 +552,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); if (pSTblScanNode->scan.pScanPseudoCols) { EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pSTblScanNode->scan.pScanPseudoCols->length); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); @@ -613,7 +614,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; + SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode; EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType)); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { @@ -1180,7 +1181,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); if (pDistScanNode->pScanPseudoCols) { EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pDistScanNode->pScanPseudoCols->length); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDistScanNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); @@ -1367,7 +1368,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pInterpNode->pFuncs->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } - + EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pInterpNode->fillMode)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); @@ -1419,7 +1420,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } } break; - } + } default: qError("not supported physical node type %d", pNode->type); return TSDB_CODE_QRY_APP_ERROR; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1ad17bbc76..44394bb2f0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -321,6 +321,49 @@ typedef struct STableScanInfo { int8_t noTable; } STableScanInfo; +typedef struct STableMergeScanInfo { + STableListInfo* tableListInfo; + int32_t tableStartIndex; + int32_t tableEndIndex; + bool hasGroupId; + uint64_t groupId; + SArray* dataReaders; // array 
of tsdbReaderT* + SReadHandle readHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; + + SFileBlockLoadRecorder readRecorder; + int64_t numOfRows; + SScanInfo scanInfo; + int32_t scanTimes; + SNode* pFilterNode; // filter info, which is push down by optimizer + SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context + SResultRowInfo* pResultRowInfo; + int32_t* rowEntryInfoOffset; + SExprInfo* pExpr; + SSDataBlock* pResBlock; + SArray* pColMatchInfo; + int32_t numOfOutput; + + SExprInfo* pPseudoExpr; + int32_t numOfPseudoExpr; + SqlFunctionCtx* pPseudoCtx; + + SQueryTableDataCond cond; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; + // if the upstream is an interval operator, the interval info is also kept here to get the time + // window to check if current data block needs to be loaded. + SInterval interval; + SSampleExecInfo sample; // sample execution info +} STableMergeScanInfo; + typedef struct STagScanInfo { SColumnInfo *pCols; SSDataBlock *pRes; @@ -881,7 +924,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode, +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7bac828a53..7194b16a78 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1356,7 +1356,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); if (pColMatchInfo != NULL) { - for(int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { SColMatchInfo* pInfo = taosArrayGet(pColMatchInfo, i); if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->targetSlotId); @@ -2885,11 +2885,16 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; - } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { + } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = pOperator->info; *order = pTableScanInfo->cond.order; *scanFlag = pTableScanInfo->scanFlag; return TSDB_CODE_SUCCESS; + } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { + STableMergeScanInfo* pTableScanInfo = pOperator->info; + *order = pTableScanInfo->cond.order; + *scanFlag = pTableScanInfo->scanFlag; + return TSDB_CODE_SUCCESS; } else { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { return TSDB_CODE_INVALID_PARA; @@ -3307,9 +3312,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } SOperatorInfo* downstream = 
pOperator->pDownstream[0]; - SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo; + SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo; - while(1) { + while (1) { while (1) { blockDataCleanup(pRes); @@ -3326,7 +3331,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } if (pLimitInfo->remainGroupOffset > 0) { - if (pLimitInfo->currentGroupId == 0 || pLimitInfo->currentGroupId == pBlock->info.groupId) { // it is the first group + if (pLimitInfo->currentGroupId == 0 || + pLimitInfo->currentGroupId == pBlock->info.groupId) { // it is the first group pLimitInfo->currentGroupId = pBlock->info.groupId; continue; } else if (pLimitInfo->currentGroupId != pBlock->info.groupId) { @@ -4265,7 +4271,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { } // this the tags and pseudo function columns, we only keep the tag columns - for(int32_t i = 0; i < numOfTags; ++i) { + for (int32_t i = 0; i < numOfTags; ++i) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i); int32_t type = nodeType(pNode->pExpr); @@ -4381,7 +4387,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, int32_t groupNum = 0; for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); + int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4701,7 +4707,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { - pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo); + pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 2e6c9bd351..f26b2f4f0a 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -28,30 +28,30 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode); -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, + SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pOperator == NULL || pInfo == NULL) { goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); - int32_t numOfCols = 0; + int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, 
&numOfCols); initResultSizeInfo(&pOperator->resultInfo, 4096); - pInfo->pRes = pResBlock; - pOperator->name = "MergeJoinOperator"; + pInfo->pRes = pResBlock; + pOperator->name = "MergeJoinOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; SNode* pMergeCondition = pJoinNode->pMergeCondition; if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { @@ -104,7 +104,7 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; nodesDestroyNode(pJoinOperator->pCondAfterMerge); - + taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a9d03aebbe..539ef18d87 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -274,7 +274,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); } else { - qDebug("%s data block filter out, elapsed time:%"PRId64, GET_TASKID(pTaskInfo), (et - st)); + qDebug("%s data block filter out, elapsed time:%" PRId64, GET_TASKID(pTaskInfo), (et - st)); } return TSDB_CODE_SUCCESS; @@ -1838,11 +1838,14 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type; pColInfoData = taosArrayGet(p->pDataBlock, 4); char tagTypeStr[VARSTR_HEADER_SIZE + 32]; - int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name); + int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name); if (tagType == TSDB_DATA_TYPE_VARCHAR) { - tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE)); + tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", + (int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE)); } else if (tagType == TSDB_DATA_TYPE_NCHAR) { - tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); + tagTypeLen += + sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", + (int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); } varDataSetLen(tagTypeStr, tagTypeLen); colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false); @@ -2527,49 +2530,6 @@ _error: return NULL; } -typedef struct STableMergeScanInfo { - STableListInfo* tableListInfo; - int32_t tableStartIndex; - int32_t tableEndIndex; - bool hasGroupId; - uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* - SReadHandle readHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort 
start time - SArray* sortSourceParams; - - SFileBlockLoadRecorder readRecorder; - int64_t numOfRows; - SScanInfo scanInfo; - int32_t scanTimes; - SNode* pFilterNode; // filter info, which is push down by optimizer - SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context - SResultRowInfo* pResultRowInfo; - int32_t* rowEntryInfoOffset; - SExprInfo* pExpr; - SSDataBlock* pResBlock; - SArray* pColMatchInfo; - int32_t numOfOutput; - - SExprInfo* pPseudoExpr; - int32_t numOfPseudoExpr; - SqlFunctionCtx* pPseudoCtx; - - SQueryTableDataCond cond; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; - // if the upstream is an interval operator, the interval info is also kept here to get the time - // window to check if current data block needs to be loaded. - SInterval interval; - SSampleExecInfo sample; // sample execution info -} STableMergeScanInfo; - int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idStr) { @@ -2975,6 +2935,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pTableScanInfo->pSortInfo); + taosMemoryFreeClear(pTableScanInfo->rowEntryInfoOffset); taosMemoryFreeClear(param); } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index eec4780293..186a51f000 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1717,7 +1717,7 @@ static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { - const SJoinPhysiNode* pNode = (const SJoinPhysiNode*)pObj; + const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; int32_t code = physicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { @@ -1737,7 +1737,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { } static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { - SJoinPhysiNode* pNode = (SJoinPhysiNode*)pObj; + SSortMergeJoinPhysiNode* pNode = (SSortMergeJoinPhysiNode*)pObj; int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index b12e3b14c7..77681af1bc 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -468,7 +468,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode; + SSortMergeJoinPhysiNode* pJoin = (SSortMergeJoinPhysiNode*)pNode; res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 23f0bb088d..3c6fbe409c 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -287,7 +287,7 @@ SNode* nodesMakeNode(ENodeType type) { case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return makeNode(type, sizeof(SProjectPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: - return makeNode(type, 
sizeof(SJoinPhysiNode)); + return makeNode(type, sizeof(SSortMergeJoinPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: return makeNode(type, sizeof(SAggPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: @@ -883,7 +883,7 @@ void nodesDestroyNode(SNode* pNode) { break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode* pPhyNode = (SJoinPhysiNode*)pNode; + SSortMergeJoinPhysiNode* pPhyNode = (SSortMergeJoinPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pOnConditions); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 702422e022..d564d53633 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -739,12 +739,13 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* return TSDB_CODE_SUCCESS; } -static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, SArray* tagName, uint8_t tagNum) { +static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, + SArray* tagName, uint8_t tagNum) { pTbReq->type = TD_CHILD_TABLE; pTbReq->name = strdup(tname); pTbReq->ctb.suid = suid; pTbReq->ctb.tagNum = tagNum; - if(sname) pTbReq->ctb.name = strdup(sname); + if (sname) pTbReq->ctb.name = strdup(sname); pTbReq->ctb.pTag = (uint8_t*)pTag; pTbReq->ctb.tagName = taosArrayDup(tagName); pTbReq->commentLen = -1; @@ -969,7 +970,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint } SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]]; - char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // todo this can be optimize with parse column + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // todo this can be optimize with parse column code = checkAndTrimValue(&sToken, tmpTokenBuf, &pCxt->msg); if (code != TSDB_CODE_SUCCESS) { goto end; @@ -1012,7 +1013,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint goto end; } - buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, pCxt->pTableMeta->tableInfo.numOfTags); + buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, + pCxt->pTableMeta->tableInfo.numOfTags); end: for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { @@ -1650,7 +1652,6 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) { static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) { SName name; CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); - CHECK_CODE(reserveDbCfgInCache(pCxt->pComCxt->acctId, name.dbname, pCxt->pMetaCache)); CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache)); CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache)); CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache)); @@ -2332,7 +2333,8 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols return ret; } - buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, pTableMeta->tableInfo.numOfTags); + buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, + pTableMeta->tableInfo.numOfTags); taosArrayDestroy(tagName); smlHandle->tableExecHandle.createTblReq.ctb.name = 
taosMemoryMalloc(sTableNameLen + 1); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 74d5f03dc1..7c9a8b10dd 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -92,7 +92,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { case TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG: return "sliding value no larger than the interval value"; case TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL: - return "sliding value can not less than 1% of interval value"; + return "sliding value can not less than 1%% of interval value"; case TSDB_CODE_PAR_ONLY_ONE_JSON_TAG: return "Only one tag if there is a json tag"; case TSDB_CODE_PAR_INCORRECT_NUM_OF_COL: diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 3b545777a8..fcc395af62 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -993,22 +993,28 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) { } static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) { - int32_t code = TSDB_CODE_SUCCESS; - switch (nodeType(pNode)) { - case QUERY_NODE_LOGIC_PLAN_SCAN: + case QUERY_NODE_LOGIC_PLAN_SCAN: { + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + if (NULL != pScan->pGroupTags) { + *pNotOptimize = true; + return TSDB_CODE_SUCCESS; + } return nodesListMakeAppend(pScanNodes, (SNode*)pNode); - case QUERY_NODE_LOGIC_PLAN_JOIN: - code = + } + case QUERY_NODE_LOGIC_PLAN_JOIN: { + int32_t code = sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes); if (TSDB_CODE_SUCCESS == code) { code = sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes); } return code; + } case QUERY_NODE_LOGIC_PLAN_AGG: + case QUERY_NODE_LOGIC_PLAN_PARTITION: *pNotOptimize = true; - return code; + return TSDB_CODE_SUCCESS; default: break; } @@ -1034,6 +1040,18 @@ static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) { return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order; } +static void sortPriKeyOptSetParentOrder(SLogicNode* pNode, EOrder order) { + if (NULL == pNode) { + return; + } + if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode)) { + ((SWindowLogicNode*)pNode)->inputTsOrder = order; + } else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) { + ((SJoinLogicNode*)pNode)->inputTsOrder = order; + } + sortPriKeyOptSetParentOrder(pNode->pParent, order); +} + static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort, SNodeList* pScanNodes) { EOrder order = sortPriKeyOptGetPriKeyOrder(pSort); @@ -1048,6 +1066,7 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL; pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; } + sortPriKeyOptSetParentOrder(pScan->node.pParent, order); } SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0); @@ -1613,10 +1632,10 @@ static void alignProjectionWithTarget(SLogicNode* pNode) { } SProjectLogicNode* pProjectNode = (SProjectLogicNode*)pNode; - SNode* pProjection = NULL; + SNode* pProjection = NULL; FOREACH(pProjection, pProjectNode->pProjections) { SNode* pTarget = NULL; - bool keep = false; + bool keep = false; FOREACH(pTarget, pNode->pTargets) { if (0 == strcmp(((SColumnNode*)pProjection)->node.aliasName, ((SColumnNode*)pTarget)->colName)) { keep = 
true; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index fd359fce45..38f3c23925 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -621,8 +621,8 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) { - SJoinPhysiNode* pJoin = - (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN); + SSortMergeJoinPhysiNode* pJoin = + (SSortMergeJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN); if (NULL == pJoin) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 879c04743d..81e2bff179 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -469,7 +469,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent return code; } -static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** pMergeKeys) { +static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) { SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); if (NULL == pMergeKey) { return TSDB_CODE_OUT_OF_MEMORY; @@ -479,7 +479,7 @@ static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** nodesDestroyNode((SNode*)pMergeKey); return TSDB_CODE_OUT_OF_MEMORY; } - pMergeKey->order = ORDER_ASC; + pMergeKey->order = order; pMergeKey->nullOrder = NULL_ORDER_FIRST; return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey); } @@ -491,7 +491,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH; ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE; SNodeList* pMergeKeys = NULL; - code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); + code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, + ((SWindowLogicNode*)pInfo->pSplitNode)->inputTsOrder, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true); } @@ -579,7 +580,8 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0); SNodeList* pMergeKeys = NULL; - int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys); + int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, + ((SWindowLogicNode*)pWindow)->inputTsOrder, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true); @@ -950,7 +952,8 @@ static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOu pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE; pMergeScan->node.pChildren = pChildren; splSetParent((SLogicNode*)pMergeScan); - code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan), &pMergeKeys); + code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan), + pMergeScan->scanSeq[0] > 0 ? 
ORDER_ASC : ORDER_DESC, &pMergeKeys);
   }
 
   if (TSDB_CODE_SUCCESS == code) {
@@ -1020,14 +1023,14 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
 }
 
 static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) {
-  SNode* pPrimaryKey =
-      nodesCloneNode(stbSplFindPrimaryKeyFromScan((SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0)));
+  SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0);
+  SNode*          pPrimaryKey = nodesCloneNode(stbSplFindPrimaryKeyFromScan(pScan));
   if (NULL == pPrimaryKey) {
     return TSDB_CODE_OUT_OF_MEMORY;
   }
   int32_t code = nodesListAppend(pPart->pTargets, pPrimaryKey);
   if (TSDB_CODE_SUCCESS == code) {
-    code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pMergeKeys);
+    code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, pMergeKeys);
   }
   return code;
 }
diff --git a/source/util/src/terror.c b/source/util/src/terror.c
index 80ad480e43..ad6eff3c12 100644
--- a/source/util/src/terror.c
+++ b/source/util/src/terror.c
@@ -512,7 +512,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_UNIT,       "Cannot use 'year' as
 TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG,    "Interval offset should be shorter than interval")
 TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_UNIT,      "Does not support sliding when interval is natural month/year")
 TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG,   "sliding value no larger than the interval value")
-TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL, "sliding value can not less than 1% of interval value")
+TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL, "sliding value can not less than 1%% of interval value")
 TAOS_DEFINE_ERROR(TSDB_CODE_PAR_ONLY_ONE_JSON_TAG,       "Only one tag if there is a json tag")
 TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_NUM_OF_COL,    "Query block has incorrect number of result columns")
 TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL, "Incorrect TIMESTAMP value")

From 9e9a12cb094a15d702054b6144e44e10d0d042aa Mon Sep 17 00:00:00 2001
From: shenglian zhou
Date: Mon, 25 Jul 2022 15:23:43 +0800
Subject: [PATCH 4/9] fix: fix memory leak of pseudo col computation in table merge scan

---
 source/libs/executor/src/scanoperator.c | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index a9d03aebbe..8df22bbb11 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -2557,9 +2557,7 @@ typedef struct STableMergeScanInfo {
   SArray*         pColMatchInfo;
   int32_t         numOfOutput;
 
-  SExprInfo*      pPseudoExpr;
-  int32_t         numOfPseudoExpr;
-  SqlFunctionCtx* pPseudoCtx;
+  SExprSupp       pseudoSup;
 
   SQueryTableDataCond cond;
   int32_t             scanFlag;  // table scan flag to denote if it is a repeat/reverse/main scan
@@ -2974,6 +2972,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
   pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
 
   taosArrayDestroy(pTableScanInfo->pSortInfo);
+  cleanupExprSupp(&pTableScanInfo->pseudoSup);
 
   taosMemoryFreeClear(param);
 }
@@ -3031,8 +3030,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
   }
 
   if (pTableScanNode->scan.pScanPseudoCols != NULL) {
-    pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
-    pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr,
&pInfo->rowEntryInfoOffset); + SExprSupp* pSup = &pInfo->pseudoSup; + pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); + pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; From 66bad33c9e8e3f986c43f519202ef9bc42e35baa Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 25 Jul 2022 15:34:11 +0800 Subject: [PATCH 5/9] fix: fix compilation error --- source/libs/executor/src/scanoperator.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8df22bbb11..9d6c800ceb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2698,9 +2698,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true); // currently only the tbname pseudo column - if (pTableScanInfo->numOfPseudoExpr > 0) { - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, - pTableScanInfo->numOfPseudoExpr, pBlock, GET_TASKID(pTaskInfo)); + if (pTableScanInfo->pseudoSup.numOfExprs > 0) { + int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, + pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } From 222e925644eeb77a977301e7b342f4bbdf6d17a4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 25 Jul 2022 15:31:36 +0800 Subject: [PATCH 6/9] feat(wal): ref --- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/dnode/vnode/src/tq/tqMeta.c | 11 +++++++++-- source/libs/wal/src/walRead.c | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 740cfba9e3..118e3a5d43 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -385,8 +385,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { - int64_t fetchVer = fetchOffsetNew.version + 1; - SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); + int64_t fetchVer = fetchOffsetNew.version + 1; + pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { code = -1; goto OVER; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 620417016f..290ffe5c8d 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -52,7 +52,7 @@ int32_t tqMetaOpen(STQ* pTq) { ASSERT(0); } - TXN txn; + TXN txn = {0}; if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { ASSERT(0); @@ -75,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) { STqHandle handle; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecodeSTqHandle(&decoder, &handle); - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + + handle.pRef = walOpenRef(pTq->pVnode->pWal); + if (handle.pRef == NULL) { + ASSERT(0); + } + walRefVer(handle.pRef, handle.snapshotVer); + if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SReadHandle reader = { .meta = pTq->pVnode->pMeta, @@ -94,6 +100,7 @@ int32_t tqMetaOpen(STQ* pTq) { handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(handle.execHandle.pExecReader); } 
else { + handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 37d7a8c3a9..ac62b7d98d 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -54,7 +54,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { } /*if (pReader->cond.enableRef) {*/ - /*taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/ + /* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/ /*}*/ return pReader; From 0494f4a6e9716b31a935b7a8e127a8370deab369 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 25 Jul 2022 16:41:32 +0800 Subject: [PATCH 7/9] fix: fix table merge scan memory leak --- source/libs/executor/src/scanoperator.c | 34 ++++++++++++++----------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 89db5761c2..2dcb555834 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2829,29 +2829,31 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - tsortDestroySortHandle(pInfo->pSortHandle); + size_t numReaders = taosArrayGetSize(pInfo->dataReaders); + + for (int32_t i = 0; i < numReaders; ++i) { + STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); + blockDataDestroy(param->inputBlock); + } taosArrayClear(pInfo->sortSourceParams); - for (int32_t i = 0; i < taosArrayGetSize(pInfo->dataReaders); ++i) { + tsortDestroySortHandle(pInfo->pSortHandle); + + for (int32_t i = 0; i < numReaders; ++i) { STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i); tsdbReaderClose(reader); } - taosArrayDestroy(pInfo->dataReaders); pInfo->dataReaders = NULL; return TSDB_CODE_SUCCESS; } -SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capacity, SOperatorInfo* pOperator) { +SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity, SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSDataBlock* p = tsortGetSortedDataBlock(pHandle); - if (p == NULL) { - return NULL; - } - - blockDataEnsureCapacity(p, capacity); + blockDataCleanup(pResBlock); + blockDataEnsureCapacity(pResBlock, capacity); while (1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); @@ -2859,14 +2861,15 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa break; } - appendOneRowToDataBlock(p, pTupleHandle); - if (p->info.rows >= capacity) { + appendOneRowToDataBlock(pResBlock, pTupleHandle); + if (pResBlock->info.rows >= capacity) { break; } } - qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows); - return (p->info.rows > 0) ? p : NULL; + + qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows); + return (pResBlock->info.rows > 0) ? 
pResBlock : NULL; } SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { @@ -2895,7 +2898,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { } SSDataBlock* pBlock = NULL; while (pInfo->tableStartIndex < tableListSize) { - pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); + pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); if (pBlock != NULL) { pBlock->info.groupId = pInfo->groupId; pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -2919,6 +2922,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->cond); + taosArrayDestroy(pTableScanInfo->sortSourceParams); for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); From 8bb432c348e8e5bf8cc03fd6ddfc49978110ca2f Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 25 Jul 2022 16:45:23 +0800 Subject: [PATCH 8/9] feat: super table order by primary key optimization --- include/libs/nodes/querynodes.h | 1 + source/libs/nodes/src/nodesCloneFuncs.c | 2 ++ source/libs/parser/src/parAstCreater.c | 1 + source/libs/planner/src/planLogicCreater.c | 8 +++++--- source/libs/planner/src/planPhysiCreater.c | 3 +++ 5 files changed, 12 insertions(+), 3 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index f8c7024591..81ed5b5ecd 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -255,6 +255,7 @@ typedef struct SSelectStmt { int32_t selectFuncNum; bool isEmptyResult; bool isTimeLineResult; + bool isSubquery; bool hasAggFuncs; bool hasRepeatScanFuncs; bool hasIndefiniteRowsFunc; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 121e697630..5279d015b4 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -375,6 +375,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { CLONE_NODE_FIELD(pMergeCondition); CLONE_NODE_FIELD(pOnConditions); COPY_SCALAR_FIELD(isSingleTableJoin); + COPY_SCALAR_FIELD(inputTsOrder); return TSDB_CODE_SUCCESS; } @@ -440,6 +441,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(igExpired); COPY_SCALAR_FIELD(windowAlgo); + COPY_SCALAR_FIELD(inputTsOrder); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index f52ee9af50..a54dae1ee9 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -527,6 +527,7 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok } if (QUERY_NODE_SELECT_STMT == nodeType(pSubquery)) { strcpy(((SSelectStmt*)pSubquery)->stmtName, tempTable->table.tableAlias); + ((SSelectStmt*)pSubquery)->isSubquery = true; } else if (QUERY_NODE_SET_OPERATOR == nodeType(pSubquery)) { strcpy(((SSetOperator*)pSubquery)->stmtName, tempTable->table.tableAlias); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 84e712b466..30e3b676df 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ 
b/source/libs/planner/src/planLogicCreater.c @@ -339,6 +339,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->joinType = pJoinTable->joinType; pJoin->isSingleTableJoin = pJoinTable->table.singleTable; + pJoin->inputTsOrder = ORDER_ASC; pJoin->node.groupAction = GROUP_ACTION_CLEAR; pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; @@ -625,14 +626,14 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) { - int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs); - if (pCxt->pPlanCxt->streamQuery) { pWindow->triggerType = pCxt->pPlanCxt->triggerType; pWindow->watermark = pCxt->pPlanCxt->watermark; pWindow->igExpired = pCxt->pPlanCxt->igExpired; } + pWindow->inputTsOrder = ORDER_ASC; + int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs); if (TSDB_CODE_SUCCESS == code) { code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); } @@ -861,7 +862,8 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel TSWAP(pProject->node.pLimit, pSelect->pLimit); TSWAP(pProject->node.pSlimit, pSelect->pSlimit); - pProject->node.groupAction = GROUP_ACTION_CLEAR; + pProject->node.groupAction = + (!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; pProject->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 38f3c23925..587e566939 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -974,6 +974,9 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh } static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) { + if (GROUP_ACTION_KEEP == pProject->node.groupAction) { + return false; + } if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder) { return true; } From 3614db50c61b878aa983768bccde409fc65ccc83 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Mon, 25 Jul 2022 17:50:36 +0800 Subject: [PATCH 9/9] test: add test case for fix --- tests/system-test/7-tmq/TD-17803.py | 198 ++++++++++++++++++++++++++++ 1 file changed, 198 insertions(+) create mode 100644 tests/system-test/7-tmq/TD-17803.py diff --git a/tests/system-test/7-tmq/TD-17803.py b/tests/system-test/7-tmq/TD-17803.py new file mode 100644 index 0000000000..771ff83a29 --- /dev/null +++ b/tests/system-test/7-tmq/TD-17803.py @@ -0,0 +1,198 @@ +from distutils.log import error +import taos +import sys +import time +import socket +import os +import threading +import subprocess +import platform + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + + + +class TDTestCase: + def __init__(self): + self.snapshot = 0 + self.replica = 3 + self.vgroups = 3 + self.ctbNum = 2 + self.rowsPerTbl = 2 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def checkFileContent(self, consumerId, queryString): + buildPath = 
tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId) + cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile) + tdLog.info(cmdStr) + os.system(cmdStr) + + consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId) + tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile)) + + consumeFile = open(consumeRowsFile, mode='r') + queryFile = open(dstFile, mode='r') + + # skip first line for it is schema + queryFile.readline() + + while True: + dst = queryFile.readline() + src = consumeFile.readline() + + if dst: + if dst != src: + tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId) + else: + break + return + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 2, + 'rowsPerTbl': 1000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=self.replica) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tmqCom.asyncInsertDataByInterlace(paraDict) + tdLog.printNoPrefix("11111111111111111111111") + tmqCom.create_ntable(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_elm_list=paraDict["colSchema"], colPrefix='c', tblNum=1) + tdLog.printNoPrefix("222222222222222") + tmqCom.insert_rows_into_ntbl(tdSql, dbname=paraDict["dbName"], tbname_prefix="ntb", tbname_index_start_num = 1, column_ele_list=paraDict["colSchema"], startTs=paraDict["startTs"], tblNum=1, rows=2) # tdLog.info("restart taosd to ensure that the data falls into the disk") + + tdLog.printNoPrefix("333333333333333333333") + tdSql.query("drop database %s"%paraDict["dbName"]) + tdLog.printNoPrefix("44444444444444444") + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + + # create and start thread + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 
't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 1000, + 'batchNum': 100, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 1} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + pollDelay = 100 + showMsg = 1 + showRow = 1 + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tdLog.info("start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) + + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + # tmqCom.checkFileContent(consumerId, queryString) + + tmqCom.waitSubscriptionExit(tdSql, topicFromStb1) + tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + # self.tmqCase1() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())
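
The series itself only exercises the new behaviour through the planner and tmq test cases above. As an informal sketch (assuming the `st1` super table and `ts` primary-timestamp column used by the planner test schema; the exact EXPLAIN output depends on the build), these are the query shapes whose explicit sort node the optimizer can now drop in favour of an ordered (merge) table scan:

```sql
-- Illustrative only; st1/ts are the super table and primary timestamp column
-- referenced by the planner tests in this series.
EXPLAIN SELECT c1 FROM st1 ORDER BY ts;        -- ascending: table merge scan, no separate sort
EXPLAIN SELECT c1 FROM st1 ORDER BY ts DESC;   -- descending: scan sequence reversed, still no sort
```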