fix: csum/diff/mavg support subquery

This commit is contained in:
Xiaoyu Wang 2022-07-01 19:59:28 +08:00
parent ec7bfd9eba
commit d1eef6880b
4 changed files with 48 additions and 30 deletions

View File

@ -2180,7 +2180,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "tail", .name = "tail",
.type = FUNCTION_TYPE_TAIL, .type = FUNCTION_TYPE_TAIL,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC |
FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateTail, .translateFunc = translateTail,
.getEnvFunc = getTailFuncEnv, .getEnvFunc = getTailFuncEnv,

View File

@ -1477,7 +1477,7 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, SNode** pNode
strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName); strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName);
pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode); pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode);
if (TSDB_CODE_SUCCESS == pCxt->errCode) { if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode == getFuncInfo(pCxt, pFunc); pCxt->errCode = getFuncInfo(pCxt, pFunc);
} }
if (TSDB_CODE_SUCCESS == pCxt->errCode) { if (TSDB_CODE_SUCCESS == pCxt->errCode) {
*pNode = (SNode*)pFunc; *pNode = (SNode*)pFunc;
@ -3221,7 +3221,7 @@ static int32_t checkTableTagsSchema(STranslateContext* pCxt, SHashObj* pHash, SN
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
if ((TSDB_DATA_TYPE_VARCHAR == pTag->dataType.type && pTag->dataType.bytes > TSDB_MAX_BINARY_LEN) || if ((TSDB_DATA_TYPE_VARCHAR == pTag->dataType.type && pTag->dataType.bytes > TSDB_MAX_BINARY_LEN) ||
(TSDB_DATA_TYPE_NCHAR == pTag->dataType.type && pTag->dataType.bytes > TSDB_MAX_NCHAR_LEN)) { (TSDB_DATA_TYPE_NCHAR == pTag->dataType.type && pTag->dataType.bytes > TSDB_MAX_NCHAR_LEN)) {
code = code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN);
} }
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {

View File

@ -588,7 +588,7 @@ static int32_t pushDownCondOptDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* p
typedef struct SPartAggCondContext { typedef struct SPartAggCondContext {
SAggLogicNode* pAgg; SAggLogicNode* pAgg;
bool hasAggFunc; bool hasAggFunc;
} SPartAggCondContext; } SPartAggCondContext;
static EDealRes partAggCondHasAggFuncImpl(SNode* pNode, void* pContext) { static EDealRes partAggCondHasAggFuncImpl(SNode* pNode, void* pContext) {
@ -613,11 +613,11 @@ static int32_t partitionAggCondHasAggFunc(SAggLogicNode* pAgg, SNode* pCond) {
static int32_t partitionAggCondConj(SAggLogicNode* pAgg, SNode** ppAggFuncCond, SNode** ppGroupKeyCond) { static int32_t partitionAggCondConj(SAggLogicNode* pAgg, SNode** ppAggFuncCond, SNode** ppGroupKeyCond) {
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pAgg->node.pConditions; SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pAgg->node.pConditions;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SNodeList* pAggFuncConds = NULL; SNodeList* pAggFuncConds = NULL;
SNodeList* pGroupKeyConds = NULL; SNodeList* pGroupKeyConds = NULL;
SNode* pCond = NULL; SNode* pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) { FOREACH(pCond, pLogicCond->pParameterList) {
if (partitionAggCondHasAggFunc(pAgg, pCond)) { if (partitionAggCondHasAggFunc(pAgg, pCond)) {
code = nodesListMakeAppend(&pAggFuncConds, nodesCloneNode(pCond)); code = nodesListMakeAppend(&pAggFuncConds, nodesCloneNode(pCond));
@ -671,14 +671,14 @@ static int32_t pushCondToAggCond(SOptimizeContext* pCxt, SAggLogicNode* pAgg, SN
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
typedef struct SRewriteAggGroupKeyCondContext{ typedef struct SRewriteAggGroupKeyCondContext {
SAggLogicNode *pAgg; SAggLogicNode* pAgg;
int32_t errCode; int32_t errCode;
} SRewriteAggGroupKeyCondContext; } SRewriteAggGroupKeyCondContext;
static EDealRes rewriteAggGroupKeyCondForPushDownImpl(SNode** pNode, void* pContext) { static EDealRes rewriteAggGroupKeyCondForPushDownImpl(SNode** pNode, void* pContext) {
SRewriteAggGroupKeyCondContext* pCxt = pContext; SRewriteAggGroupKeyCondContext* pCxt = pContext;
SAggLogicNode* pAgg = pCxt->pAgg; SAggLogicNode* pAgg = pCxt->pAgg;
if (QUERY_NODE_COLUMN == nodeType(*pNode)) { if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
SNode* pGroupKey = NULL; SNode* pGroupKey = NULL;
FOREACH(pGroupKey, pAgg->pGroupKeys) { FOREACH(pGroupKey, pAgg->pGroupKeys) {
@ -711,14 +711,14 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
OPTIMIZE_FLAG_TEST_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { OPTIMIZE_FLAG_TEST_MASK(pAgg->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//TODO: remove it after full implementation of pushing down to child // TODO: remove it after full implementation of pushing down to child
if (1 != LIST_LENGTH(pAgg->node.pChildren) || if (1 != LIST_LENGTH(pAgg->node.pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) { QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SNode* pAggFuncCond = NULL; SNode* pAggFuncCond = NULL;
SNode* pGroupKeyCond = NULL; SNode* pGroupKeyCond = NULL;
int32_t code = partitionAggCond(pAgg, &pAggFuncCond, &pGroupKeyCond); int32_t code = partitionAggCond(pAgg, &pAggFuncCond, &pGroupKeyCond);
if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncCond) { if (TSDB_CODE_SUCCESS == code && NULL != pAggFuncCond) {
code = pushCondToAggCond(pCxt, pAgg, &pAggFuncCond); code = pushCondToAggCond(pCxt, pAgg, &pAggFuncCond);
@ -1455,9 +1455,17 @@ static int32_t rewriteTailOptCreateSort(SIndefRowsFuncLogicNode* pIndef, SLogicN
TSWAP(pSort->node.pChildren, pIndef->node.pChildren); TSWAP(pSort->node.pChildren, pIndef->node.pChildren);
pSort->node.precision = pIndef->node.precision; pSort->node.precision = pIndef->node.precision;
SFunctionNode* pTail = NULL;
SNode* pFunc = NULL;
FOREACH(pFunc, pIndef->pFuncs) {
if (FUNCTION_TYPE_TAIL == ((SFunctionNode*)pFunc)->funcType) {
pTail = (SFunctionNode*)pFunc;
break;
}
}
// tail(expr, [limit, offset,] _rowts) // tail(expr, [limit, offset,] _rowts)
SFunctionNode* pTail = (SFunctionNode*)nodesListGetNode(pIndef->pFuncs, 0); int32_t rowtsIndex = LIST_LENGTH(pTail->pParameterList) - 1;
int32_t rowtsIndex = LIST_LENGTH(pTail->pParameterList) - 1;
int32_t code = nodesListMakeStrictAppend( int32_t code = nodesListMakeStrictAppend(
&pSort->pSortKeys, rewriteTailOptCreateOrderByExpr(nodesListGetNode(pTail->pParameterList, rowtsIndex))); &pSort->pSortKeys, rewriteTailOptCreateOrderByExpr(nodesListGetNode(pTail->pParameterList, rowtsIndex)));
@ -1477,12 +1485,12 @@ static int32_t rewriteTailOptCreateSort(SIndefRowsFuncLogicNode* pIndef, SLogicN
return code; return code;
} }
static SNode* rewriteTailOptCreateProjectExpr(SFunctionNode* pTail) { static SNode* rewriteTailOptCreateProjectExpr(SFunctionNode* pFunc) {
SNode* pExpr = nodesCloneNode(nodesListGetNode(pTail->pParameterList, 0)); SNode* pExpr = nodesCloneNode(nodesListGetNode(pFunc->pParameterList, 0));
if (NULL == pExpr) { if (NULL == pExpr) {
return NULL; return NULL;
} }
strcpy(((SExprNode*)pExpr)->aliasName, pTail->node.aliasName); strcpy(((SExprNode*)pExpr)->aliasName, pFunc->node.aliasName);
return pExpr; return pExpr;
} }
@ -1495,12 +1503,22 @@ static int32_t rewriteTailOptCreateProject(SIndefRowsFuncLogicNode* pIndef, SLog
TSWAP(pProject->node.pTargets, pIndef->node.pTargets); TSWAP(pProject->node.pTargets, pIndef->node.pTargets);
pProject->node.precision = pIndef->node.precision; pProject->node.precision = pIndef->node.precision;
// tail(expr, [limit, offset,] _rowts) int32_t code = TSDB_CODE_SUCCESS;
SFunctionNode* pTail = (SFunctionNode*)nodesListGetNode(pIndef->pFuncs, 0); SFunctionNode* pTail = NULL;
int32_t limitIndex = LIST_LENGTH(pTail->pParameterList) > 2 ? 1 : -1; SNode* pFunc = NULL;
int32_t offsetIndex = LIST_LENGTH(pTail->pParameterList) > 3 ? 2 : -1; FOREACH(pFunc, pIndef->pFuncs) {
code = nodesListMakeStrictAppend(&pProject->pProjections, rewriteTailOptCreateProjectExpr((SFunctionNode*)pFunc));
if (TSDB_CODE_SUCCESS != code) {
break;
}
if (FUNCTION_TYPE_TAIL == ((SFunctionNode*)pFunc)->funcType) {
pTail = (SFunctionNode*)pFunc;
}
}
int32_t code = nodesListMakeStrictAppend(&pProject->pProjections, rewriteTailOptCreateProjectExpr(pTail)); // tail(expr, [limit, offset,] _rowts)
int32_t limitIndex = LIST_LENGTH(pTail->pParameterList) > 2 ? 1 : -1;
int32_t offsetIndex = LIST_LENGTH(pTail->pParameterList) > 3 ? 2 : -1;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteTailOptCreateLimit(limitIndex < 0 ? NULL : nodesListGetNode(pTail->pParameterList, limitIndex), code = rewriteTailOptCreateLimit(limitIndex < 0 ? NULL : nodesListGetNode(pTail->pParameterList, limitIndex),
offsetIndex < 0 ? NULL : nodesListGetNode(pTail->pParameterList, offsetIndex), offsetIndex < 0 ? NULL : nodesListGetNode(pTail->pParameterList, offsetIndex),
@ -1855,7 +1873,7 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) {
} }
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) { static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0); SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS}; SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS};
nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt); nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt);

View File

@ -58,8 +58,8 @@ class TDTestCase:
for coltype in coltypes: for coltype in coltypes:
colname = coltype[0] colname = coltype[0]
if coltype[1] in support_types and coltype[-1] != "TAG" : if coltype[1] in support_types and coltype[-1] != "TAG" :
irate_sql = "select irate({}) from (select * from {} order by tbname ) ".format(colname, tbname) irate_sql = "select irate({}) from {}".format(colname, tbname)
origin_sql = "select ts , {} , cast(ts as bigint) from (select ts , {} from {} order by ts desc limit 2 offset 0 ) order by ts".format(colname,colname, tbname) origin_sql = "select tail({}, 2), cast(ts as bigint) from {} order by ts".format(colname, tbname)
tdSql.query(irate_sql) tdSql.query(irate_sql)
irate_result = tdSql.queryResult irate_result = tdSql.queryResult
@ -68,10 +68,10 @@ class TDTestCase:
irate_value = irate_result[0][0] irate_value = irate_result[0][0]
if origin_result[1][-1] - origin_result[0][-1] == 0: if origin_result[1][-1] - origin_result[0][-1] == 0:
comput_irate_value = 0 comput_irate_value = 0
elif (origin_result[1][1] - origin_result[0][1])<0: elif (origin_result[1][0] - origin_result[0][0])<0:
comput_irate_value = origin_result[1][1]*1000/( origin_result[1][-1] - origin_result[0][-1]) comput_irate_value = origin_result[1][0]*1000/( origin_result[1][-1] - origin_result[0][-1])
else: else:
comput_irate_value = (origin_result[1][1] - origin_result[0][1])*1000/( origin_result[1][-1] - origin_result[0][-1]) comput_irate_value = (origin_result[1][0] - origin_result[0][0])*1000/( origin_result[1][-1] - origin_result[0][-1])
if comput_irate_value ==irate_value: if comput_irate_value ==irate_value:
tdLog.info(" irate work as expected , sql is %s "% irate_sql) tdLog.info(" irate work as expected , sql is %s "% irate_sql)
else: else: