feat: the last_row function supports all scenes
This commit is contained in:
parent
38ae79e6e0
commit
cf35174d53
|
@ -125,6 +125,7 @@ typedef enum EFunctionType {
|
||||||
FUNCTION_TYPE_BLOCK_DIST_INFO, // block distribution pseudo column function
|
FUNCTION_TYPE_BLOCK_DIST_INFO, // block distribution pseudo column function
|
||||||
FUNCTION_TYPE_TO_COLUMN,
|
FUNCTION_TYPE_TO_COLUMN,
|
||||||
FUNCTION_TYPE_GROUP_KEY,
|
FUNCTION_TYPE_GROUP_KEY,
|
||||||
|
FUNCTION_TYPE_CACHE_LAST_ROW,
|
||||||
|
|
||||||
// distributed splitting functions
|
// distributed splitting functions
|
||||||
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
|
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
|
||||||
|
|
|
@ -91,6 +91,7 @@ typedef struct SAggLogicNode {
|
||||||
SLogicNode node;
|
SLogicNode node;
|
||||||
SNodeList* pGroupKeys;
|
SNodeList* pGroupKeys;
|
||||||
SNodeList* pAggFuncs;
|
SNodeList* pAggFuncs;
|
||||||
|
bool hasLastRow;
|
||||||
} SAggLogicNode;
|
} SAggLogicNode;
|
||||||
|
|
||||||
typedef struct SProjectLogicNode {
|
typedef struct SProjectLogicNode {
|
||||||
|
|
|
@ -1882,6 +1882,19 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.name = "last_row",
|
.name = "last_row",
|
||||||
.type = FUNCTION_TYPE_LAST_ROW,
|
.type = FUNCTION_TYPE_LAST_ROW,
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
|
.translateFunc = translateFirstLast,
|
||||||
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
|
.initFunc = functionSetup,
|
||||||
|
.processFunc = lastFunction,
|
||||||
|
.finalizeFunc = firstLastFinalize,
|
||||||
|
.pPartialFunc = "_last_partial",
|
||||||
|
.pMergeFunc = "_last_merge",
|
||||||
|
.combineFunc = lastCombine,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_cache_last_row",
|
||||||
|
.type = FUNCTION_TYPE_CACHE_LAST_ROW,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
.translateFunc = translateLastRow,
|
.translateFunc = translateLastRow,
|
||||||
.getEnvFunc = getMinmaxFuncEnv,
|
.getEnvFunc = getMinmaxFuncEnv,
|
||||||
.initFunc = minmaxFunctionSetup,
|
.initFunc = minmaxFunctionSetup,
|
||||||
|
|
|
@ -160,9 +160,9 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SNod
|
||||||
return SCAN_TYPE_STREAM;
|
return SCAN_TYPE_STREAM;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSelect->hasLastRowFunc) {
|
// if (pSelect->hasLastRowFunc) {
|
||||||
return SCAN_TYPE_LAST_ROW;
|
// return SCAN_TYPE_LAST_ROW;
|
||||||
}
|
// }
|
||||||
|
|
||||||
if (NULL == pScanCols) {
|
if (NULL == pScanCols) {
|
||||||
// select count(*) from t
|
// select count(*) from t
|
||||||
|
@ -474,6 +474,8 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pAgg->hasLastRow = pSelect->hasLastRowFunc;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
// set grouyp keys, agg funcs and having conditions
|
// set grouyp keys, agg funcs and having conditions
|
||||||
|
|
|
@ -1616,6 +1616,46 @@ static int32_t rewriteUniqueOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
|
||||||
return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef);
|
return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || !(((SAggLogicNode*)pNode)->hasLastRow) ||
|
||||||
|
NULL != ((SAggLogicNode*)pNode)->pGroupKeys || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||||
|
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0)) ||
|
||||||
|
NULL != ((SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0))->node.pConditions) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pFunc = NULL;
|
||||||
|
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
|
||||||
|
if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||||
|
SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized);
|
||||||
|
|
||||||
|
if (NULL == pAgg) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pNode = NULL;
|
||||||
|
FOREACH(pNode, pAgg->pAggFuncs) {
|
||||||
|
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
||||||
|
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), "_cache_last_row");
|
||||||
|
pFunc->functionName[len] = '\0';
|
||||||
|
fmGetFuncInfo(pFunc, NULL, 0);
|
||||||
|
}
|
||||||
|
pAgg->hasLastRow = false;
|
||||||
|
|
||||||
|
((SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))->scanType = SCAN_TYPE_LAST_ROW;
|
||||||
|
|
||||||
|
pCxt->optimized = true;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
// merge projects
|
// merge projects
|
||||||
static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
|
static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
|
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
|
||||||
|
@ -1704,7 +1744,8 @@ static const SOptimizeRule optimizeRuleSet[] = {
|
||||||
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
|
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
|
||||||
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
|
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
|
||||||
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
|
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
|
||||||
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize}
|
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
|
||||||
|
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize}
|
||||||
};
|
};
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
|
|
@ -197,6 +197,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
||||||
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
|
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||||
return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
|
return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
|
||||||
|
// case QUERY_NODE_LOGIC_PLAN_PARTITION:
|
||||||
|
// return stbSplHasMultiTbScan(streamQuery, pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||||
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
||||||
|
|
|
@ -99,6 +99,8 @@ TEST_F(PlanBasicTest, lastRowFunc) {
|
||||||
run("SELECT LAST_ROW(c1, c2) FROM t1");
|
run("SELECT LAST_ROW(c1, c2) FROM t1");
|
||||||
|
|
||||||
run("SELECT LAST_ROW(c1) FROM st1");
|
run("SELECT LAST_ROW(c1) FROM st1");
|
||||||
|
|
||||||
|
run("SELECT LAST_ROW(c1), SUM(c3) FROM t1");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(PlanBasicTest, sampleFunc) {
|
TEST_F(PlanBasicTest, sampleFunc) {
|
||||||
|
|
Loading…
Reference in New Issue