diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index bd1964187d..ccc8d3023e 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -495,6 +495,10 @@ typedef struct SSessionWindow { int32_t primaryColId; // primary timestamp column } SSessionWindow; +typedef struct SStateWindow { + int32_t columnId; +} SStateWindow; + typedef struct { SMsgHead head; char version[TSDB_VERSION_LEN]; diff --git a/include/libs/function/function.h b/include/libs/function/function.h index f4c1fbbbc0..f7d1fb9e45 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -259,7 +259,7 @@ int32_t qIsBuiltinFunction(const char* name, int32_t len, bool* scalarFunction); bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId); -const char* qGetFunctionName(int32_t functionId); +bool qIsAggregateFunction(const char* functionName); tExprNode* exprTreeFromBinary(const void* data, size_t size); diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index c57d28ebf4..64706a922b 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -77,6 +77,7 @@ typedef struct SQueryStmtInfo { STimeWindow window; // the whole query time window SInterval interval; // tumble time window SSessionWindow sessionWindow; // session time window + SStateWindow stateWindow; // state window query SGroupbyExpr groupbyExpr; // groupby tags info SArray * colList; // SArray SFieldInfo fieldsInfo; @@ -180,6 +181,7 @@ STableMetaInfo* getMetaInfo(SQueryStmtInfo* pQueryInfo, int32_t tableIndex); SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex); int32_t getNewResColId(); +void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn); #ifdef __cplusplus } diff --git a/source/libs/function/src/tfunction.c b/source/libs/function/src/tfunction.c index c47f3a249a..2e4a4a058e 100644 --- a/source/libs/function/src/tfunction.c +++ b/source/libs/function/src/tfunction.c @@ -46,10 +46,15 @@ bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* funct return true; } -const char* qGetFunctionName(int32_t functionId) { +bool qIsAggregateFunction(const char* functionName) { + assert(functionName != NULL); + bool scalefunc = false; + qIsBuiltinFunction(functionName, strlen(functionName), &scalefunc); + return !scalefunc; } + SAggFunctionInfo* qGetFunctionInfo(const char* name, int32_t len) { pthread_once(&functionHashTableInit, doInitFunctionHashTable); diff --git a/source/libs/parser/inc/parserUtil.h b/source/libs/parser/inc/parserUtil.h index f567f6553b..c57ea905e9 100644 --- a/source/libs/parser/inc/parserUtil.h +++ b/source/libs/parser/inc/parserUtil.h @@ -41,8 +41,6 @@ SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* nam void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema); SColumn createColumn(uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema); -SSourceParam addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn); - SInternalField* insertFieldInfo(SFieldInfo* pFieldInfo, int32_t index, SSchema* field); int32_t getNumOfFields(SFieldInfo* pFieldInfo); SInternalField* getInternalField(SFieldInfo* pFieldInfo, int32_t index); diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index a3a6e58785..ecb707e582 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -544,7 +544,7 @@ SColumn createColumn(uint64_t uid, const char* tableName, int8_t flag, const SSc return c; } -SSourceParam addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn) { +void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn) { assert(pSourceParam != NULL); pSourceParam->num += 1; diff --git a/source/libs/parser/test/plannerTest.cpp b/source/libs/parser/test/plannerTest.cpp index 6e091bf8f4..7eb45e92d2 100644 --- a/source/libs/parser/test/plannerTest.cpp +++ b/source/libs/parser/test/plannerTest.cpp @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include #include #pragma GCC diagnostic ignored "-Wwrite-strings" @@ -66,65 +67,60 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) { } } -//TEST(testCase, planner_test) { -// SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)"); -// ASSERT_EQ(info1.valid, true); -// -// char msg[128] = {0}; -// SMsgBuf buf; -// buf.len = 128; -// buf.buf = msg; -// -// SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0); -// int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); -// ASSERT_EQ(code, 0); -// -// SMetaReq req = {0}; -// int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); -// ASSERT_EQ(ret, 0); -// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); -// -// SQueryStmtInfo* pQueryInfo = createQueryInfo(); -// setTableMetaInfo(pQueryInfo, &req); -// -// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0); -// ret = validateSqlNode(pSqlNode, pQueryInfo, &buf); -// ASSERT_EQ(ret, 0); -// -// SArray* pExprList = pQueryInfo->exprList[0]; -// ASSERT_EQ(taosArrayGetSize(pExprList), 2); -// -// SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1); -// ASSERT_EQ(p1->base.pColumns->uid, 110); -// ASSERT_EQ(p1->base.numOfParams, 1); -// ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE); -// ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)"); -// ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_NORMAL); -// ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)"); -// ASSERT_EQ(p1->base.interBytes, 16); -// -// ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE); -// ASSERT_EQ(p1->pExpr->_function.functionId, FUNCTION_TOP); -// ASSERT_TRUE(p1->pExpr->_node.pRight == NULL); -// -// tExprNode* pParam = p1->pExpr->_node.pLeft; -// -// ASSERT_EQ(pParam->nodeType, TEXPR_BINARYEXPR_NODE); -// ASSERT_EQ(pParam->_node.optr, TSDB_BINARY_OP_DIVIDE); -// ASSERT_EQ(pParam->_node.pLeft->nodeType, TEXPR_BINARYEXPR_NODE); -// ASSERT_EQ(pParam->_node.pRight->nodeType, TEXPR_VALUE_NODE); -// -// ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3); -// ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2); -// -// struct SQueryPlanNode* n = nullptr; -// code = qCreateQueryPlan(pQueryInfo, &n); -// -// char* str = NULL; -// qQueryPlanToString(n, &str); -// printf("%s\n", str); -// -// destroyQueryInfo(pQueryInfo); -// qParserClearupMetaRequestInfo(&req); -// destroySqlInfo(&info1); -//} \ No newline at end of file +TEST(testCase, planner_test) { + SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)"); + ASSERT_EQ(info1.valid, true); + + char msg[128] = {0}; + SMsgBuf buf; + buf.len = 128; + buf.buf = msg; + + SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0); + int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf); + ASSERT_EQ(code, 0); + + SMetaReq req = {0}; + int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128); + ASSERT_EQ(ret, 0); + ASSERT_EQ(taosArrayGetSize(req.pTableName), 1); + + SQueryStmtInfo* pQueryInfo = createQueryInfo(); + setTableMetaInfo(pQueryInfo, &req); + + SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0); + ret = validateSqlNode(pSqlNode, pQueryInfo, &buf); + ASSERT_EQ(ret, 0); + + SArray* pExprList = pQueryInfo->exprList[0]; + ASSERT_EQ(taosArrayGetSize(pExprList), 2); + + SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1); + ASSERT_EQ(p1->base.pColumns->uid, 110); + ASSERT_EQ(p1->base.numOfParams, 1); + ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE); + ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)"); + ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP); + ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)"); + ASSERT_EQ(p1->base.interBytes, 16); + + ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE); + ASSERT_STREQ(p1->pExpr->_function.functionName, "top"); + + tExprNode* pParam = p1->pExpr->_function.pChild[0]; + + ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE); + ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3); + ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2); + + struct SQueryPlanNode* n = nullptr; + code = qCreateQueryPlan(pQueryInfo, &n); + + char* str = NULL; + qQueryPlanToString(n, &str); + printf("%s\n", str); + + destroyQueryInfo(pQueryInfo); + qParserClearupMetaRequestInfo(&req); + destroySqlInfo(&info1); +} \ No newline at end of file diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 81404d74ca..0621a90798 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -13,10 +13,10 @@ * along with this program. If not, see . */ -#include "os.h" -#include "plannerInt.h" -#include "parser.h" #include "function.h" +#include "os.h" +#include "parser.h" +#include "plannerInt.h" #define QNODE_TAGSCAN 1 #define QNODE_TABLESCAN 2 @@ -30,7 +30,8 @@ #define QNODE_UNIONALL 10 #define QNODE_TIMEWINDOW 11 #define QNODE_SESSIONWINDOW 12 -#define QNODE_FILL 13 +#define QNODE_STATEWINDOW 13 +#define QNODE_FILL 14 typedef struct SFillEssInfo { int32_t fillType; // fill type @@ -161,7 +162,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SQueryTableInfo* info, SArray* pExprs, SArray* tableCols) { if (pQueryInfo->info.onlyTagQuery) { - int32_t num = (int32_t) taosArrayGetSize(pExprs); + int32_t num = (int32_t) taosArrayGetSize(pExprs); SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info, NULL); if (pQueryInfo->info.distinct) { @@ -178,20 +179,19 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs); pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, info, NULL); } else { + STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0); + // table source column projection, generate the projection expr int32_t numOfCols = (int32_t) taosArrayGetSize(tableCols); SExprInfo** pExpr = calloc(numOfCols, POINTER_BYTES); - - STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0); - for (int32_t i = 0; i < numOfCols; ++i) { SColumn* pCol = taosArrayGetP(tableCols, i); - SColumnIndex index = {.tableIndex = 0, /*.columnIndex = pCol->columnIndex*/}; + SSchema* pSchema = getOneColumnSchema(pTableMetaInfo1->pTableMeta, i); - SSchema* pSchema = getOneColumnSchema(pTableMetaInfo->pTableMeta, i); - SSchema resultSchema = *pSchema; + SSourceParam param = {0}; + addIntoSourceParam(¶m, NULL, pCol); - SExprInfo* p = NULL;//createExprInfo(pTableMetaInfo1, FUNCTION_PRJ, &index, NULL, &resultSchema, 0); + SExprInfo* p = createExprInfo(pTableMetaInfo1, "project", ¶m, pSchema, 0); pExpr[i] = p; } @@ -202,33 +202,69 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe return pNode; } -static SQueryPlanNode* doCreateQueryPlanForOneTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info, - SArray* pExprs) { +static int32_t getFunctionLevel(SQueryStmtInfo* pQueryInfo) { + int32_t n = 10; + + int32_t level = 0; + for(int32_t i = 0; i < n; ++i) { + SArray* pList = pQueryInfo->exprList[i]; + if (taosArrayGetSize(pList) > 0) { + level += 1; + } + } + + return level; +} + +static SQueryPlanNode* createOneQueryPlanNode(SArray* p, SQueryPlanNode* pNode, SExprInfo* pExpr, SQueryTableInfo* info) { + if (pExpr->pExpr->nodeType == TEXPR_FUNCTION_NODE) { + bool aggregateFunc = qIsAggregateFunction(pExpr->pExpr->_function.functionName); + if (aggregateFunc) { + int32_t numOfOutput = (int32_t)taosArrayGetSize(p); + return createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, numOfOutput, info, NULL); + } else { + int32_t numOfOutput = (int32_t)taosArrayGetSize(p); + return createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, numOfOutput, info, NULL); + } + } else { + int32_t numOfOutput = (int32_t)taosArrayGetSize(p); + return createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, numOfOutput, info, NULL); + } +} + +static SQueryPlanNode* doCreateQueryPlanForOneTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info, SArray** pExprs) { // check for aggregation size_t numOfGroupCols = taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo); - if (pQueryInfo->interval.interval > 0) { - int32_t numOfOutput = (int32_t)taosArrayGetSize(pExprs); + int32_t level = getFunctionLevel(pQueryInfo); + for(int32_t i = level - 1; i >= 0; --i) { + SArray* p = pQueryInfo->exprList[i]; + SExprInfo* pExpr = (SExprInfo*)taosArrayGetP(p, 0); - pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, pExprs->pData, numOfOutput, info, &pQueryInfo->interval); - if (numOfGroupCols != 0) { - pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, pExprs->pData, numOfOutput, info, &pQueryInfo->groupbyExpr); + if (i == 0) { + if (pQueryInfo->interval.interval > 0) { + int32_t numOfOutput = (int32_t)taosArrayGetSize(p); + pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, numOfOutput, info, &pQueryInfo->interval); + } else if (pQueryInfo->sessionWindow.gap > 0) { + pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, NULL, 0, info, NULL); + } else if (pQueryInfo->stateWindow.columnId > 0) { + pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, NULL, 0, info, NULL); + } else { + pNode = createOneQueryPlanNode(p, pNode, pExpr, info); + } + } else { + pNode = createOneQueryPlanNode(p, pNode, pExpr, info); } - } else if (numOfGroupCols > 0) { - int32_t numOfOutput = (int32_t)taosArrayGetSize(pExprs); - pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, pExprs->pData, numOfOutput, info, - &pQueryInfo->groupbyExpr); - } else if (pQueryInfo->sessionWindow.gap > 0) { - pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, NULL, 0, info, NULL); - } else if (pQueryInfo->info.simpleAgg) { - int32_t numOfOutput = (int32_t)taosArrayGetSize(pExprs); - pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, pExprs->pData, numOfOutput, info, NULL); } - if (pQueryInfo->havingFieldNum > 0 || pQueryInfo->info.arithmeticOnAgg) { + if (numOfGroupCols != 0) { + pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, NULL, 0, info, &pQueryInfo->groupbyExpr); + } + + if (pQueryInfo->havingFieldNum > 0) { // int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1); -// pNode = -// createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, info, NULL); +// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, info, +// NULL); } if (pQueryInfo->fillType != TSDB_FILL_NONE) { @@ -314,7 +350,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) { // 3. add the join node here SQueryTableInfo info = {0}; - int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList); + int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]); SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables, pQueryInfo->exprList[0]->pData, num, &info, NULL); @@ -324,7 +360,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) { taosArrayPush(upstream, &pNode); } else { // only one table, normal query process STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0]; - SQueryPlanNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList, pQueryInfo->colList); + SQueryPlanNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList); upstream = taosArrayInit(5, POINTER_BYTES); taosArrayPush(upstream, &pNode); }