test tsma

This commit is contained in:
wangjiaming0909 2024-01-23 15:38:47 +08:00
parent 403f743301
commit 1509ce04cf
5 changed files with 377 additions and 167 deletions

View File

@ -283,6 +283,7 @@ SSDataBlock* doNonSortMerge(SOperatorInfo* pOperator) {
continue;
}
break;
pNonSortMerge->lastSourceIdx = idx - 1;
}
if (!pBlock) {

View File

@ -818,7 +818,7 @@ static bool pdcJoinIsPrimEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
tSimpleHashCleanup(pLeftTables);
tSimpleHashCleanup(pRightTables);
return res;
}
@ -1186,14 +1186,14 @@ static int32_t pdcJoinAddPreFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogi
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pCondCols, &pTargets);
}
nodesDestroyList(pCondCols);
if (TSDB_CODE_SUCCESS == code) {
SNode* pNode = NULL;
FOREACH(pNode, pTargets) {
SNode* pTmp = NULL;
bool found = false;
bool found = false;
FOREACH(pTmp, pJoin->node.pTargets) {
if (nodesEqualNode(pTmp, pNode)) {
found = true;
@ -1204,7 +1204,7 @@ static int32_t pdcJoinAddPreFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogi
nodesListStrictAppend(pJoin->node.pTargets, nodesCloneNode(pNode));
}
}
}
}
nodesDestroyList(pTargets);
@ -2917,8 +2917,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
scanPathOptSetGroupOrderScan(pScan);
pParent->hasGroupKeyOptimized = true;
}
if (pNode->pParent->pSlimit)
pScan->groupOrderScan = true;
if (pNode->pParent->pSlimit) pScan->groupOrderScan = true;
NODES_CLEAR_LIST(pNode->pChildren);
nodesDestroyNode((SNode*)pNode);
@ -3395,7 +3394,7 @@ static SNode* rewriteUniqueOptCreateFirstFunc(SFunctionNode* pSelectValue, SNode
strcpy(pFunc->node.aliasName, pSelectValue->node.aliasName);
} else {
int64_t pointer = (int64_t)pFunc;
char name[TSDB_FUNC_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
char name[TSDB_FUNC_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%" PRId64 "", pFunc->functionName, pointer);
taosCreateMD5Hash(name, len);
strncpy(pFunc->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
@ -3603,8 +3602,8 @@ static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast
/// @param lastColId only used when lastColNum equals 1, the col id of the only one last col
/// @param selectNonPKColNum num of normal cols
/// @param selectNonPKColId only used when selectNonPKColNum equals 1, the col id of the only one select col
static bool lastRowScanOptCheckColNum(int32_t lastColNum, col_id_t lastColId,
int32_t selectNonPKColNum, col_id_t selectNonPKColId) {
static bool lastRowScanOptCheckColNum(int32_t lastColNum, col_id_t lastColId, int32_t selectNonPKColNum,
col_id_t selectNonPKColId) {
// multi select non pk col + last func: select c1, c2, last(c1)
if (selectNonPKColNum > 1 && lastColNum > 0) return false;
@ -3678,8 +3677,7 @@ static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, int8_t cacheLastModel
} else if (lastColNum > 0) {
return false;
}
if (!lastRowScanOptCheckColNum(lastColNum, lastColId, selectNonPKColNum, selectNonPKColId))
return false;
if (!lastRowScanOptCheckColNum(lastColNum, lastColId, selectNonPKColNum, selectNonPKColId)) return false;
} else if (FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) {
if (!lastRowScanOptLastParaIsTag(nodesListGetNode(pAggFunc->pParameterList, 0))) {
return false;
@ -3720,7 +3718,7 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
if (!lastRowScanOptCheckLastCache(pAgg, pScan)) {
return false;
}
bool hasOtherFunc = false;
if (!lastRowScanOptCheckFuncList(pNode, pScan->cacheLastMode, &hasOtherFunc)) {
return false;
@ -3947,7 +3945,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
} else {
pNode = nodesListGetNode(pFunc->pParameterList, 0);
nodesListMakeAppend(&cxt.pOtherCols, pNode);
if (FUNCTION_TYPE_SELECT_VALUE == funcType) {
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)pNode;
@ -3973,7 +3971,8 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol);
nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pPKTsCol));
}
if (pNonPKCol && cxt.pLastCols->length == 1 && nodesEqualNode((SNode*)pNonPKCol, nodesListGetNode(cxt.pLastCols, 0))) {
if (pNonPKCol && cxt.pLastCols->length == 1 &&
nodesEqualNode((SNode*)pNonPKCol, nodesListGetNode(cxt.pLastCols, 0))) {
// when select last(c1), c1 from ..., we add c1 to targets
sprintf(pNonPKCol->colName, "#sel_val.%p", pNonPKCol);
nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pNonPKCol));
@ -3990,7 +3989,6 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
return TSDB_CODE_SUCCESS;
}
static bool splitCacheLastFuncOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
@ -4015,7 +4013,8 @@ static bool splitCacheLastFuncOptMayBeOptimized(SLogicNode* pNode) {
return true;
}
static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg, SAggLogicNode* pAgg, SNodeList* pFunc, SNodeList* pTargets) {
static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg, SAggLogicNode* pAgg, SNodeList* pFunc,
SNodeList* pTargets) {
SAggLogicNode* pNew = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG);
if (NULL == pNew) {
nodesDestroyList(pFunc);
@ -4051,10 +4050,10 @@ static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg,
pNew->node.pChildren = nodesCloneList(pAgg->node.pChildren);
int32_t code = 0;
SNode* pNode = nodesListGetNode(pNew->node.pChildren, 0);
SNode* pNode = nodesListGetNode(pNew->node.pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
SNodeList* pOldScanCols = NULL;
SNodeList* pOldScanCols = NULL;
TSWAP(pScan->pScanCols, pOldScanCols);
nodesDestroyList(pScan->pScanPseudoCols);
pScan->pScanPseudoCols = NULL;
@ -4110,7 +4109,8 @@ static int32_t splitCacheLastFuncOptModifyAggLogicNode(SAggLogicNode* pAgg) {
return TSDB_CODE_SUCCESS;
}
static int32_t splitCacheLastFuncOptCreateMergeLogicNode(SMergeLogicNode** pNew, SAggLogicNode* pAgg1, SAggLogicNode* pAgg2) {
static int32_t splitCacheLastFuncOptCreateMergeLogicNode(SMergeLogicNode** pNew, SAggLogicNode* pAgg1,
SAggLogicNode* pAgg2) {
SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -4127,17 +4127,13 @@ static int32_t splitCacheLastFuncOptCreateMergeLogicNode(SMergeLogicNode** pNew,
nodesDestroyNode(pNewAgg2);
return TSDB_CODE_OUT_OF_MEMORY;
}
((SAggLogicNode*)pNewAgg1)->node.pParent = (SLogicNode*)pMerge;
((SAggLogicNode*)pNewAgg2)->node.pParent = (SLogicNode*)pMerge;
SNode* pNode = NULL;
FOREACH(pNode, ((SAggLogicNode*)pNewAgg1)->node.pChildren) {
((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg1;
}
FOREACH(pNode, ((SAggLogicNode*)pNewAgg2)->node.pChildren) {
((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg2;
}
FOREACH(pNode, ((SAggLogicNode*)pNewAgg1)->node.pChildren) { ((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg1; }
FOREACH(pNode, ((SAggLogicNode*)pNewAgg2)->node.pChildren) { ((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg2; }
int32_t code = nodesListMakeStrictAppendList(&pMerge->node.pTargets, nodesCloneList(pAgg1->node.pTargets));
if (TSDB_CODE_SUCCESS == code) {
@ -4157,7 +4153,7 @@ static int32_t splitCacheLastFuncOptCreateMergeLogicNode(SMergeLogicNode** pNew,
} else {
*pNew = pMerge;
}
return code;
}
@ -4203,8 +4199,8 @@ static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan*
{
WHERE_EACH(pNode, pAgg->node.pTargets) {
SColumnNode* pCol = (SColumnNode*)pNode;
SNode* pFuncNode = NULL;
bool found = false;
SNode* pFuncNode = NULL;
bool found = false;
FOREACH(pFuncNode, pAggFuncList) {
SFunctionNode* pFunc = (SFunctionNode*)pFuncNode;
if (0 == strcmp(pFunc->node.aliasName, pCol->colName)) {
@ -4217,7 +4213,7 @@ static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan*
ERASE_NODE(pAgg->node.pTargets);
continue;
}
WHERE_NEXT;
WHERE_NEXT;
}
}
@ -4228,8 +4224,8 @@ static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan*
}
SMergeLogicNode* pMerge = NULL;
SAggLogicNode* pNewAgg = NULL;
int32_t code = splitCacheLastFuncOptCreateAggLogicNode(&pNewAgg, pAgg, pAggFuncList, pTargets);
SAggLogicNode* pNewAgg = NULL;
int32_t code = splitCacheLastFuncOptCreateAggLogicNode(&pNewAgg, pAgg, pAggFuncList, pTargets);
if (TSDB_CODE_SUCCESS == code) {
code = splitCacheLastFuncOptModifyAggLogicNode(pAgg);
}
@ -4240,19 +4236,17 @@ static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan*
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pAgg, (SLogicNode*)pMerge);
}
nodesDestroyNode((SNode *)pAgg);
nodesDestroyNode((SNode *)pNewAgg);
nodesDestroyNode((SNode*)pAgg);
nodesDestroyNode((SNode*)pNewAgg);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode *)pMerge);
nodesDestroyNode((SNode*)pMerge);
}
pCxt->optimized = true;
return code;
}
// merge projects
static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
@ -4405,7 +4399,7 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
}
pScanNode->onlyMetaCtbIdx = false;
pCxt->optimized = true;
return TSDB_CODE_SUCCESS;
}
@ -4449,8 +4443,8 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu
// The scan below will do scanning with group order
return cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_SLIMIT);
}
// else if not part by tag and tbname, the partition node below indicates that results are sorted, the agg node can
// be pipelined.
// else if not part by tag and tbname, the partition node below indicates that results are sorted, the agg node
// can be pipelined.
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && LIST_LENGTH(pNodeLimitPushTo->pChildren) == 1) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNodeLimitPushTo->pChildren, 0);
if (nodeType(pChild) == QUERY_NODE_LOGIC_PLAN_PARTITION) {
@ -5121,19 +5115,16 @@ static bool stbJoinOptShouldBeOptimized(SLogicNode* pNode) {
return true;
}
int32_t stbJoinOptAddFuncToScanNode(char* funcName, SScanLogicNode* pScan) {
SFunctionNode* pUidFunc = createFunction(funcName, NULL);
snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p",
pUidFunc->functionName, pUidFunc);
int32_t code = nodesListStrictAppend(pScan->pScanPseudoCols, (SNode *)pUidFunc);
snprintf(pUidFunc->node.aliasName, sizeof(pUidFunc->node.aliasName), "%s.%p", pUidFunc->functionName, pUidFunc);
int32_t code = nodesListStrictAppend(pScan->pScanPseudoCols, (SNode*)pUidFunc);
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExpr((SNode*)pUidFunc, &pScan->node.pTargets);
}
return code;
}
int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
SJoinLogicNode* pJoinNode = (SJoinLogicNode*)pJoin;
@ -5146,14 +5137,14 @@ int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) {
pScan->onlyMetaCtbIdx = true;
SNodeList* pTags = nodesMakeList();
int32_t code = nodesCollectColumnsFromNode(pJoinNode->pTagEqCond, NULL, COLLECT_COL_TYPE_TAG, &pTags);
int32_t code = nodesCollectColumnsFromNode(pJoinNode->pTagEqCond, NULL, COLLECT_COL_TYPE_TAG, &pTags);
if (TSDB_CODE_SUCCESS == code) {
code = nodesCollectColumnsFromNode(pJoinNode->pTagOnCond, NULL, COLLECT_COL_TYPE_TAG, &pTags);
}
if (TSDB_CODE_SUCCESS == code) {
SNode* pTarget = NULL;
SNode* pTag = NULL;
bool found = false;
bool found = false;
WHERE_EACH(pTarget, pScan->node.pTargets) {
found = false;
SColumnNode* pTargetCol = (SColumnNode*)pTarget;
@ -5170,7 +5161,7 @@ int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) {
WHERE_NEXT;
}
}
}
}
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptAddFuncToScanNode("_tbuid", pScan);
}
@ -5181,7 +5172,7 @@ int32_t stbJoinOptRewriteToTagScan(SLogicNode* pJoin, SNode* pNode) {
if (code) {
nodesDestroyList(pTags);
}
return code;
}
@ -5192,7 +5183,7 @@ static int32_t stbJoinOptCreateTagScanNode(SLogicNode* pJoin, SNodeList** ppList
}
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
SNode* pNode = NULL;
FOREACH(pNode, pList) {
code = stbJoinOptRewriteToTagScan(pJoin, pNode);
if (code) {
@ -5234,9 +5225,10 @@ static int32_t stbJoinOptCreateTagHashJoinNode(SLogicNode* pOrig, SNodeList* pCh
SNode* pNode = NULL;
FOREACH(pNode, pChildren) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
SNode* pCol = NULL;
SNode* pCol = NULL;
FOREACH(pCol, pScan->pScanPseudoCols) {
if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID || ((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) {
if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID ||
((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) {
code = createColumnByRewriteExpr(pCol, &pJoin->node.pTargets);
if (code) {
break;
@ -5279,7 +5271,7 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL
int32_t code = TSDB_CODE_SUCCESS;
int32_t i = 0;
SNode* pNode = NULL;
SNode* pNode = NULL;
FOREACH(pNode, pList) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
//code = stbJoinOptAddFuncToScanNode("_tbuid", pScan);
@ -5294,7 +5286,7 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL
pScan->node.dynamicOp = true;
*(srcScan + i++) = pScan->pVgroupList->numOfVgroups <= 1;
pScan->scanType = SCAN_TYPE_TABLE;
}
@ -5304,13 +5296,13 @@ static int32_t stbJoinOptCreateTableScanNodes(SLogicNode* pJoin, SNodeList** ppL
}
static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pRoot, SNodeList* pChildren, SLogicNode** ppLogic) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheLogicNode* pGrpCache = (SGroupCacheLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_GROUP_CACHE);
if (NULL == pGrpCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}
//pGrpCache->node.dynamicOp = true;
// pGrpCache->node.dynamicOp = true;
pGrpCache->grpColsMayBeNull = false;
pGrpCache->grpByUid = true;
pGrpCache->batchFetch = getBatchScanOptionFromHint(pRoot->pHint);
@ -5325,9 +5317,10 @@ static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pRoot, SNodeList* pChi
}
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pChildren, 0);
SNode* pCol = NULL;
SNode* pCol = NULL;
FOREACH(pCol, pScan->pScanPseudoCols) {
if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID || ((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) {
if (QUERY_NODE_FUNCTION == nodeType(pCol) && (((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_TBUID ||
((SFunctionNode*)pCol)->funcType == FUNCTION_TYPE_VGID)) {
code = createColumnByRewriteExpr(pCol, &pGrpCache->pGroupCols);
if (code) {
break;
@ -5335,7 +5328,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pRoot, SNodeList* pChi
}
}
bool hasCond = false;
bool hasCond = false;
SNode* pNode = NULL;
FOREACH(pNode, pChildren) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
@ -5345,7 +5338,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SLogicNode* pRoot, SNodeList* pChi
pScan->node.pParent = (SLogicNode*)pGrpCache;
}
pGrpCache->globalGrp = false;
if (TSDB_CODE_SUCCESS == code) {
*ppLogic = (SLogicNode*)pGrpCache;
} else {
@ -5397,16 +5390,14 @@ static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChi
}
pJoin->joinAlgo = JOIN_ALGO_MERGE;
//pJoin->node.dynamicOp = true;
// pJoin->node.dynamicOp = true;
stbJoinOptRemoveTagEqCond(pJoin);
NODES_DESTORY_NODE(pJoin->pTagEqCond);
SNode* pNode = NULL;
FOREACH(pNode, pJoin->node.pChildren) {
ERASE_NODE(pJoin->node.pChildren);
}
int32_t code = nodesListStrictAppend(pJoin->node.pChildren, (SNode *)pChild);
FOREACH(pNode, pJoin->node.pChildren) { ERASE_NODE(pJoin->node.pChildren); }
int32_t code = nodesListStrictAppend(pJoin->node.pChildren, (SNode*)pChild);
if (TSDB_CODE_SUCCESS == code) {
pChild->pParent = (SLogicNode*)pJoin;
*ppLogic = (SLogicNode*)pJoin;
@ -5418,8 +5409,9 @@ static int32_t stbJoinOptCreateMergeJoinNode(SLogicNode* pOrig, SLogicNode* pChi
return code;
}
static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pRoot, SLogicNode* pPrev, SLogicNode* pPost, bool* srcScan, SLogicNode** ppDynNode) {
int32_t code = TSDB_CODE_SUCCESS;
static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pRoot, SLogicNode* pPrev, SLogicNode* pPost, bool* srcScan,
SLogicNode** ppDynNode) {
int32_t code = TSDB_CODE_SUCCESS;
SDynQueryCtrlLogicNode* pDynCtrl = (SDynQueryCtrlLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL);
if (NULL == pDynCtrl) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -5428,22 +5420,22 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pRoot, SLogicNode* p
pDynCtrl->qType = DYN_QTYPE_STB_HASH;
pDynCtrl->stbJoin.batchFetch = getBatchScanOptionFromHint(pRoot->pHint);
memcpy(pDynCtrl->stbJoin.srcScan, srcScan, sizeof(pDynCtrl->stbJoin.srcScan));
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code) {
pDynCtrl->node.pChildren = nodesMakeList();
if (NULL == pDynCtrl->node.pChildren) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code) {
pDynCtrl->stbJoin.pVgList = nodesMakeList();
if (NULL == pDynCtrl->stbJoin.pVgList) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
if (TSDB_CODE_SUCCESS == code) {
pDynCtrl->stbJoin.pUidList = nodesMakeList();
if (NULL == pDynCtrl->stbJoin.pUidList) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -5472,7 +5464,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pRoot, SLogicNode* p
if (TSDB_CODE_SUCCESS == code) {
pPrev->pParent = (SLogicNode*)pDynCtrl;
pPost->pParent = (SLogicNode*)pDynCtrl;
*ppDynNode = (SLogicNode*)pDynCtrl;
} else {
nodesDestroyNode((SNode*)pDynCtrl);
@ -5488,9 +5480,9 @@ static int32_t stbJoinOptRewriteStableJoin(SOptimizeContext* pCxt, SLogicNode* p
SLogicNode* pGrpCacheNode = NULL;
SLogicNode* pHJoinNode = NULL;
SLogicNode* pMJoinNode = NULL;
SLogicNode* pDynNode = NULL;
SLogicNode* pDynNode = NULL;
bool srcScan[2] = {0};
int32_t code = stbJoinOptCreateTagScanNode(pJoin, &pTagScanNodes);
if (TSDB_CODE_SUCCESS == code) {
code = stbJoinOptCreateTagHashJoinNode(pJoin, pTagScanNodes, &pHJoinNode);
@ -5642,8 +5634,8 @@ static bool partColOptShouldBeOptimized(SLogicNode* pNode) {
}
static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
SNode* node;
int32_t code = TSDB_CODE_SUCCESS;
SNode* node;
int32_t code = TSDB_CODE_SUCCESS;
SSortLogicNode* pSort = (SSortLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT);
if (pSort) {
bool alreadyPartByPKTs = false;
@ -5703,7 +5695,6 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
if (NULL == pNode) return TSDB_CODE_SUCCESS;
SLogicNode* pRootNode = getLogicNodeRootNode((SLogicNode*)pNode);
if (pRootNode->pHint && getSortForGroupOptHint(pRootNode->pHint)) {
// replace with sort node
SSortLogicNode* pSort = partColOptCreateSort(pNode);
@ -5802,6 +5793,7 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) {
FOREACH(pTmpNode, pFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
// TODO test other pseudo column funcs
// TODO test funcs with multi params
if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId) &&
!fmIsGroupKeyFunc(pFunc->funcId)) {
return false;
@ -5814,20 +5806,20 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) {
}
typedef struct STSMAOptUsefulTsma {
const STableTSMAInfo* pTsma; // NULL if no tsma available, which will use original data for calculation
STimeWindow scanRange; // scan time range for this tsma
SArray* pTsmaScanCols; // SArray<int32_t> index of tsmaFuncs array
const STableTSMAInfo* pTsma; // NULL if no tsma available, which will use original data for calculation
STimeWindow scanRange; // scan time range for this tsma
SArray* pTsmaScanCols; // SArray<int32_t> index of tsmaFuncs array
} STSMAOptUsefulTsma;
typedef struct STSMAOptCtx {
// input
SScanLogicNode* pScan;
SLogicNode* pParent; // parent of Table Scan, Agg or Interval
const SNodeList* pAggFuncs;
const STimeWindow* pTimeRange;
const SArray* pTsmas;
SInterval* queryInterval; // not null with window logic node
int8_t precision;
SScanLogicNode* pScan;
SLogicNode* pParent; // parent of Table Scan, Agg or Interval
const SNodeList* pAggFuncs;
const STimeWindow* pTimeRange;
const SArray* pTsmas;
SInterval* queryInterval; // not null with window logic node
int8_t precision;
// output
SArray* pUsefulTsmas; // SArray<STSMAOptUseFulTsma>, sorted by tsma interval from long to short
@ -5835,7 +5827,7 @@ typedef struct STSMAOptCtx {
SLogicSubplan* generatedSubPlans[2];
} STSMAOptCtx;
static int32_t fillTSMAOptCtx (STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) {
static int32_t fillTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) {
int32_t code = 0;
pTsmaOptCtx->pScan = pScan;
pTsmaOptCtx->pParent = pScan->node.pParent;
@ -5939,7 +5931,7 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ
if (queryColId > pTsmaFuncInfo->colId) {
continue;
}
found= true;
found = true;
taosArrayPush(pTsmaScanCols, &j);
break;
}
@ -5983,9 +5975,9 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
}
static int32_t tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) {
const STSMAOptUsefulTsma* p = pLeft, *q = pRight;
int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval;
int32_t code = getDuration(pInterval, p->pTsma->unit, &pInterval, TSDB_TIME_PRECISION_MILLI);
const STSMAOptUsefulTsma *p = pLeft, *q = pRight;
int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval;
int32_t code = getDuration(pInterval, p->pTsma->unit, &pInterval, TSDB_TIME_PRECISION_MILLI);
ASSERT(code == TSDB_CODE_SUCCESS);
code = getDuration(qInterval, q->pTsma->unit, &qInterval, TSDB_TIME_PRECISION_MILLI);
ASSERT(code == TSDB_CODE_SUCCESS);
@ -5994,7 +5986,8 @@ static int32_t tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRigh
return 0;
}
static const STSMAOptUsefulTsma* tsmaOptFindUsefulTsma(const SArray* pUsefulTsmas, int32_t startIdx, int64_t alignInterval, int8_t precision) {
static const STSMAOptUsefulTsma* tsmaOptFindUsefulTsma(const SArray* pUsefulTsmas, int32_t startIdx,
int64_t alignInterval, int8_t precision) {
int64_t tsmaInterval;
for (int32_t i = startIdx; i < pUsefulTsmas->size; ++i) {
const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pUsefulTsmas, i);
@ -6074,7 +6067,8 @@ static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pSc
// the main tsma
if (endOfSkeyFirstWin <= startOfEkeyFirstWin) {
scanRange.ekey = TMIN(pScanRange->ekey, startOfEkeyFirstWin - 1);
scanRange.ekey =
TMIN(pScanRange->ekey, endOfSkeyFirstWin == startOfEkeyFirstWin ? pScanRange->ekey : startOfEkeyFirstWin - 1);
if (!isSkeyAlignedWithTsma) {
scanRange.skey = endOfSkeyFirstWin;
}
@ -6139,7 +6133,8 @@ SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNod
return pScanCols;
}
static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma, SColumnNode* pTagCol) {
static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma,
SColumnNode* pTagCol) {
bool found = false;
if (pTagCol->colType != COLUMN_TYPE_TAG) return 0;
for (int32_t i = 0; i < pTsma->pTsma->pTags->size; ++i) {
@ -6160,7 +6155,7 @@ static int32_t tsmaOptRewriteTag(const STSMAOptCtx* pTsmaOptCtx, const STSMAOptU
static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbNameNode,
const STSMAOptUsefulTsma* pTsma) {
int32_t code = 0;
int32_t code = 0;
SFunctionNode* pRewrittenFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
SValueNode* pValue = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
if (!pRewrittenFunc || !pValue) code = TSDB_CODE_OUT_OF_MEMORY;
@ -6186,7 +6181,7 @@ static int32_t tsmaOptRewriteTbname(const STSMAOptCtx* pTsmaOptCtx, SNode** pTbN
if (code == TSDB_CODE_SUCCESS) {
code = nodesListAppend(pRewrittenFunc->pParameterList, (SNode*)pValue);
}
} else if (code == TSDB_CODE_SUCCESS){
} else if (code == TSDB_CODE_SUCCESS) {
// if no tsma, we replace func tbname with concat('', tbname)
pRewrittenFunc->funcId = fmGetFuncId("concat");
snprintf(pRewrittenFunc->functionName, TSDB_FUNC_NAME_LEN, "concat('', tbname)");
@ -6284,7 +6279,7 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
tstrncpy(pPkTsCol->tableAlias, pTsma->pTsma->targetTb, TSDB_TABLE_NAME_LEN);
pPkTsCol->tableId = pTsma->pTsma->destTbUid;
pPkTsCol->tableType = TSDB_SUPER_TABLE;
nodesListMakeAppend(&pNewScan->pScanCols, (SNode*)pPkTsCol);
nodesListMakeStrictAppend(&pNewScan->pScanCols, nodesCloneNode((SNode*)pPkTsCol));
}
if (code == TSDB_CODE_SUCCESS) {
pNewScan->stableId = pTsma->pTsma->destTbUid;
@ -6336,7 +6331,7 @@ static int32_t tsmaOptCreateWStart(int8_t precision, SFunctionNode** pWStartOut)
}
strcpy(pWStart->functionName, "_wstart");
int64_t pointer = (int64_t)pWStart;
char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
char name[TSDB_COL_NAME_LEN + TSDB_POINTER_PRINT_BYTES + TSDB_NAME_DELIMITER_LEN + 1] = {0};
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%" PRId64 "", pWStart->functionName, pointer);
taosCreateMD5Hash(name, len);
strncpy(pWStart->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
@ -6351,6 +6346,73 @@ static int32_t tsmaOptCreateWStart(int8_t precision, SFunctionNode** pWStartOut)
return code;
}
static int32_t tsmaOptRevisePlan2(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SScanLogicNode* pScan,
const STSMAOptUsefulTsma* pTsma) {
int32_t code = 0;
SColumnNode* pColNode;
SWindowLogicNode* pWindow;
SAggLogicNode* pAgg;
SNodeList* pAggFuncs;
SListCell* pScanListCell;
SNode* pAggFuncNode;
SNodeList* pAggStateFuncs = NULL;
bool isFirstMergeNode = pTsmaOptCtx->pScan == pScan;
SFunctionNode * pPartial = NULL, *pMerge = NULL;
if (nodeType(pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
pWindow = (SWindowLogicNode*)pParent;
pAggFuncs = pWindow->pFuncs;
} else {
pAgg = (SAggLogicNode*)pParent;
pAggFuncs = pAgg->pAggFuncs;
}
pScanListCell = pScan->pScanCols->pHead;
FOREACH(pAggFuncNode, pAggFuncs) {
SFunctionNode* pAggFunc = (SFunctionNode*)pAggFuncNode;
if (fmIsGroupKeyFunc(pAggFunc->funcId)) {
struct TsmaOptRewriteCtx ctx = {
.pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0};
nodesRewriteExpr(&pAggFuncNode, tsmaOptRewriter, &ctx);
if (ctx.code) {
code = ctx.code;
} else {
REPLACE_NODE(pAggFuncNode);
}
continue;
}
code = fmGetDistMethod(pAggFunc, &pPartial, &pMerge);
if (code) break;
pColNode = (SColumnNode*)pScanListCell->pNode;
pScanListCell = pScanListCell->pNext;
pColNode->node.resType = pPartial->node.resType;
nodesListErase(pMerge->pParameterList, pMerge->pParameterList->pHead);
// TODO STRICT
nodesListPushFront(pMerge->pParameterList, nodesCloneNode((SNode*)pColNode));
nodesDestroyNode((SNode*)pPartial);
REPLACE_NODE(pMerge);
}
if (code == TSDB_CODE_SUCCESS && pWindow) {
SColumnNode* pCol = (SColumnNode*)pScan->pScanCols->pTail->pNode;
assert(pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
nodesDestroyNode(pWindow->pTspk);
pWindow->pTspk = nodesCloneNode((SNode*)pCol);
}
if (code == TSDB_CODE_SUCCESS) {
nodesDestroyList(pScan->node.pTargets);
code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets);
}
if (code == TSDB_CODE_SUCCESS) {
code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets);
}
return code;
}
static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SScanLogicNode* pScan,
const STSMAOptUsefulTsma* pTsma) {
SNode * pStateFuncNode, *pAggFuncNode;
@ -6360,6 +6422,7 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
SNodeList* pAggStateFuncs = NULL;
SNodeList* pAggFuncs = NULL;
SWindowLogicNode* pWindow = NULL;
SAggLogicNode* pAgg = NULL;
bool isFirstMergeNode = pTsmaOptCtx->pScan == pScan;
bool hasWStart = false;
@ -6367,7 +6430,8 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
pWindow = (SWindowLogicNode*)pParent;
pAggFuncs = pWindow->pFuncs;
} else {
pAggFuncs = ((SAggLogicNode*)pParent)->pAggFuncs;
pAgg = (SAggLogicNode*)pParent;
pAggFuncs = pAgg->pAggFuncs;
}
if (isFirstMergeNode) {
pAggStateFuncs = nodesCloneList(pAggFuncs);
@ -6384,7 +6448,7 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
SFunctionNode* pAggFunc = (SFunctionNode*)pAggFuncNode;
if (fmIsGroupKeyFunc(pAggFunc->funcId)) {
struct TsmaOptRewriteCtx ctx = {
.pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0};
.pTsmaOptCtx = pTsmaOptCtx, .pTsma = pTsma, .rewriteTag = true, .rewriteTbname = true, .code = 0};
nodesRewriteExpr(&pAggFuncNode, tsmaOptRewriter, &ctx);
if (ctx.code) {
code = ctx.code;
@ -6426,6 +6490,11 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
code = createColumnByRewriteExprs(pAggFuncs, &pWindow->node.pTargets);
}
if (code == TSDB_CODE_SUCCESS && pAgg) {
nodesDestroyList(pAgg->node.pTargets);
code = createColumnByRewriteExprs(pAggFuncs, &pAgg->node.pTargets);
}
if (code == TSDB_CODE_SUCCESS) {
nodesDestroyList(pScan->node.pTargets);
code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets);
@ -6449,8 +6518,6 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
break;
}
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
// TODO what is 1?
//pSubplan->splitFlag = 1;
pTsmaOptCtx->generatedSubPlans[i - 1] = pSubplan;
SLogicNode* pParent = (SLogicNode*)nodesCloneNode((SNode*)pTsmaOptCtx->pParent);
if (!pParent) {
@ -6463,7 +6530,7 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
SScanLogicNode* pScan = (SScanLogicNode*)pParent->pChildren->pHead->pNode;
code = tsmaOptRewriteScan(pTsmaOptCtx, pScan, pTsma);
if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) {
code = tsmaOptRevisePlan(pTsmaOptCtx, pParent, pScan, pTsma);
code = tsmaOptRevisePlan2(pTsmaOptCtx, pParent, pScan, pTsma);
}
}
@ -6471,7 +6538,8 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, 0);
code = tsmaOptRewriteScan(pTsmaOptCtx, pTsmaOptCtx->pScan, pTsma);
if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) {
code = tsmaOptRevisePlan(pTsmaOptCtx, pTsmaOptCtx->pParent, pTsmaOptCtx->pScan, pTsma); }
code = tsmaOptRevisePlan2(pTsmaOptCtx, pTsmaOptCtx->pParent, pTsmaOptCtx->pScan, pTsma);
}
}
return code;

View File

@ -2335,6 +2335,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildre
break;
}
}
if (code) break;
}
}

View File

@ -741,6 +741,11 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
pSubplan->id.queryId = pCxt->queryId;
pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
splSetSubplanVgroups(pSubplan, pSubplan->pNode);
code = stbSplCreatePartWindowNode((SWindowLogicNode*)pSubplan->pNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pSubplan->pNode);
pSubplan->pNode = pPartWindow;
}
}
pMerge->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
pMerge->srcEndGroupId = pCxt->groupId;
@ -1116,17 +1121,71 @@ static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* p
return code;
}
static int32_t stbSplSplitAggNodeForCrossTableMulSubplan(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartAgg = NULL;
bool hasExchange = false;
SMergeLogicNode* pMergeNode = NULL;
SLogicSubplan* pFirstScanSubplan = NULL;
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
if (TSDB_CODE_SUCCESS == code) {
if (pInfo->pSplitNode->forceCreateNonBlockingOptr) {
code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg);
} else {
hasExchange = true;
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, NULL, pPartAgg, false, false);
}
pMergeNode =
(SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren, LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
} else {
nodesDestroyNode((SNode*)pPartAgg);
}
if (code == TSDB_CODE_SUCCESS) {
pFirstScanSubplan = splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT);
if (!pFirstScanSubplan) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == TSDB_CODE_SUCCESS) {
SNode* pNode;
FOREACH(pNode, pInfo->pSubplan->pChildren) {
++(pCxt->groupId);
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
pSubplan->id.groupId = pCxt->groupId;
pSubplan->id.queryId = pCxt->queryId;
pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
splSetSubplanVgroups(pSubplan, pSubplan->pNode);
code = stbSplCreatePartAggNode((SAggLogicNode*)pSubplan->pNode, &pPartAgg);
if (code) break;
nodesDestroyNode((SNode*)pSubplan->pNode);
pSubplan->pNode = pPartAgg;
}
pMergeNode->numOfSubplans = LIST_LENGTH(pInfo->pSubplan->pChildren) + 1;
pMergeNode->srcEndGroupId = pCxt->groupId;
}
if (code == TSDB_CODE_SUCCESS) {
code = nodesListMakeAppend(&pInfo->pSubplan->pChildren, (SNode*)pFirstScanSubplan);
}
if (code && pFirstScanSubplan) {
nodesDestroyNode((SNode*)pFirstScanSubplan);
}
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
++(pCxt->groupId);
return code;
}
static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartAgg = NULL;
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
bool hasExchange = false;
if (TSDB_CODE_SUCCESS == code) {
// if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
if (pInfo->pSplitNode->forceCreateNonBlockingOptr)
code = stbSplAggNodeCreateMerge(pCxt, pInfo, pPartAgg); //TODO test slimit
else {
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
hasExchange = true;
}
} else {
nodesDestroyNode((SNode*)pPartAgg);
@ -1138,18 +1197,6 @@ static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplit
if (!pScanSubplan) code = TSDB_CODE_OUT_OF_MEMORY;
}
if (code == TSDB_CODE_SUCCESS) {
SNode* pNode;
FOREACH(pNode, pInfo->pSubplan->pChildren) {
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
pSubplan->id.groupId = pCxt->groupId;
pSubplan->id.queryId = pCxt->queryId;
pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
splSetSubplanVgroups(pSubplan, pSubplan->pNode);
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pSubplan->pNode);
}
}
if (code == TSDB_CODE_SUCCESS) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)pScanSubplan);
}
@ -1163,6 +1210,9 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
if (isPartTableAgg((SAggLogicNode*)pInfo->pSplitNode)) {
return stbSplSplitAggNodeForPartTable(pCxt, pInfo);
}
if (pInfo->pSubplan->pChildren && LIST_LENGTH(pInfo->pSubplan->pChildren) > 0) {
return stbSplSplitAggNodeForCrossTableMulSubplan(pCxt, pInfo);
}
return stbSplSplitAggNodeForCrossTable(pCxt, pInfo);
}

View File

@ -1,5 +1,4 @@
from random import randrange
from pandas import tseries
import taos
import time
import threading
@ -27,8 +26,8 @@ class UsedTsma:
def __init__(self) -> None:
self.name = '' ## tsma name or table name
self.time_range_start = UsedTsma.TS_MIN
self.time_range_end = UsedTsma.TS_MAX
self.time_range_start: float = float(UsedTsma.TS_MIN)
self.time_range_end: float = float(UsedTsma.TS_MAX)
self.is_tsma_ = False
def __eq__(self, __value: object) -> bool:
@ -40,6 +39,15 @@ class UsedTsma:
else:
return False
def __ne__(self, __value: object) -> bool:
return not self.__eq__(__value)
def __str__(self) -> str:
return "%s: from %s to %s is_tsma: %d" % (self.name, self.time_range_start, self.time_range_end, self.is_tsma_)
def __repr__(self) -> str:
return self.__str__()
class TSMAQueryContext:
def __init__(self) -> None:
self.sql = ''
@ -56,6 +64,9 @@ class TSMAQueryContext:
else:
return False
def __ne__(self, __value: object) -> bool:
return self.__eq__(__value)
def __str__(self) -> str:
return str(self.used_tsmas)
@ -75,12 +86,19 @@ class TSMAQueryContextBuilder:
def with_sql(self, sql: str):
self.ctx.sql = sql
return self
def to_timestamp(self, ts: str) -> float:
if ts == UsedTsma.TS_MAX or ts == UsedTsma.TS_MIN:
return float(ts)
tdSql.query("select to_timestamp('%s', 'yyyy-mm-dd hh24-mi-ss.ms')" % (ts))
res = tdSql.queryResult[0][0]
return res.timestamp() * 1000
def should_query_with_table(self, tb_name: str, ts_begin: str, ts_end: str) -> 'TSMAQueryContextBuilder':
used_tsma: UsedTsma = UsedTsma()
used_tsma.name = tb_name
used_tsma.time_range_start = ts_begin
used_tsma.time_range_end = ts_end
used_tsma.time_range_start = self.to_timestamp(ts_begin)
used_tsma.time_range_end = self.to_timestamp(ts_end)
used_tsma.is_tsma_ = False
self.ctx.used_tsmas.append(used_tsma)
return self
@ -88,8 +106,8 @@ class TSMAQueryContextBuilder:
def should_query_with_tsma(self, tsma_name: str, ts_begin: str, ts_end: str) -> 'TSMAQueryContextBuilder':
used_tsma: UsedTsma = UsedTsma()
used_tsma.name = tsma_name + '_tsma_res_stb'
used_tsma.time_range_start = ts_begin
used_tsma.time_range_end = ts_end
used_tsma.time_range_start = self.to_timestamp(ts_begin)
used_tsma.time_range_end = self.to_timestamp(ts_end)
used_tsma.is_tsma_ = True
self.ctx.used_tsmas.append(used_tsma)
return self
@ -129,11 +147,21 @@ class TSMATestContext:
row = row[idx:].split('[')[1]
row = row.split(']')[0]
words = row.split(',')
used_tsma.time_range_start = words[0].strip()
used_tsma.time_range_end = words[1].strip()
used_tsma.time_range_start = float(words[0].strip())
used_tsma.time_range_end = float(words[1].strip())
query_ctx.used_tsmas.append(used_tsma)
used_tsma = UsedTsma()
deduplicated_tsmas: list[UsedTsma] = []
if len(query_ctx.used_tsmas) > 0:
deduplicated_tsmas.append(query_ctx.used_tsmas[0])
for tsma in query_ctx.used_tsmas:
if tsma == deduplicated_tsmas[-1]:
continue
else:
deduplicated_tsmas.append(tsma)
query_ctx.used_tsmas = deduplicated_tsmas
return query_ctx
def check_explain(self, sql: str, expect: TSMAQueryContext):
@ -142,7 +170,21 @@ class TSMATestContext:
tdLog.exit('check explain failed for sql: %s \nexpect: %s \nactual: %s' % (sql, str(expect), str(query_ctx)))
def check_result(self, sql: str):
pass
#tdSql.execute("alter local 'querySmaOptimize' '1'")
tsma_res = tdSql.getResult(sql)
tdSql.execute("alter local 'querySmaOptimize' '0'")
no_tsma_res = tdSql.getResult(sql)
if no_tsma_res is None or tsma_res is None:
if no_tsma_res != tsma_res:
tdLog.exit("comparing tsma res for: %s got different rows of result: with tsma: %s, with tsma: %s" % (sql, no_tsma_res, tsma_res))
if len(no_tsma_res) != len(tsma_res):
tdLog.exit("comparing tsma res for: %s got differnt rows of result: without tsma: %d, with tsma: %d" % (sql, len(no_tsma_res), len(tsma_res)))
for row_no_tsma, row_tsma in zip(no_tsma_res, tsma_res):
if row_no_tsma != row_tsma:
tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s" % (sql, str(row_no_tsma), str(row_tsma)))
def check_sql(self, sql: str, expect: TSMAQueryContext):
self.check_explain(sql, expect=expect)
@ -274,74 +316,122 @@ class TDTestCase:
break
if not plan_found:
tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(explain_output)))
def check(self, func):
for ctx in func():
self.test_ctx.check_sql(ctx.sql, ctx)
def test_query_with_tsma(self):
self.init_data()
self.create_tsma('tsma1', 'test', 'meters', ['avg'], ['c1', 'c2'], '5m')
self.create_tsma('tsma2', 'test', 'meters', ['avg'], ['c1', 'c2'], '30m')
time.sleep(5)
time.sleep(9999999)
self.test_query_with_tsma_interval()
self.test_query_with_tsma_agg()
def test_query_with_tsma_interval(self):
self.test_query_with_tsma_interval_no_partition()
self.test_query_with_tsma_interval_partition_by_col()
self.test_query_with_tsma_interval_partition_by_tbname()
self.test_query_with_tsma_interval_partition_by_tag()
self.test_query_with_tsma_interval_partition_by_hybrid()
self.check(self.test_query_with_tsma_interval_no_partition)
self.check(self.test_query_with_tsma_interval_partition_by_col)
self.check(self.test_query_with_tsma_interval_partition_by_tbname)
self.check(self.test_query_with_tsma_interval_partition_by_tag)
self.check(self.test_query_with_tsma_interval_partition_by_hybrid)
def test_query_with_tsma_interval_no_partition(self):
pass
def test_query_with_tsma_interval_no_partition(self) -> List[TSMAQueryContext]:
ctxs: List[TSMAQueryContext] = []
sql = 'select avg(c1), avg(c2) from meters interval(5m)'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_tsma('tsma1', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
sql = 'select avg(c1), avg(c2) from meters interval(10m)'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_tsma('tsma1', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
sql = 'select avg(c1), avg(c2) from meters interval(30m)'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
sql = 'select avg(c1), avg(c2) from meters interval(60m)'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
sql = "select avg(c1), avg(c2) from meters where ts >= '2018-09-17 09:00:00.009' and ts < '2018-09-17 10:23:19.665' interval(30m)"
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_table('meters', '2018-09-17 09:00:00.009','2018-09-17 09:29:59.999') \
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.664').get_ctx())
return ctxs
def test_query_with_tsma_interval_partition_by_tbname(self):
pass
return []
def test_query_with_tsma_interval_partition_by_tag(self):
pass
return []
def test_query_with_tsma_interval_partition_by_col(self):
pass
return []
def test_query_with_tsma_interval_partition_by_hybrid(self):
pass
return []
def test_query_with_tsma_agg(self):
self.test_query_with_tsma_agg_no_group_by()
self.test_query_with_tsma_agg_group_by_hybrid()
self.test_query_with_tsma_agg_group_by_tbname()
self.test_query_with_tsma_agg_group_by_tag()
self.check(self.test_query_with_tsma_agg_no_group_by)
self.check(self.test_query_with_tsma_agg_group_by_hybrid)
self.check(self.test_query_with_tsma_agg_group_by_tbname)
self.check(self.test_query_with_tsma_agg_group_by_tag)
def test_query_with_tsma_agg_no_group_by(self):
ctxs: List[TSMAQueryContext] = []
ctxs.append(TSMAQueryContextBuilder().with_sql('select avg(c1), avg(c2) from meters').should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
ctxs.append(TSMAQueryContextBuilder().with_sql('select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"')\
.should_query_with_table('meters', '','').get_ctx())
tsma_sqls = [
'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
'select avg(c1) + avg(c2), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
'select avg(c1) + avg(c2), avg(c2) + 1 from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
'select avg(c1) + avg(c2), from meters where tbname like "%t1%"'
]
non_tsma_sqls = [
'select avg(c1), avg(c2) from meters where c1 is not NULL',
'select avg(c1), avg(c2), spread(c4) from meters',
]
sql = 'select avg(c1), avg(c2) from meters'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql).should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
for ctx in ctxs:
self.test_ctx.check_sql(ctx.sql, ctx)
sql = 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.000" and "2018-09-17 10:00:00.000"'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_tsma('tsma2', '2018-09-17 09:00:00','2018-09-17 09:59:59:999') \
.should_query_with_table("meters", '2018-09-17 10:00:00','2018-09-17 10:00:00').get_ctx())
sql = 'select avg(c1), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_ctx())
sql = 'select avg(c1) + avg(c2), avg(c2) from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_ctx())
sql = 'select avg(c1) + avg(c2), avg(c2) + 1 from meters where ts between "2018-09-17 09:00:00.200" and "2018-09-17 10:23:19.800"'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_table('meters', '2018-09-17 09:00:00.200','2018-09-17 09:29:59:999') \
.should_query_with_tsma('tsma2', '2018-09-17 09:30:00','2018-09-17 09:59:59.999') \
.should_query_with_table('meters', '2018-09-17 10:00:00.000','2018-09-17 10:23:19.800').get_ctx())
sql = 'select avg(c1) + avg(c2) from meters where tbname like "%t1%"'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_tsma('tsma2', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
sql = 'select avg(c1), avg(c2) from meters where c1 is not NULL'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_table('meters', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
sql = 'select avg(c1), avg(c2), spread(c4) from meters'
ctxs.append(TSMAQueryContextBuilder().with_sql(sql) \
.should_query_with_table('meters', UsedTsma.TS_MIN,UsedTsma.TS_MAX).get_ctx())
return ctxs
def test_query_with_tsma_agg_group_by_tbname(self):
pass
return []
def test_query_with_tsma_agg_group_by_tag(self):
pass
return []
def test_query_with_tsma_agg_group_by_hybrid(self):
pass
return []
def run(self):
self.test_query_with_tsma()
time.sleep(999999)
#time.sleep(999999)
def stop(self):
tdSql.close()