diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index c1a51898bc..7f1927beec 100755 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -283,6 +283,7 @@ SSDataBlock* doNonSortMerge(SOperatorInfo* pOperator) { continue; } break; + pNonSortMerge->lastSourceIdx = idx - 1; } if (!pBlock) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index bcb51aef59..6e7a6705bc 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -818,7 +818,7 @@ static bool pdcJoinIsPrimEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { tSimpleHashCleanup(pLeftTables); tSimpleHashCleanup(pRightTables); - + return res; } @@ -1186,14 +1186,14 @@ static int32_t pdcJoinAddPreFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogi if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pCondCols, &pTargets); } - + nodesDestroyList(pCondCols); - + if (TSDB_CODE_SUCCESS == code) { SNode* pNode = NULL; FOREACH(pNode, pTargets) { SNode* pTmp = NULL; - bool found = false; + bool found = false; FOREACH(pTmp, pJoin->node.pTargets) { if (nodesEqualNode(pTmp, pNode)) { found = true; @@ -1204,7 +1204,7 @@ static int32_t pdcJoinAddPreFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogi nodesListStrictAppend(pJoin->node.pTargets, nodesCloneNode(pNode)); } } - } + } nodesDestroyList(pTargets); @@ -2917,8 +2917,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub scanPathOptSetGroupOrderScan(pScan); pParent->hasGroupKeyOptimized = true; } - if (pNode->pParent->pSlimit) - pScan->groupOrderScan = true; + if (pNode->pParent->pSlimit) pScan->groupOrderScan = true; NODES_CLEAR_LIST(pNode->pChildren); nodesDestroyNode((SNode*)pNode); @@ -3395,7 +3394,7 @@ static SNode* rewriteUniqueOptCreateFirstFunc(SFunctionNode* pSelectValue, SNode strcpy(pFunc->node.aliasName, pSelectValue->node.aliasName); } else { int64_t pointer = (int64_t)pFunc; - char name[TSDB_FUNC_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0}; + char name[TSDB_FUNC_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0}; int32_t len = snprintf(name, sizeof(name) - 1, "%s.%" PRId64 "", pFunc->functionName, pointer); taosCreateMD5Hash(name, len); strncpy(pFunc->node.aliasName, name, TSDB_COL_NAME_LEN - 1); @@ -3603,8 +3602,8 @@ static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast /// @param lastColId only used when lastColNum equals 1, the col id of the only one last col /// @param selectNonPKColNum num of normal cols /// @param selectNonPKColId only used when selectNonPKColNum equals 1, the col id of the only one select col -static bool lastRowScanOptCheckColNum(int32_t lastColNum, col_id_t lastColId, - int32_t selectNonPKColNum, col_id_t selectNonPKColId) { +static bool lastRowScanOptCheckColNum(int32_t lastColNum, col_id_t lastColId, int32_t selectNonPKColNum, + col_id_t selectNonPKColId) { // multi select non pk col + last func: select c1, c2, last(c1) if (selectNonPKColNum > 1 && lastColNum > 0) return false; @@ -3678,8 +3677,7 @@ static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, int8_t cacheLastModel } else if (lastColNum > 0) { return false; } - if (!lastRowScanOptCheckColNum(lastColNum, lastColId, selectNonPKColNum, selectNonPKColId)) - return false; + if (!lastRowScanOptCheckColNum(lastColNum, lastColId, selectNonPKColNum, selectNonPKColId)) return false; } else if (FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) { if (!lastRowScanOptLastParaIsTag(nodesListGetNode(pAggFunc->pParameterList, 0))) { return false; @@ -3720,7 +3718,7 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { if (!lastRowScanOptCheckLastCache(pAgg, pScan)) { return false; } - + bool hasOtherFunc = false; if (!lastRowScanOptCheckFuncList(pNode, pScan->cacheLastMode, &hasOtherFunc)) { return false; @@ -3947,7 +3945,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic } else { pNode = nodesListGetNode(pFunc->pParameterList, 0); nodesListMakeAppend(&cxt.pOtherCols, pNode); - + if (FUNCTION_TYPE_SELECT_VALUE == funcType) { if (nodeType(pNode) == QUERY_NODE_COLUMN) { SColumnNode* pCol = (SColumnNode*)pNode; @@ -3973,7 +3971,8 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol); nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pPKTsCol)); } - if (pNonPKCol && cxt.pLastCols->length == 1 && nodesEqualNode((SNode*)pNonPKCol, nodesListGetNode(cxt.pLastCols, 0))) { + if (pNonPKCol && cxt.pLastCols->length == 1 && + nodesEqualNode((SNode*)pNonPKCol, nodesListGetNode(cxt.pLastCols, 0))) { // when select last(c1), c1 from ..., we add c1 to targets sprintf(pNonPKCol->colName, "#sel_val.%p", pNonPKCol); nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pNonPKCol)); @@ -3990,7 +3989,6 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic return TSDB_CODE_SUCCESS; } - static bool splitCacheLastFuncOptMayBeOptimized(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) { @@ -4015,7 +4013,8 @@ static bool splitCacheLastFuncOptMayBeOptimized(SLogicNode* pNode) { return true; } -static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg, SAggLogicNode* pAgg, SNodeList* pFunc, SNodeList* pTargets) { +static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg, SAggLogicNode* pAgg, SNodeList* pFunc, + SNodeList* pTargets) { SAggLogicNode* pNew = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG); if (NULL == pNew) { nodesDestroyList(pFunc); @@ -4051,10 +4050,10 @@ static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg, pNew->node.pChildren = nodesCloneList(pAgg->node.pChildren); int32_t code = 0; - SNode* pNode = nodesListGetNode(pNew->node.pChildren, 0); + SNode* pNode = nodesListGetNode(pNew->node.pChildren, 0); if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { SScanLogicNode* pScan = (SScanLogicNode*)pNode; - SNodeList* pOldScanCols = NULL; + SNodeList* pOldScanCols = NULL; TSWAP(pScan->pScanCols, pOldScanCols); nodesDestroyList(pScan->pScanPseudoCols); pScan->pScanPseudoCols = NULL; @@ -4110,7 +4109,8 @@ static int32_t splitCacheLastFuncOptModifyAggLogicNode(SAggLogicNode* pAgg) { return TSDB_CODE_SUCCESS; } -static int32_t splitCacheLastFuncOptCreateMergeLogicNode(SMergeLogicNode** pNew, SAggLogicNode* pAgg1, SAggLogicNode* pAgg2) { +static int32_t splitCacheLastFuncOptCreateMergeLogicNode(SMergeLogicNode** pNew, SAggLogicNode* pAgg1, + SAggLogicNode* pAgg2) { SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); if (NULL == pMerge) { return TSDB_CODE_OUT_OF_MEMORY; @@ -4127,17 +4127,13 @@ static int32_t splitCacheLastFuncOptCreateMergeLogicNode(SMergeLogicNode** pNew, nodesDestroyNode(pNewAgg2); return TSDB_CODE_OUT_OF_MEMORY; } - + ((SAggLogicNode*)pNewAgg1)->node.pParent = (SLogicNode*)pMerge; ((SAggLogicNode*)pNewAgg2)->node.pParent = (SLogicNode*)pMerge; SNode* pNode = NULL; - FOREACH(pNode, ((SAggLogicNode*)pNewAgg1)->node.pChildren) { - ((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg1; - } - FOREACH(pNode, ((SAggLogicNode*)pNewAgg2)->node.pChildren) { - ((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg2; - } + FOREACH(pNode, ((SAggLogicNode*)pNewAgg1)->node.pChildren) { ((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg1; } + FOREACH(pNode, ((SAggLogicNode*)pNewAgg2)->node.pChildren) { ((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg2; } int32_t code = nodesListMakeStrictAppendList(&pMerge->node.pTargets, nodesCloneList(pAgg1->node.pTargets)); if (TSDB_CODE_SUCCESS == code) { @@ -4157,7 +4153,7 @@ static int32_t splitCacheLastFuncOptCreateMergeLogicNode(SMergeLogicNode** pNew, } else { *pNew = pMerge; } - + return code; } @@ -4203,8 +4199,8 @@ static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan* { WHERE_EACH(pNode, pAgg->node.pTargets) { SColumnNode* pCol = (SColumnNode*)pNode; - SNode* pFuncNode = NULL; - bool found = false; + SNode* pFuncNode = NULL; + bool found = false; FOREACH(pFuncNode, pAggFuncList) { SFunctionNode* pFunc = (SFunctionNode*)pFuncNode; if (0 == strcmp(pFunc->node.aliasName, pCol->colName)) { @@ -4217,7 +4213,7 @@ static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan* ERASE_NODE(pAgg->node.pTargets); continue; } - WHERE_NEXT; + WHERE_NEXT; } } @@ -4228,8 +4224,8 @@ static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan* } SMergeLogicNode* pMerge = NULL; - SAggLogicNode* pNewAgg = NULL; - int32_t code = splitCacheLastFuncOptCreateAggLogicNode(&pNewAgg, pAgg, pAggFuncList, pTargets); + SAggLogicNode* pNewAgg = NULL; + int32_t code = splitCacheLastFuncOptCreateAggLogicNode(&pNewAgg, pAgg, pAggFuncList, pTargets); if (TSDB_CODE_SUCCESS == code) { code = splitCacheLastFuncOptModifyAggLogicNode(pAgg); } @@ -4240,19 +4236,17 @@ static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan* code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pAgg, (SLogicNode*)pMerge); } - nodesDestroyNode((SNode *)pAgg); - nodesDestroyNode((SNode *)pNewAgg); + nodesDestroyNode((SNode*)pAgg); + nodesDestroyNode((SNode*)pNewAgg); if (TSDB_CODE_SUCCESS != code) { - nodesDestroyNode((SNode *)pMerge); + nodesDestroyNode((SNode*)pMerge); } - + pCxt->optimized = true; return code; } - - // merge projects static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) { @@ -4405,7 +4399,7 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp } pScanNode->onlyMetaCtbIdx = false; - + pCxt->optimized = true; return TSDB_CODE_SUCCESS; } @@ -4449,8 +4443,8 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu // The scan below will do scanning with group order return cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_SLIMIT); } - // else if not part by tag and tbname, the partition node below indicates that results are sorted, the agg node can - // be pipelined. + // else if not part by tag and tbname, the partition node below indicates that results are sorted, the agg node + // can be pipelined. if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && LIST_LENGTH(pNodeLimitPushTo->pChildren) == 1) { SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNodeLimitPushTo->pChildren, 0); if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_PARTITION) { @@ -5121,19 +5115,16 @@ static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) { return true; } - int32_t stbJoinOptAddFuncToScanNode(char* funcName, SScanLogicNode* pScan) { SFunctionNode* pUidFunc = createFunction(funcName, NULL); - snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p", - pUidFunc->functionName, pUidFunc); - int32_t code = nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pUidFunc); + snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p", pUidFunc->functionName, pUidFunc); + int32_t code = nodesListStrictAppend(pScan->pScanPseudoCols, (SNode*)pUidFunc); if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExpr((SNode*)pUidFunc, &pScan->node.pTargets); } return code; } - int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) { SScanLogicNode* pScan = (SScanLogicNode*)pNode; SJoinLogicNode* pJoinNode = (SJoinLogicNode*)pJoin; @@ -5146,14 +5137,14 @@ int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) { pScan->onlyMetaCtbIdx = true; SNodeList* pTags = nodesMakeList(); - int32_t code = nodesCollectColumnsFromNode(pJoinNode->pTagEqCond, NULL, COLLECT_COL_TYPE_TAG, &pTags); + int32_t code = nodesCollectColumnsFromNode(pJoinNode->pTagEqCond, NULL, COLLECT_COL_TYPE_TAG, &pTags); if (TSDB_CODE_SUCCESS == code) { code = nodesCollectColumnsFromNode(pJoinNode->pTagOnCond, NULL, COLLECT_COL_TYPE_TAG, &pTags); } if (TSDB_CODE_SUCCESS == code) { SNode* pTarget = NULL; SNode* pTag = NULL; - bool found = false; + bool found = false; WHERE_EACH(pTarget, pScan->node.pTargets) { found = false; SColumnNode* pTargetCol = (SColumnNode*)pTarget; @@ -5170,7 +5161,7 @@ int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) { WHERE_NEXT; } } - } + } if (TSDB_CODE_SUCCESS == code) { code = stbJoinOptAddFuncToScanNode("_tbuid", pScan); } @@ -5181,7 +5172,7 @@ int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) { if (code) { nodesDestroyList(pTags); } - + return code; } @@ -5192,7 +5183,7 @@ static int32_t stbJoinOptCreateTagScanNode(SLogicNode* pJoin, SNodeList** ppList } int32_t code = TSDB_CODE_SUCCESS; - SNode* pNode = NULL; + SNode* pNode = NULL; FOREACH(pNode, pList) { code = stbJoinOptRewriteToTagScan(pJoin, pNode); if (code) { @@ -5234,9 +5225,10 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh SNode* pNode = NULL; FOREACH(pNode, pChildren) { SScanLogicNode* pScan = (SScanLogicNode*)pNode; - SNode* pCol = NULL; + SNode* pCol = NULL; FOREACH(pCol, pScan->pScanPseudoCols) { - if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID || ((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) { + if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID || + ((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) { code = createColumnByRewriteExpr(pCol, &pJoin->node.pTargets); if (code) { break; @@ -5279,7 +5271,7 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL int32_t code = TSDB_CODE_SUCCESS; int32_t i = 0; - SNode* pNode = NULL; + SNode* pNode = NULL; FOREACH(pNode, pList) { SScanLogicNode* pScan = (SScanLogicNode*)pNode; //code = stbJoinOptAddFuncToScanNode("_tbuid", pScan); @@ -5294,7 +5286,7 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL pScan->node.dynamicOp = true; *(srcScan + i++) = pScan->pVgroupList->numOfVgroups <= 1; - + pScan->scanType = SCAN_TYPE_TABLE; } @@ -5304,13 +5296,13 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL } static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pRoot, SNodeList* pChildren, SLogicNode** ppLogic) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SGroupCacheLogicNode* pGrpCache = (SGroupCacheLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_GROUP_CACHE); if (NULL == pGrpCache) { return TSDB_CODE_OUT_OF_MEMORY; } - - //pGrpCache->node.dynamicOp = true; + + // pGrpCache->node.dynamicOp = true; pGrpCache->grpColsMayBeNull = false; pGrpCache->grpByUid = true; pGrpCache->batchFetch = getBatchScanOptionFromHint(pRoot->pHint); @@ -5325,9 +5317,10 @@ static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pRoot, SNodeList* pChi } SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pChildren, 0); - SNode* pCol = NULL; + SNode* pCol = NULL; FOREACH(pCol, pScan->pScanPseudoCols) { - if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID || ((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) { + if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID || + ((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) { code = createColumnByRewriteExpr(pCol, &pGrpCache->pGroupCols); if (code) { break; @@ -5335,7 +5328,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pRoot, SNodeList* pChi } } - bool hasCond = false; + bool hasCond = false; SNode* pNode = NULL; FOREACH(pNode, pChildren) { SScanLogicNode* pScan = (SScanLogicNode*)pNode; @@ -5345,7 +5338,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pRoot, SNodeList* pChi pScan->node.pParent = (SLogicNode*)pGrpCache; } pGrpCache->globalGrp = false; - + if (TSDB_CODE_SUCCESS == code) { *ppLogic = (SLogicNode*)pGrpCache; } else { @@ -5397,16 +5390,14 @@ static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChi } pJoin->joinAlgo = JOIN_ALGO_MERGE; - //pJoin->node.dynamicOp = true; + // pJoin->node.dynamicOp = true; stbJoinOptRemoveTagEqCond(pJoin); NODES_DESTORY_NODE(pJoin->pTagEqCond); - + SNode* pNode = NULL; - FOREACH(pNode, pJoin->node.pChildren) { - ERASE_NODE(pJoin->node.pChildren); - } - int32_t code = nodesListStrictAppend(pJoin->node.pChildren, (SNode *)pChild); + FOREACH(pNode, pJoin->node.pChildren) { ERASE_NODE(pJoin->node.pChildren); } + int32_t code = nodesListStrictAppend(pJoin->node.pChildren, (SNode*)pChild); if (TSDB_CODE_SUCCESS == code) { pChild->pParent = (SLogicNode*)pJoin; *ppLogic = (SLogicNode*)pJoin; @@ -5418,8 +5409,9 @@ static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChi return code; } -static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pRoot, SLogicNode* pPrev, SLogicNode* pPost, bool* srcScan, SLogicNode** ppDynNode) { - int32_t code = TSDB_CODE_SUCCESS; +static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pRoot, SLogicNode* pPrev, SLogicNode* pPost, bool* srcScan, + SLogicNode** ppDynNode) { + int32_t code = TSDB_CODE_SUCCESS; SDynQueryCtrlLogicNode* pDynCtrl = (SDynQueryCtrlLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL); if (NULL == pDynCtrl) { return TSDB_CODE_OUT_OF_MEMORY; @@ -5428,22 +5420,22 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pRoot, SLogicNode* p pDynCtrl->qType = DYN_QTYPE_STB_HASH; pDynCtrl->stbJoin.batchFetch = getBatchScanOptionFromHint(pRoot->pHint); memcpy(pDynCtrl->stbJoin.srcScan, srcScan, sizeof(pDynCtrl->stbJoin.srcScan)); - - if (TSDB_CODE_SUCCESS == code) { + + if (TSDB_CODE_SUCCESS == code) { pDynCtrl->node.pChildren = nodesMakeList(); if (NULL == pDynCtrl->node.pChildren) { code = TSDB_CODE_OUT_OF_MEMORY; } } - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code) { pDynCtrl->stbJoin.pVgList = nodesMakeList(); if (NULL == pDynCtrl->stbJoin.pVgList) { code = TSDB_CODE_OUT_OF_MEMORY; } } - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code) { pDynCtrl->stbJoin.pUidList = nodesMakeList(); if (NULL == pDynCtrl->stbJoin.pUidList) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -5472,7 +5464,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pRoot, SLogicNode* p if (TSDB_CODE_SUCCESS == code) { pPrev->pParent = (SLogicNode*)pDynCtrl; pPost->pParent = (SLogicNode*)pDynCtrl; - + *ppDynNode = (SLogicNode*)pDynCtrl; } else { nodesDestroyNode((SNode*)pDynCtrl); @@ -5488,9 +5480,9 @@ static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* p SLogicNode* pGrpCacheNode = NULL; SLogicNode* pHJoinNode = NULL; SLogicNode* pMJoinNode = NULL; - SLogicNode* pDynNode = NULL; + SLogicNode* pDynNode = NULL; bool srcScan[2] = {0}; - + int32_t code = stbJoinOptCreateTagScanNode(pJoin, &pTagScanNodes); if (TSDB_CODE_SUCCESS == code) { code = stbJoinOptCreateTagHashJoinNode(pJoin, pTagScanNodes, &pHJoinNode); @@ -5642,8 +5634,8 @@ static bool partColOptShouldBeOptimized(SLogicNode* pNode) { } static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) { - SNode* node; - int32_t code = TSDB_CODE_SUCCESS; + SNode* node; + int32_t code = TSDB_CODE_SUCCESS; SSortLogicNode* pSort = (SSortLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT); if (pSort) { bool alreadyPartByPKTs = false; @@ -5703,7 +5695,6 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub if (NULL == pNode) return TSDB_CODE_SUCCESS; SLogicNode* pRootNode = getLogicNodeRootNode((SLogicNode*)pNode); - if (pRootNode->pHint && getSortForGroupOptHint(pRootNode->pHint)) { // replace with sort node SSortLogicNode* pSort = partColOptCreateSort(pNode); @@ -5802,6 +5793,7 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) { FOREACH(pTmpNode, pFuncs) { SFunctionNode* pFunc = (SFunctionNode*)pTmpNode; // TODO test other pseudo column funcs + // TODO test funcs with multi params if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId) && !fmIsGroupKeyFunc(pFunc->funcId)) { return false; @@ -5814,20 +5806,20 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) { } typedef struct STSMAOptUsefulTsma { - const STableTSMAInfo* pTsma; // NULL if no tsma available, which will use original data for calculation - STimeWindow scanRange; // scan time range for this tsma - SArray* pTsmaScanCols; // SArray index of tsmaFuncs array + const STableTSMAInfo* pTsma; // NULL if no tsma available, which will use original data for calculation + STimeWindow scanRange; // scan time range for this tsma + SArray* pTsmaScanCols; // SArray index of tsmaFuncs array } STSMAOptUsefulTsma; typedef struct STSMAOptCtx { // input - SScanLogicNode* pScan; - SLogicNode* pParent; // parent of Table Scan, Agg or Interval - const SNodeList* pAggFuncs; - const STimeWindow* pTimeRange; - const SArray* pTsmas; - SInterval* queryInterval; // not null with window logic node - int8_t precision; + SScanLogicNode* pScan; + SLogicNode* pParent; // parent of Table Scan, Agg or Interval + const SNodeList* pAggFuncs; + const STimeWindow* pTimeRange; + const SArray* pTsmas; + SInterval* queryInterval; // not null with window logic node + int8_t precision; // output SArray* pUsefulTsmas; // SArray, sorted by tsma interval from long to short @@ -5835,7 +5827,7 @@ typedef struct STSMAOptCtx { SLogicSubplan* generatedSubPlans[2]; } STSMAOptCtx; -static int32_t fillTSMAOptCtx (STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) { +static int32_t fillTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) { int32_t code = 0; pTsmaOptCtx->pScan = pScan; pTsmaOptCtx->pParent = pScan->node.pParent; @@ -5939,7 +5931,7 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ if (queryColId > pTsmaFuncInfo->colId) { continue; } - found= true; + found = true; taosArrayPush(pTsmaScanCols, &j); break; } @@ -5983,9 +5975,9 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) { } static int32_t tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) { - const STSMAOptUsefulTsma* p = pLeft, *q = pRight; - int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval; - int32_t code = getDuration(pInterval, p->pTsma->unit, &pInterval, TSDB_TIME_PRECISION_MILLI); + const STSMAOptUsefulTsma *p = pLeft, *q = pRight; + int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval; + int32_t code = getDuration(pInterval, p->pTsma->unit, &pInterval, TSDB_TIME_PRECISION_MILLI); ASSERT(code == TSDB_CODE_SUCCESS); code = getDuration(qInterval, q->pTsma->unit, &qInterval, TSDB_TIME_PRECISION_MILLI); ASSERT(code == TSDB_CODE_SUCCESS); @@ -5994,7 +5986,8 @@ static int32_t tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRigh return 0; } -static const STSMAOptUsefulTsma* tsmaOptFindUsefulTsma(const SArray* pUsefulTsmas, int32_t startIdx, int64_t alignInterval, int8_t precision) { +static const STSMAOptUsefulTsma* tsmaOptFindUsefulTsma(const SArray* pUsefulTsmas, int32_t startIdx, + int64_t alignInterval, int8_t precision) { int64_t tsmaInterval; for (int32_t i = startIdx; i < pUsefulTsmas->size; ++i) { const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pUsefulTsmas, i); @@ -6074,7 +6067,8 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc // the main tsma if (endOfSkeyFirstWin <= startOfEkeyFirstWin) { - scanRange.ekey = TMIN(pScanRange->ekey, startOfEkeyFirstWin - 1); + scanRange.ekey = + TMIN(pScanRange->ekey, endOfSkeyFirstWin == startOfEkeyFirstWin ? pScanRange->ekey : startOfEkeyFirstWin - 1); if (!isSkeyAlignedWithTsma) { scanRange.skey = endOfSkeyFirstWin; } @@ -6139,7 +6133,8 @@ SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNod return pScanCols; } -static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma, SColumnNode* pTagCol) { +static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma, + SColumnNode* pTagCol) { bool found = false; if (pTagCol->colType != COLUMN_TYPE_TAG) return 0; for (int32_t i = 0; i < pTsma->pTsma->pTags->size; ++i) { @@ -6160,7 +6155,7 @@ static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptU static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbNameNode, const STSMAOptUsefulTsma* pTsma) { - int32_t code = 0; + int32_t code = 0; SFunctionNode* pRewrittenFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); if (!pRewrittenFunc || !pValue) code = TSDB_CODE_OUT_OF_MEMORY; @@ -6186,7 +6181,7 @@ static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbN if (code == TSDB_CODE_SUCCESS) { code = nodesListAppend(pRewrittenFunc->pParameterList, (SNode*)pValue); } - } else if (code == TSDB_CODE_SUCCESS){ + } else if (code == TSDB_CODE_SUCCESS) { // if no tsma, we replace func tbname with concat('', tbname) pRewrittenFunc->funcId = fmGetFuncId("concat"); snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "concat('', tbname)"); @@ -6284,7 +6279,7 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew tstrncpy(pPkTsCol->tableAlias, pTsma->pTsma->targetTb, TSDB_TABLE_NAME_LEN); pPkTsCol->tableId = pTsma->pTsma->destTbUid; pPkTsCol->tableType = TSDB_SUPER_TABLE; - nodesListMakeAppend(&pNewScan->pScanCols, (SNode*)pPkTsCol); + nodesListMakeStrictAppend(&pNewScan->pScanCols, nodesCloneNode((SNode*)pPkTsCol)); } if (code == TSDB_CODE_SUCCESS) { pNewScan->stableId = pTsma->pTsma->destTbUid; @@ -6336,7 +6331,7 @@ static int32_t tsmaOptCreateWStart(int8_t precision, SFunctionNode** pWStartOut) } strcpy(pWStart->functionName, "_wstart"); int64_t pointer = (int64_t)pWStart; - char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0}; + char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0}; int32_t len = snprintf(name, sizeof(name) - 1, "%s.%" PRId64 "", pWStart->functionName, pointer); taosCreateMD5Hash(name, len); strncpy(pWStart->node.aliasName, name, TSDB_COL_NAME_LEN - 1); @@ -6351,6 +6346,73 @@ static int32_t tsmaOptCreateWStart(int8_t precision, SFunctionNode** pWStartOut) return code; } +static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SScanLogicNode* pScan, + const STSMAOptUsefulTsma* pTsma) { + int32_t code = 0; + SColumnNode* pColNode; + SWindowLogicNode* pWindow; + SAggLogicNode* pAgg; + SNodeList* pAggFuncs; + SListCell* pScanListCell; + SNode* pAggFuncNode; + SNodeList* pAggStateFuncs = NULL; + bool isFirstMergeNode = pTsmaOptCtx->pScan == pScan; + SFunctionNode * pPartial = NULL, *pMerge = NULL; + + if (nodeType(pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) { + pWindow = (SWindowLogicNode*)pParent; + pAggFuncs = pWindow->pFuncs; + } else { + pAgg = (SAggLogicNode*)pParent; + pAggFuncs = pAgg->pAggFuncs; + } + pScanListCell = pScan->pScanCols->pHead; + + FOREACH(pAggFuncNode, pAggFuncs) { + SFunctionNode* pAggFunc = (SFunctionNode*)pAggFuncNode; + if (fmIsGroupKeyFunc(pAggFunc->funcId)) { + struct TsmaOptRewriteCtx ctx = { + .pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0}; + nodesRewriteExpr(&pAggFuncNode, tsmaOptRewriter, &ctx); + if (ctx.code) { + code = ctx.code; + } else { + REPLACE_NODE(pAggFuncNode); + } + continue; + } + code = fmGetDistMethod(pAggFunc, &pPartial, &pMerge); + if (code) break; + + pColNode = (SColumnNode*)pScanListCell->pNode; + pScanListCell = pScanListCell->pNext; + pColNode->node.resType = pPartial->node.resType; + nodesListErase(pMerge->pParameterList, pMerge->pParameterList->pHead); + // TODO STRICT + nodesListPushFront(pMerge->pParameterList, nodesCloneNode((SNode*)pColNode)); + + nodesDestroyNode((SNode*)pPartial); + REPLACE_NODE(pMerge); + } + + if (code == TSDB_CODE_SUCCESS && pWindow) { + SColumnNode* pCol = (SColumnNode*)pScan->pScanCols->pTail->pNode; + assert(pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID); + nodesDestroyNode(pWindow->pTspk); + pWindow->pTspk = nodesCloneNode((SNode*)pCol); + } + + if (code == TSDB_CODE_SUCCESS) { + nodesDestroyList(pScan->node.pTargets); + code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets); + } + if (code == TSDB_CODE_SUCCESS) { + code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets); + } + + return code; +} + static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SScanLogicNode* pScan, const STSMAOptUsefulTsma* pTsma) { SNode * pStateFuncNode, *pAggFuncNode; @@ -6360,6 +6422,7 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SNodeList* pAggStateFuncs = NULL; SNodeList* pAggFuncs = NULL; SWindowLogicNode* pWindow = NULL; + SAggLogicNode* pAgg = NULL; bool isFirstMergeNode = pTsmaOptCtx->pScan == pScan; bool hasWStart = false; @@ -6367,7 +6430,8 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, pWindow = (SWindowLogicNode*)pParent; pAggFuncs = pWindow->pFuncs; } else { - pAggFuncs = ((SAggLogicNode*)pParent)->pAggFuncs; + pAgg = (SAggLogicNode*)pParent; + pAggFuncs = pAgg->pAggFuncs; } if (isFirstMergeNode) { pAggStateFuncs = nodesCloneList(pAggFuncs); @@ -6384,7 +6448,7 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SFunctionNode* pAggFunc = (SFunctionNode*)pAggFuncNode; if (fmIsGroupKeyFunc(pAggFunc->funcId)) { struct TsmaOptRewriteCtx ctx = { - .pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0}; + .pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0}; nodesRewriteExpr(&pAggFuncNode, tsmaOptRewriter, &ctx); if (ctx.code) { code = ctx.code; @@ -6426,6 +6490,11 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, code = createColumnByRewriteExprs(pAggFuncs, &pWindow->node.pTargets); } + if (code == TSDB_CODE_SUCCESS && pAgg) { + nodesDestroyList(pAgg->node.pTargets); + code = createColumnByRewriteExprs(pAggFuncs, &pAgg->node.pTargets); + } + if (code == TSDB_CODE_SUCCESS) { nodesDestroyList(pScan->node.pTargets); code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets); @@ -6449,8 +6518,6 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) { break; } pSubplan->subplanType = SUBPLAN_TYPE_SCAN; - // TODO what is 1? - //pSubplan->splitFlag = 1; pTsmaOptCtx->generatedSubPlans[i - 1] = pSubplan; SLogicNode* pParent = (SLogicNode*)nodesCloneNode((SNode*)pTsmaOptCtx->pParent); if (!pParent) { @@ -6463,7 +6530,7 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) { SScanLogicNode* pScan = (SScanLogicNode*)pParent->pChildren->pHead->pNode; code = tsmaOptRewriteScan(pTsmaOptCtx, pScan, pTsma); if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) { - code = tsmaOptRevisePlan(pTsmaOptCtx, pParent, pScan, pTsma); + code = tsmaOptRevisePlan2(pTsmaOptCtx, pParent, pScan, pTsma); } } @@ -6471,7 +6538,8 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) { pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, 0); code = tsmaOptRewriteScan(pTsmaOptCtx, pTsmaOptCtx->pScan, pTsma); if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) { - code = tsmaOptRevisePlan(pTsmaOptCtx, pTsmaOptCtx->pParent, pTsmaOptCtx->pScan, pTsma); } + code = tsmaOptRevisePlan2(pTsmaOptCtx, pTsmaOptCtx->pParent, pTsmaOptCtx->pScan, pTsma); + } } return code; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index e2a342f895..fe23025146 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -2335,6 +2335,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildre break; } } + if (code) break; } } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 841d66657b..2ee3a09044 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -741,6 +741,11 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo pSubplan->id.queryId = pCxt->queryId; pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT; splSetSubplanVgroups(pSubplan, pSubplan->pNode); + code = stbSplCreatePartWindowNode((SWindowLogicNode*)pSubplan->pNode, &pPartWindow); + if (TSDB_CODE_SUCCESS == code) { + nodesDestroyNode((SNode*)pSubplan->pNode); + pSubplan->pNode = pPartWindow; + } } pMerge->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1; pMerge->srcEndGroupId = pCxt->groupId; @@ -1116,17 +1121,71 @@ static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* p return code; } +static int32_t stbSplSplitAggNodeForCrossTableMulSubplan(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartAgg = NULL; + bool hasExchange = false; + SMergeLogicNode* pMergeNode = NULL; + SLogicSubplan* pFirstScanSubplan = NULL; + int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg); + + if (TSDB_CODE_SUCCESS == code) { + if (pInfo->pSplitNode->forceCreateNonBlockingOptr) { + code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg); + } else { + hasExchange = true; + code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, NULL, pPartAgg, false, false); + } + pMergeNode = + (SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren, LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1); + } else { + nodesDestroyNode((SNode*)pPartAgg); + } + + if (code == TSDB_CODE_SUCCESS) { + pFirstScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT); + if (!pFirstScanSubplan) code = TSDB_CODE_OUT_OF_MEMORY; + } + + if (code == TSDB_CODE_SUCCESS) { + SNode* pNode; + FOREACH(pNode, pInfo->pSubplan->pChildren) { + ++(pCxt->groupId); + SLogicSubplan* pSubplan = (SLogicSubplan*)pNode; + pSubplan->id.groupId = pCxt->groupId; + pSubplan->id.queryId = pCxt->queryId; + pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT; + splSetSubplanVgroups(pSubplan, pSubplan->pNode); + code = stbSplCreatePartAggNode((SAggLogicNode*)pSubplan->pNode, &pPartAgg); + if (code) break; + nodesDestroyNode((SNode*)pSubplan->pNode); + pSubplan->pNode = pPartAgg; + } + pMergeNode->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1; + pMergeNode->srcEndGroupId = pCxt->groupId; + } + + if (code == TSDB_CODE_SUCCESS) { + code = nodesListMakeAppend(&pInfo->pSubplan->pChildren, (SNode*)pFirstScanSubplan); + } + + if (code && pFirstScanSubplan) { + nodesDestroyNode((SNode*)pFirstScanSubplan); + } + + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + ++(pCxt->groupId); + return code; +} + static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartAgg = NULL; int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg); - bool hasExchange = false; if (TSDB_CODE_SUCCESS == code) { // if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg if (pInfo->pSplitNode->forceCreateNonBlockingOptr) code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg); //TODO test slimit else { code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg); - hasExchange = true; } } else { nodesDestroyNode((SNode*)pPartAgg); @@ -1138,18 +1197,6 @@ static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplit if (!pScanSubplan) code = TSDB_CODE_OUT_OF_MEMORY; } - if (code == TSDB_CODE_SUCCESS) { - SNode* pNode; - FOREACH(pNode, pInfo->pSubplan->pChildren) { - SLogicSubplan* pSubplan = (SLogicSubplan*)pNode; - pSubplan->id.groupId = pCxt->groupId; - pSubplan->id.queryId = pCxt->queryId; - pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT; - splSetSubplanVgroups(pSubplan, pSubplan->pNode); - code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pSubplan->pNode); - } - } - if (code == TSDB_CODE_SUCCESS) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan); } @@ -1163,6 +1210,9 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) if (isPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) { return stbSplSplitAggNodeForPartTable(pCxt, pInfo); } + if (pInfo->pSubplan->pChildren && LIST_LENGTH(pInfo->pSubplan->pChildren) > 0) { + return stbSplSplitAggNodeForCrossTableMulSubplan(pCxt, pInfo); + } return stbSplSplitAggNodeForCrossTable(pCxt, pInfo); } diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 53befb0697..4b70a9eddd 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -1,5 +1,4 @@ from random import randrange -from pandas import tseries import taos import time import threading @@ -27,8 +26,8 @@ class UsedTsma: def __init__(self) -> None: self.name = '' ## tsma name or table name - self.time_range_start = UsedTsma.TS_MIN - self.time_range_end = UsedTsma.TS_MAX + self.time_range_start: float = float(UsedTsma.TS_MIN) + self.time_range_end: float = float(UsedTsma.TS_MAX) self.is_tsma_ = False def __eq__(self, __value: object) -> bool: @@ -40,6 +39,15 @@ class UsedTsma: else: return False + def __ne__(self, __value: object) -> bool: + return not self.__eq__(__value) + + def __str__(self) -> str: + return "%s: from %s to %s is_tsma: %d" % (self.name, self.time_range_start, self.time_range_end, self.is_tsma_) + + def __repr__(self) -> str: + return self.__str__() + class TSMAQueryContext: def __init__(self) -> None: self.sql = '' @@ -56,6 +64,9 @@ class TSMAQueryContext: else: return False + def __ne__(self, __value: object) -> bool: + return self.__eq__(__value) + def __str__(self) -> str: return str(self.used_tsmas) @@ -75,12 +86,19 @@ class TSMAQueryContextBuilder: def with_sql(self, sql: str): self.ctx.sql = sql return self + + def to_timestamp(self, ts: str) -> float: + if ts == UsedTsma.TS_MAX or ts == UsedTsma.TS_MIN: + return float(ts) + tdSql.query("select to_timestamp('%s', 'yyyy-mm-dd hh24-mi-ss.ms')" % (ts)) + res = tdSql.queryResult[0][0] + return res.timestamp() * 1000 def should_query_with_table(self, tb_name: str, ts_begin: str, ts_end: str) -> 'TSMAQueryContextBuilder': used_tsma: UsedTsma = UsedTsma() used_tsma.name = tb_name - used_tsma.time_range_start = ts_begin - used_tsma.time_range_end = ts_end + used_tsma.time_range_start = self.to_timestamp(ts_begin) + used_tsma.time_range_end = self.to_timestamp(ts_end) used_tsma.is_tsma_ = False self.ctx.used_tsmas.append(used_tsma) return self @@ -88,8 +106,8 @@ class TSMAQueryContextBuilder: def should_query_with_tsma(self, tsma_name: str, ts_begin: str, ts_end: str) -> 'TSMAQueryContextBuilder': used_tsma: UsedTsma = UsedTsma() used_tsma.name = tsma_name + '_tsma_res_stb' - used_tsma.time_range_start = ts_begin - used_tsma.time_range_end = ts_end + used_tsma.time_range_start = self.to_timestamp(ts_begin) + used_tsma.time_range_end = self.to_timestamp(ts_end) used_tsma.is_tsma_ = True self.ctx.used_tsmas.append(used_tsma) return self @@ -129,11 +147,21 @@ class TSMATestContext: row = row[idx:].split('[')[1] row = row.split(']')[0] words = row.split(',') - used_tsma.time_range_start = words[0].strip() - used_tsma.time_range_end = words[1].strip() + used_tsma.time_range_start = float(words[0].strip()) + used_tsma.time_range_end = float(words[1].strip()) query_ctx.used_tsmas.append(used_tsma) used_tsma = UsedTsma() + deduplicated_tsmas: list[UsedTsma] = [] + if len(query_ctx.used_tsmas) > 0: + deduplicated_tsmas.append(query_ctx.used_tsmas[0]) + for tsma in query_ctx.used_tsmas: + if tsma == deduplicated_tsmas[-1]: + continue + else: + deduplicated_tsmas.append(tsma) + query_ctx.used_tsmas = deduplicated_tsmas + return query_ctx def check_explain(self, sql: str, expect: TSMAQueryContext): @@ -142,7 +170,21 @@ class TSMATestContext: tdLog.exit('check explain failed for sql: %s \nexpect: %s \nactual: %s' % (sql, str(expect), str(query_ctx))) def check_result(self, sql: str): - pass + #tdSql.execute("alter local 'querySmaOptimize' '1'") + tsma_res = tdSql.getResult(sql) + + tdSql.execute("alter local 'querySmaOptimize' '0'") + no_tsma_res = tdSql.getResult(sql) + + if no_tsma_res is None or tsma_res is None: + if no_tsma_res != tsma_res: + tdLog.exit("comparing tsma res for: %s got different rows of result: with tsma: %s, with tsma: %s" % (sql, no_tsma_res, tsma_res)) + + if len(no_tsma_res) != len(tsma_res): + tdLog.exit("comparing tsma res for: %s got differnt rows of result: without tsma: %d, with tsma: %d" % (sql, len(no_tsma_res), len(tsma_res))) + for row_no_tsma, row_tsma in zip(no_tsma_res, tsma_res): + if row_no_tsma != row_tsma: + tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s" % (sql, str(row_no_tsma), str(row_tsma))) def check_sql(self, sql: str, expect: TSMAQueryContext): self.check_explain(sql, expect=expect) @@ -274,74 +316,122 @@ class TDTestCase: break if not plan_found: tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(explain_output))) + + def check(self, func): + for ctx in func(): + self.test_ctx.check_sql(ctx.sql, ctx) def test_query_with_tsma(self): self.init_data() self.create_tsma('tsma1', 'test', 'meters', ['avg'], ['c1', 'c2'], '5m') self.create_tsma('tsma2', 'test', 'meters', ['avg'], ['c1', 'c2'], '30m') + time.sleep(5) time.sleep(9999999) self.test_query_with_tsma_interval() self.test_query_with_tsma_agg() def test_query_with_tsma_interval(self): - self.test_query_with_tsma_interval_no_partition() - self.test_query_with_tsma_interval_partition_by_col() - self.test_query_with_tsma_interval_partition_by_tbname() - self.test_query_with_tsma_interval_partition_by_tag() - self.test_query_with_tsma_interval_partition_by_hybrid() + self.check(self.test_query_with_tsma_interval_no_partition) + self.check(self.test_query_with_tsma_interval_partition_by_col) + self.check(self.test_query_with_tsma_interval_partition_by_tbname) + self.check(self.test_query_with_tsma_interval_partition_by_tag) + self.check(self.test_query_with_tsma_interval_partition_by_hybrid) - def test_query_with_tsma_interval_no_partition(self): - pass + def test_query_with_tsma_interval_no_partition(self) -> List[TSMAQueryContext]: + ctxs: List[TSMAQueryContext] = [] + sql = 'select avg(c1), avg(c2) from meters interval(5m)' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_tsma('tsma1', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx()) + + sql = 'select avg(c1), avg(c2) from meters interval(10m)' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_tsma('tsma1', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx()) + sql = 'select avg(c1), avg(c2) from meters interval(30m)' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx()) + sql = 'select avg(c1), avg(c2) from meters interval(60m)' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx()) + + sql = "select avg(c1), avg(c2) from meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m)" + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_table('meters', '2018-09-17 09:00:00.009','2018-09-17 09:29:59.999') \ + .should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \ + .should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.664').get_ctx()) + return ctxs def test_query_with_tsma_interval_partition_by_tbname(self): - pass + return [] def test_query_with_tsma_interval_partition_by_tag(self): - pass + return [] def test_query_with_tsma_interval_partition_by_col(self): - pass + return [] def test_query_with_tsma_interval_partition_by_hybrid(self): - pass + return [] def test_query_with_tsma_agg(self): - self.test_query_with_tsma_agg_no_group_by() - self.test_query_with_tsma_agg_group_by_hybrid() - self.test_query_with_tsma_agg_group_by_tbname() - self.test_query_with_tsma_agg_group_by_tag() + self.check(self.test_query_with_tsma_agg_no_group_by) + self.check(self.test_query_with_tsma_agg_group_by_hybrid) + self.check(self.test_query_with_tsma_agg_group_by_tbname) + self.check(self.test_query_with_tsma_agg_group_by_tag) def test_query_with_tsma_agg_no_group_by(self): ctxs: List[TSMAQueryContext] = [] - ctxs.append(TSMAQueryContextBuilder().with_sql('select avg(c1), avg(c2) from meters').should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx()) - ctxs.append(TSMAQueryContextBuilder().with_sql('select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"')\ - .should_query_with_table('meters', '','').get_ctx()) - tsma_sqls = [ - 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"' - 'select avg(c1) + avg(c2), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"' - 'select avg(c1) + avg(c2), avg(c2) + 1 from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"' - 'select avg(c1) + avg(c2), from meters where tbname like "%t1%"' - ] - non_tsma_sqls = [ - 'select avg(c1), avg(c2) from meters where c1 is not NULL', - 'select avg(c1), avg(c2), spread(c4) from meters', - ] + sql = 'select avg(c1), avg(c2) from meters' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql).should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx()) - for ctx in ctxs: - self.test_ctx.check_sql(ctx.sql, ctx) + sql = 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.000" and "2018-09-17 10:00:00.000"' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_tsma('tsma2', '2018-09-17 09:00:00','2018-09-17 09:59:59:999') \ + .should_query_with_table("meters", '2018-09-17 10:00:00','2018-09-17 10:00:00').get_ctx()) + + sql = 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \ + .should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \ + .should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_ctx()) + + sql = 'select avg(c1) + avg(c2), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \ + .should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \ + .should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_ctx()) + + sql = 'select avg(c1) + avg(c2), avg(c2) + 1 from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \ + .should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \ + .should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_ctx()) + + sql = 'select avg(c1) + avg(c2) from meters where tbname like "%t1%"' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx()) + + sql = 'select avg(c1), avg(c2) from meters where c1 is not NULL' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_table('meters', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx()) + + sql = 'select avg(c1), avg(c2), spread(c4) from meters' + ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \ + .should_query_with_table('meters', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx()) + + return ctxs def test_query_with_tsma_agg_group_by_tbname(self): - pass + return [] def test_query_with_tsma_agg_group_by_tag(self): - pass + return [] def test_query_with_tsma_agg_group_by_hybrid(self): - pass + return [] def run(self): self.test_query_with_tsma() - time.sleep(999999) + #time.sleep(999999) def stop(self): tdSql.close()