fix: plan problem when functions that requires a timeline is used directly in a super table
This commit is contained in:
parent
3b09027176
commit
ba0518e2b9
|
@ -111,6 +111,7 @@ typedef struct SAggLogicNode {
|
|||
SNodeList* pGroupKeys;
|
||||
SNodeList* pAggFuncs;
|
||||
bool hasLastRow;
|
||||
bool hasTimeLineFunc;
|
||||
} SAggLogicNode;
|
||||
|
||||
typedef struct SProjectLogicNode {
|
||||
|
|
|
@ -521,7 +521,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
|||
|
||||
// NOTE: the last parameter is the primary timestamp column
|
||||
// todo: refactor this
|
||||
if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
|
||||
if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
|
||||
pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data.
|
||||
// ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
}
|
||||
|
@ -4440,10 +4440,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||
STimeWindowAggSupp aggSup = (STimeWindowAggSupp){
|
||||
.waterMark = pTableScanNode->watermark,
|
||||
.calTrigger = pTableScanNode->triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
STimeWindowAggSupp aggSup = (STimeWindowAggSupp){
|
||||
.waterMark = pTableScanNode->watermark,
|
||||
.calTrigger = pTableScanNode->triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
};
|
||||
|
||||
if (pHandle->vnode) {
|
||||
|
|
|
@ -2242,7 +2242,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "last_row",
|
||||
.type = FUNCTION_TYPE_LAST_ROW,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -2253,7 +2253,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_cache_last_row",
|
||||
.type = FUNCTION_TYPE_CACHE_LAST_ROW,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -2263,7 +2263,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "first",
|
||||
.type = FUNCTION_TYPE_FIRST,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -2277,7 +2277,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_first_partial",
|
||||
.type = FUNCTION_TYPE_FIRST_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.translateFunc = translateFirstLastPartial,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -2288,7 +2288,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_first_merge",
|
||||
.type = FUNCTION_TYPE_FIRST_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.translateFunc = translateFirstLastMerge,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -2299,7 +2299,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "last",
|
||||
.type = FUNCTION_TYPE_LAST,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -2313,7 +2313,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_last_partial",
|
||||
.type = FUNCTION_TYPE_LAST_PARTIAL,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.translateFunc = translateFirstLastPartial,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
@ -2324,7 +2324,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "_last_merge",
|
||||
.type = FUNCTION_TYPE_LAST_MERGE,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.translateFunc = translateFirstLastMerge,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
|
|
@ -1237,7 +1237,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
|
|||
pSelect->hasTailFunc = pSelect->hasTailFunc ? true : (FUNCTION_TYPE_TAIL == pFunc->funcType);
|
||||
pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType);
|
||||
pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType);
|
||||
pSelect->hasTimeLineFunc = pSelect->hasLastRowFunc ? true : fmIsTimelineFunc(pFunc->funcId);
|
||||
pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -478,8 +478,9 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
|||
}
|
||||
|
||||
pAgg->hasLastRow = pSelect->hasLastRowFunc;
|
||||
pAgg->hasTimeLineFunc = pSelect->hasTimeLineFunc;
|
||||
pAgg->node.groupAction = GROUP_ACTION_SET;
|
||||
pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
|
||||
pAgg->node.requireDataOrder = pAgg->hasTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE;
|
||||
pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -928,7 +929,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
// set grouyp keys, agg funcs and having conditions
|
||||
SNodeList* pGroupKeys = NULL;
|
||||
SNode* pProjection = NULL;
|
||||
SNode* pProjection = NULL;
|
||||
FOREACH(pProjection, pSelect->pProjectionList) {
|
||||
code = nodesListMakeStrictAppend(&pGroupKeys, createGroupingSetNode(pProjection));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
|
|
|
@ -904,14 +904,6 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode;
|
||||
if (NULL != pScan->pGroupTags) {
|
||||
return stbSplSplitScanNodeWithPartTags(pCxt, pInfo);
|
||||
}
|
||||
return stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo);
|
||||
}
|
||||
|
||||
static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
|
||||
SNode* pCol = NULL;
|
||||
FOREACH(pCol, pScan->pScanCols) {
|
||||
|
@ -922,7 +914,7 @@ static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) {
|
||||
static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) {
|
||||
SNodeList* pMergeKeys = NULL;
|
||||
int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -937,12 +929,24 @@ static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pS
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)pInfo->pSplitNode;
|
||||
if (SCAN_TYPE_TABLE_MERGE == pScan->scanType) {
|
||||
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||
return stbSplSplitMergeScanNode(pCxt, pInfo->pSubplan, pScan);
|
||||
}
|
||||
if (NULL != pScan->pGroupTags) {
|
||||
return stbSplSplitScanNodeWithPartTags(pCxt, pInfo);
|
||||
}
|
||||
return stbSplSplitScanNodeWithoutPartTags(pCxt, pInfo);
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNode* pChild = NULL;
|
||||
FOREACH(pChild, pJoin->node.pChildren) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
|
||||
code = stbSplSplitScanNodeForJoin(pCxt, pSubplan, (SScanLogicNode*)pChild);
|
||||
code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild);
|
||||
} else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) {
|
||||
code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild);
|
||||
} else {
|
||||
|
|
|
@ -124,7 +124,7 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode*
|
|||
}
|
||||
|
||||
static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) {
|
||||
if (SCAN_TYPE_TABLE != pScan->scanType || SCAN_TYPE_TABLE_MERGE != pScan->scanType) {
|
||||
if (SCAN_TYPE_TABLE != pScan->scanType && SCAN_TYPE_TABLE_MERGE != pScan->scanType) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
// The lowest sort level of scan output data is DATA_ORDER_LEVEL_IN_BLOCK
|
||||
|
@ -161,7 +161,9 @@ static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel req
|
|||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
pAgg->node.resultDataOrder = requirement;
|
||||
pAgg->node.requireDataOrder = requirement;
|
||||
if (pAgg->hasTimeLineFunc) {
|
||||
pAgg->node.requireDataOrder = requirement < DATA_ORDER_LEVEL_IN_GROUP ? DATA_ORDER_LEVEL_IN_GROUP : requirement;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -139,6 +139,10 @@ TEST_F(PlanBasicTest, timeLineFunc) {
|
|||
run("SELECT CSUM(c1) FROM t1");
|
||||
|
||||
run("SELECT CSUM(c1) FROM st1");
|
||||
|
||||
run("SELECT TWA(c1) FROM t1");
|
||||
|
||||
run("SELECT TWA(c1) FROM st1");
|
||||
}
|
||||
|
||||
TEST_F(PlanBasicTest, multiResFunc) {
|
||||
|
|
Loading…
Reference in New Issue