Merge pull request #14219 from taosdata/feature/3.0_debug_wxy
feat: support partition by expression and aggregate function output together
This commit is contained in:
commit
9c85131d22
|
@ -75,6 +75,7 @@ typedef struct SScanLogicNode {
|
|||
double filesFactor;
|
||||
SArray* pSmaIndexes;
|
||||
SNodeList* pPartTags;
|
||||
bool partSort;
|
||||
} SScanLogicNode;
|
||||
|
||||
typedef struct SJoinLogicNode {
|
||||
|
|
|
@ -1274,13 +1274,12 @@ static bool validateTimestampDigits(const SValueNode* pVal) {
|
|||
}
|
||||
|
||||
int64_t tsVal = pVal->datum.i;
|
||||
char fraction[20] = {0};
|
||||
char fraction[20] = {0};
|
||||
NUM_TO_STRING(pVal->node.resType.type, &tsVal, sizeof(fraction), fraction);
|
||||
int32_t tsDigits = (int32_t)strlen(fraction);
|
||||
|
||||
if (tsDigits > TSDB_TIME_PRECISION_SEC_DIGITS) {
|
||||
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS ||
|
||||
tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS ||
|
||||
if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS || tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS ||
|
||||
tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
|
||||
return true;
|
||||
} else {
|
||||
|
@ -1510,6 +1509,11 @@ static int32_t translateBlockDistInfoFunc(SFunctionNode* pFunc, char* pErrBuf, i
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateGroupKeyFunc(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
pFunc->node.resType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool getBlockDistFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(STableBlockDistInfo);
|
||||
return true;
|
||||
|
@ -2519,6 +2523,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.initFunc = functionSetup,
|
||||
.processFunc = groupKeyFunction,
|
||||
.finalizeFunc = groupKeyFinalize,
|
||||
.pPartialFunc = "_group_key",
|
||||
.pMergeFunc = "_group_key"
|
||||
},
|
||||
};
|
||||
// clang-format on
|
||||
|
|
|
@ -346,9 +346,11 @@ static void destroyVgDataBlockArray(SArray* pArray) {
|
|||
}
|
||||
|
||||
static void destroyLogicNode(SLogicNode* pNode) {
|
||||
nodesDestroyList(pNode->pChildren);
|
||||
nodesDestroyNode(pNode->pConditions);
|
||||
nodesDestroyList(pNode->pTargets);
|
||||
nodesDestroyNode(pNode->pConditions);
|
||||
nodesDestroyList(pNode->pChildren);
|
||||
nodesDestroyNode(pNode->pLimit);
|
||||
nodesDestroyNode(pNode->pSlimit);
|
||||
}
|
||||
|
||||
static void destroyPhysiNode(SPhysiNode* pNode) {
|
||||
|
@ -368,6 +370,7 @@ static void destroyWinodwPhysiNode(SWinodwPhysiNode* pNode) {
|
|||
static void destroyScanPhysiNode(SScanPhysiNode* pNode) {
|
||||
destroyPhysiNode((SPhysiNode*)pNode);
|
||||
nodesDestroyList(pNode->pScanCols);
|
||||
nodesDestroyList(pNode->pScanPseudoCols);
|
||||
}
|
||||
|
||||
static void destroyDataSinkNode(SDataSinkNode* pNode) { nodesDestroyNode((SNode*)pNode->pInputDataBlockDesc); }
|
||||
|
@ -516,6 +519,9 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
nodesDestroyNode(pStmt->pWindow);
|
||||
nodesDestroyList(pStmt->pGroupByList);
|
||||
nodesDestroyNode(pStmt->pHaving);
|
||||
nodesDestroyNode(pStmt->pRange);
|
||||
nodesDestroyNode(pStmt->pEvery);
|
||||
nodesDestroyNode(pStmt->pFill);
|
||||
nodesDestroyList(pStmt->pOrderByList);
|
||||
nodesDestroyNode((SNode*)pStmt->pLimit);
|
||||
nodesDestroyNode((SNode*)pStmt->pSlimit);
|
||||
|
@ -779,6 +785,8 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
SInterpFuncLogicNode* pLogicNode = (SInterpFuncLogicNode*)pNode;
|
||||
destroyLogicNode((SLogicNode*)pLogicNode);
|
||||
nodesDestroyList(pLogicNode->pFuncs);
|
||||
nodesDestroyNode(pLogicNode->pFillValues);
|
||||
nodesDestroyNode(pLogicNode->pTimeSeries);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_SUBPLAN: {
|
||||
|
@ -793,14 +801,21 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
nodesDestroyList(((SQueryLogicPlan*)pNode)->pTopSubplans);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
|
||||
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: {
|
||||
STableScanPhysiNode* pPhyNode = (STableScanPhysiNode*)pNode;
|
||||
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
||||
nodesDestroyList(pPhyNode->pDynamicScanFuncs);
|
||||
nodesDestroyList(pPhyNode->pPartitionTags);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: {
|
||||
SProjectPhysiNode* pPhyNode = (SProjectPhysiNode*)pNode;
|
||||
destroyPhysiNode((SPhysiNode*)pPhyNode);
|
||||
|
@ -891,6 +906,8 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
destroyPhysiNode((SPhysiNode*)pPhyNode);
|
||||
nodesDestroyList(pPhyNode->pExprs);
|
||||
nodesDestroyList(pPhyNode->pFuncs);
|
||||
nodesDestroyNode(pPhyNode->pFillValues);
|
||||
nodesDestroyNode(pPhyNode->pTimeSeries);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
|
||||
|
|
|
@ -1355,6 +1355,25 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, SNode** pNode
|
|||
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR;
|
||||
}
|
||||
|
||||
static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
|
||||
if (NULL == pFunc) {
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
|
||||
strcpy(pFunc->functionName, "_group_key");
|
||||
strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName);
|
||||
pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||
*pNode = (SNode*)pFunc;
|
||||
pCxt->errCode = fmGetFuncInfo(pFunc, pCxt->msgBuf.buf, pCxt->msgBuf.len);
|
||||
}
|
||||
pCxt->pCurrSelectStmt->hasAggFuncs = true;
|
||||
|
||||
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
|
||||
}
|
||||
|
||||
static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
|
||||
SCheckExprForGroupByCxt* pCxt = (SCheckExprForGroupByCxt*)pContext;
|
||||
if (!nodesIsExprNode(*pNode) || isAliasColumn(*pNode)) {
|
||||
|
@ -1371,10 +1390,10 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
|
|||
if (isAggFunc(*pNode) && !isDistinctOrderBy(pCxt->pTranslateCxt)) {
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
SNode* pGroupNode;
|
||||
SNode* pGroupNode = NULL;
|
||||
FOREACH(pGroupNode, getGroupByList(pCxt->pTranslateCxt)) {
|
||||
if (nodesEqualNode(getGroupByNode(pGroupNode), *pNode)) {
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode);
|
||||
}
|
||||
}
|
||||
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
||||
|
@ -1432,6 +1451,25 @@ static int32_t rewriteColsToSelectValFunc(STranslateContext* pCxt, SSelectStmt*
|
|||
return pCxt->errCode;
|
||||
}
|
||||
|
||||
static EDealRes rewriteExprsToGroupKeyFuncImpl(SNode** pNode, void* pContext) {
|
||||
STranslateContext* pCxt = pContext;
|
||||
SNode* pPartKey = NULL;
|
||||
FOREACH(pPartKey, pCxt->pCurrSelectStmt->pPartitionByList) {
|
||||
if (nodesEqualNode(pPartKey, *pNode)) {
|
||||
return rewriteExprToGroupKeyFunc(pCxt, pNode);
|
||||
}
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t rewriteExprsToGroupKeyFunc(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
nodesRewriteExprs(pSelect->pProjectionList, rewriteExprsToGroupKeyFuncImpl, pCxt);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->errCode && !pSelect->isDistinct) {
|
||||
nodesRewriteExprs(pSelect->pOrderByList, rewriteExprsToGroupKeyFuncImpl, pCxt);
|
||||
}
|
||||
return pCxt->errCode;
|
||||
}
|
||||
|
||||
typedef struct CheckAggColCoexistCxt {
|
||||
STranslateContext* pTranslateCxt;
|
||||
bool existAggFunc;
|
||||
|
@ -1456,6 +1494,12 @@ static EDealRes doCheckAggColCoexist(SNode* pNode, void* pContext) {
|
|||
pCxt->existIndefiniteRowsFunc = true;
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
SNode* pPartKey = NULL;
|
||||
FOREACH(pPartKey, pCxt->pTranslateCxt->pCurrSelectStmt->pPartitionByList) {
|
||||
if (nodesEqualNode(pPartKey, pNode)) {
|
||||
return DEAL_RES_IGNORE_CHILD;
|
||||
}
|
||||
}
|
||||
if (isScanPseudoColumnFunc(pNode) || QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||
pCxt->existCol = true;
|
||||
}
|
||||
|
@ -1485,6 +1529,9 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
|
|||
if (cxt.existIndefiniteRowsFunc && cxt.existCol) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
|
||||
}
|
||||
if (cxt.existAggFunc && NULL != pSelect->pPartitionByList) {
|
||||
return rewriteExprsToGroupKeyFunc(pCxt, pSelect);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1840,10 +1887,7 @@ static int32_t createMultiResFuncsFromStar(STranslateContext* pCxt, SFunctionNod
|
|||
code = createMultiResFuncs(pSrcFunc, pExprs, pOutput);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyList(pExprs);
|
||||
}
|
||||
|
||||
nodesDestroyList(pExprs);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -2463,8 +2507,24 @@ static SNode* createOrderByExpr(STranslateContext* pCxt) {
|
|||
return (SNode*)pOrder;
|
||||
}
|
||||
|
||||
// from: select tail(expr, k, f) from t where_clause partition_by_clause order_by_clause ...
|
||||
// to: select expr from t where_clause order by _rowts desc limit k offset f
|
||||
/* case 1:
|
||||
* in: select tail(expr, k, f) from t where_clause
|
||||
* out: select expr from t where_clause order by _rowts desc limit k offset f
|
||||
*
|
||||
* case 2:
|
||||
* in: select tail(expr, k, f) from t where_clause partition_by_clause
|
||||
* out: select expr from t where_clause partition_by_clause sort by _rowts desc limit k offset f
|
||||
*
|
||||
* case 3:
|
||||
* in: select tail(expr, k, f) from t where_clause order_by_clause limit_clause
|
||||
* out: select expr from (
|
||||
* select expr from t where_clause order by _rowts desc limit k offset f
|
||||
* ) order_by_clause limit_clause
|
||||
*
|
||||
* case 4:
|
||||
* in: select tail(expr, k, f) from t where_clause partition_by_clause limit_clause
|
||||
* out:
|
||||
*/
|
||||
static int32_t rewriteTailStmt(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
if (!pSelect->hasTailFunc) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -6018,7 +6078,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg);
|
||||
pQuery->msgType = pQuery->pCmdMsg->msgType;
|
||||
}
|
||||
break;
|
||||
break;
|
||||
default:
|
||||
pQuery->execMode = QUERY_EXEC_MODE_RPC;
|
||||
if (NULL != pCxt->pCmdMsg) {
|
||||
|
|
|
@ -199,6 +199,20 @@ TEST_F(ParserSelectTest, tailFuncSemanticCheck) {
|
|||
run("SELECT TAIL(c1, 10) FROM t1 GROUP BY c2", TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC);
|
||||
}
|
||||
|
||||
TEST_F(ParserSelectTest, partitionBy) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT c1, c2 FROM t1 PARTITION BY c2");
|
||||
|
||||
run("SELECT SUM(c1), c2 FROM t1 PARTITION BY c2");
|
||||
}
|
||||
|
||||
TEST_F(ParserSelectTest, partitionBySemanticCheck) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT SUM(c1), c2, c3 FROM t1 PARTITION BY c2", TSDB_CODE_PAR_NOT_SINGLE_GROUP);
|
||||
}
|
||||
|
||||
TEST_F(ParserSelectTest, groupBy) {
|
||||
useDb("root", "test");
|
||||
|
||||
|
@ -213,6 +227,15 @@ TEST_F(ParserSelectTest, groupBy) {
|
|||
run("SELECT COUNT(*), c1 + 10, c2 cnt FROM t1 WHERE c1 > 0 GROUP BY c1 + 10, c2");
|
||||
}
|
||||
|
||||
TEST_F(ParserSelectTest, groupBySemanticCheck) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT COUNT(*) cnt, c1 FROM t1 WHERE c1 > 0", TSDB_CODE_PAR_NOT_SINGLE_GROUP);
|
||||
run("SELECT COUNT(*) cnt, c2 FROM t1 WHERE c1 > 0 GROUP BY c1", TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION);
|
||||
run("SELECT COUNT(*) cnt, c2 FROM t1 WHERE c1 > 0 PARTITION BY c2 GROUP BY c1",
|
||||
TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION);
|
||||
}
|
||||
|
||||
TEST_F(ParserSelectTest, orderBy) {
|
||||
useDb("root", "test");
|
||||
|
||||
|
|
|
@ -450,6 +450,15 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
// set grouyp keys, agg funcs and having conditions
|
||||
if (TSDB_CODE_SUCCESS == code && pSelect->hasAggFuncs) {
|
||||
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, fmIsAggFunc, &pAgg->pAggFuncs);
|
||||
}
|
||||
|
||||
// rewrite the expression in subsequent clauses
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
|
||||
}
|
||||
|
||||
if (NULL != pSelect->pGroupByList) {
|
||||
pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList);
|
||||
if (NULL == pAgg->pGroupKeys) {
|
||||
|
@ -462,15 +471,6 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
|||
code = rewriteExprsForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && pSelect->hasAggFuncs) {
|
||||
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, fmIsAggFunc, &pAgg->pAggFuncs);
|
||||
}
|
||||
|
||||
// rewrite the expression in subsequent clauses
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) {
|
||||
pAgg->node.pConditions = nodesCloneNode(pSelect->pHaving);
|
||||
if (NULL == pAgg->node.pConditions) {
|
||||
|
@ -780,8 +780,8 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pProject->node.pLimit = (SNode*)pSelect->pLimit;
|
||||
pProject->node.pSlimit = (SNode*)pSelect->pSlimit;
|
||||
TSWAP(pProject->node.pLimit, pSelect->pLimit);
|
||||
TSWAP(pProject->node.pSlimit, pSelect->pSlimit);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -940,7 +940,7 @@ static int32_t createSetOpSortLogicNode(SLogicPlanContext* pCxt, SSetOperator* p
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pSort->node.pLimit = pSetOperator->pLimit;
|
||||
TSWAP(pSort->node.pLimit, pSetOperator->pLimit);
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -973,7 +973,7 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator
|
|||
}
|
||||
|
||||
if (NULL == pSetOperator->pOrderByList) {
|
||||
pProject->node.pLimit = pSetOperator->pLimit;
|
||||
TSWAP(pProject->node.pLimit, pSetOperator->pLimit);
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1004,7 +1004,7 @@ static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pS
|
|||
}
|
||||
|
||||
if (NULL == pSetOperator->pOrderByList) {
|
||||
pAgg->node.pLimit = pSetOperator->pLimit;
|
||||
TSWAP(pAgg->node.pSlimit, pSetOperator->pLimit);
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -682,7 +682,7 @@ static EOrder opkGetPrimaryKeyOrder(SSortLogicNode* pSort) {
|
|||
static SNode* opkRewriteDownNode(SSortLogicNode* pSort) {
|
||||
SNode* pDownNode = nodesListGetNode(pSort->node.pChildren, 0);
|
||||
// todo
|
||||
pSort->node.pChildren = NULL;
|
||||
NODES_CLEAR_LIST(pSort->node.pChildren);
|
||||
return pDownNode;
|
||||
}
|
||||
|
||||
|
|
|
@ -348,8 +348,8 @@ static SPhysiNode* makePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pPhysiNode->pLimit = pLogicNode->pLimit;
|
||||
pPhysiNode->pSlimit = pLogicNode->pSlimit;
|
||||
TSWAP(pPhysiNode->pLimit, pLogicNode->pLimit);
|
||||
TSWAP(pPhysiNode->pSlimit, pLogicNode->pSlimit);
|
||||
|
||||
int32_t code = createDataBlockDesc(pCxt, pLogicNode->pTargets, &pPhysiNode->pOutputDataBlockDesc);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
|
@ -862,6 +862,9 @@ static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
|
|||
nodesDestroyNode((SNode*)pIdfRowsFunc);
|
||||
}
|
||||
|
||||
nodesDestroyList(pPrecalcExprs);
|
||||
nodesDestroyList(pFuncs);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -913,6 +916,9 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
|
|||
nodesDestroyNode((SNode*)pInterpFunc);
|
||||
}
|
||||
|
||||
nodesDestroyList(pPrecalcExprs);
|
||||
nodesDestroyList(pFuncs);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1048,6 +1054,9 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
|
|||
nodesDestroyNode((SNode*)pWindow);
|
||||
}
|
||||
|
||||
nodesDestroyList(pPrecalcExprs);
|
||||
nodesDestroyList(pFuncs);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1241,6 +1250,9 @@ static int32_t createPartitionPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
|
|||
nodesDestroyNode((SNode*)pPart);
|
||||
}
|
||||
|
||||
nodesDestroyList(pPrecalcExprs);
|
||||
nodesDestroyList(pPartitionKeys);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -179,7 +179,7 @@ static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) {
|
|||
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (WINDOW_TYPE_STATE == pWindow->winType) {
|
||||
if (!streamQuery) {
|
||||
return stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
|
@ -374,7 +374,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
|
|||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
|
||||
// NULL == pSubplan means 'merge node' replaces 'split node'.
|
||||
// NULL != pSubplan means 'merge node' replaces 'split node'.
|
||||
if (NULL == pSubplan) {
|
||||
pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets);
|
||||
} else {
|
||||
|
@ -512,7 +512,7 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
|
|||
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0);
|
||||
|
||||
SNodeList* pMergeKeys = NULL;
|
||||
int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys);
|
||||
int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild);
|
||||
|
@ -561,6 +561,8 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
|||
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
return ((SScanLogicNode*)pNode)->pPartTags;
|
||||
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -571,14 +573,15 @@ static bool stbSplIsPartTbanme(SNodeList* pPartKeys) {
|
|||
return false;
|
||||
}
|
||||
SNode* pPartKey = nodesListGetNode(pPartKeys, 0);
|
||||
return QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType;
|
||||
return (QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) ||
|
||||
(QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TBNAME == ((SColumnNode*)pPartKey)->colType);
|
||||
}
|
||||
|
||||
static bool stbSplIsMultiTableWinodw(SWindowLogicNode* pWindow) {
|
||||
static bool stbSplIsPartTableWinodw(SWindowLogicNode* pWindow) {
|
||||
return stbSplIsPartTbanme(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
switch (((SWindowLogicNode*)pInfo->pSplitNode)->winType) {
|
||||
case WINDOW_TYPE_INTERVAL:
|
||||
return stbSplSplitInterval(pCxt, pInfo);
|
||||
|
@ -592,7 +595,7 @@ static int32_t stbSplSplitWindowForMergeTable(SSplitContext* pCxt, SStableSplitI
|
|||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitWindowForMultiTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
if (pCxt->pPlanCxt->streamQuery) {
|
||||
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -613,10 +616,10 @@ static int32_t stbSplSplitWindowForMultiTable(SSplitContext* pCxt, SStableSplitI
|
|||
}
|
||||
|
||||
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
if (stbSplIsMultiTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) {
|
||||
return stbSplSplitWindowForMultiTable(pCxt, pInfo);
|
||||
if (stbSplIsPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) {
|
||||
return stbSplSplitWindowForPartTable(pCxt, pInfo);
|
||||
} else {
|
||||
return stbSplSplitWindowForMergeTable(pCxt, pInfo);
|
||||
return stbSplSplitWindowForCrossTable(pCxt, pInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -40,3 +40,9 @@ TEST_F(PlanDistinctTest, withOrderBy) {
|
|||
|
||||
run("select distinct c1 + 10 a from t1 order by a");
|
||||
}
|
||||
|
||||
TEST_F(PlanDistinctTest, withLimit) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT DISTINCT c1 FROM t1 LIMIT 3");
|
||||
}
|
||||
|
|
|
@ -53,14 +53,6 @@ TEST_F(PlanGroupByTest, aggFunc) {
|
|||
run("SELECT SUM(10), COUNT(c1) FROM t1 GROUP BY c2");
|
||||
}
|
||||
|
||||
TEST_F(PlanGroupByTest, rewriteFunc) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT AVG(c1) FROM t1");
|
||||
|
||||
run("SELECT AVG(c1) FROM t1 GROUP BY c2");
|
||||
}
|
||||
|
||||
TEST_F(PlanGroupByTest, selectFunc) {
|
||||
useDb("root", "test");
|
||||
|
||||
|
@ -81,6 +73,8 @@ TEST_F(PlanGroupByTest, stable) {
|
|||
|
||||
run("SELECT COUNT(*) FROM st1");
|
||||
|
||||
run("SELECT c1 FROM st1 GROUP BY c1");
|
||||
|
||||
run("SELECT COUNT(*) FROM st1 GROUP BY c1");
|
||||
|
||||
run("SELECT COUNT(*) FROM st1 PARTITION BY c2 GROUP BY c1");
|
||||
|
|
|
@ -57,9 +57,15 @@ TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
|
|||
TEST_F(PlanOptimizeTest, PartitionTags) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT c1 FROM st1 PARTITION BY tag1");
|
||||
run("SELECT c1, tag1 FROM st1 PARTITION BY tag1");
|
||||
|
||||
run("SELECT SUM(c1) FROM st1 GROUP BY tag1");
|
||||
run("SELECT SUM(c1), tag1 FROM st1 PARTITION BY tag1");
|
||||
|
||||
run("SELECT SUM(c1), tag1 + 10 FROM st1 PARTITION BY tag1 + 10");
|
||||
|
||||
run("SELECT SUM(c1), tag1 FROM st1 GROUP BY tag1");
|
||||
|
||||
run("SELECT SUM(c1), tag1 + 10 FROM st1 GROUP BY tag1 + 10");
|
||||
}
|
||||
|
||||
TEST_F(PlanOptimizeTest, eliminateProjection) {
|
||||
|
|
|
@ -34,6 +34,8 @@ TEST_F(PlanPartitionByTest, withAggFunc) {
|
|||
useDb("root", "test");
|
||||
|
||||
run("select count(*) from t1 partition by c1");
|
||||
|
||||
run("select count(*), c1 from t1 partition by c1");
|
||||
}
|
||||
|
||||
TEST_F(PlanPartitionByTest, withInterval) {
|
||||
|
@ -41,8 +43,12 @@ TEST_F(PlanPartitionByTest, withInterval) {
|
|||
|
||||
// normal/child table
|
||||
run("select count(*) from t1 partition by c1 interval(10s)");
|
||||
|
||||
run("select count(*), c1 from t1 partition by c1 interval(10s)");
|
||||
// super table
|
||||
run("select count(*) from st1 partition by tag1, tag2 interval(10s)");
|
||||
|
||||
run("select count(*), tag1 from st1 partition by tag1, tag2 interval(10s)");
|
||||
}
|
||||
|
||||
TEST_F(PlanPartitionByTest, withGroupBy) {
|
||||
|
|
Loading…
Reference in New Issue