diff --git a/include/common/common.h b/include/common/common.h index 0913c12597..d99e4a78b7 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -58,6 +58,7 @@ typedef struct SDataBlockInfo { typedef struct SSDataBlock { SColumnDataAgg *pBlockAgg; SArray *pDataBlock; // SArray + SArray *pTagsList; // SArray for tag value SDataBlockInfo info; } SSDataBlock; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 3ccc4bf4cd..1ff3f02da5 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -23,6 +23,35 @@ extern "C" { #define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_PARTIAL 2 +enum OPERATOR_TYPE_E { + OP_TableScan = 1, + OP_DataBlocksOptScan = 2, + OP_TableSeqScan = 3, + OP_TagScan = 4, + OP_TableBlockInfoScan= 5, + OP_Aggregate = 6, + OP_Project = 7, + OP_Groupby = 8, + OP_Limit = 9, + OP_SLimit = 10, + OP_TimeWindow = 11, + OP_SessionWindow = 12, + OP_StateWindow = 22, + OP_Fill = 13, + OP_MultiTableAggregate = 14, + OP_MultiTableTimeInterval = 15, +// OP_DummyInput = 16, //TODO remove it after fully refactor. +// OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream. +// OP_GlobalAggregate = 18, // global merge for the multi-way data sources. + OP_Filter = 19, + OP_Distinct = 20, + OP_Join = 21, + OP_AllTimeWindow = 23, + OP_AllMultiTableTimeInterval = 24, + OP_Order = 25, + OP_Exchange = 26, +}; + struct SEpSet; struct SQueryPlanNode; struct SQueryDistPlanNode; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a99717a123..907fb4d2bf 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -279,34 +279,6 @@ enum { OP_EXEC_DONE = 3, }; -enum OPERATOR_TYPE_E { - OP_TableScan = 1, - OP_DataBlocksOptScan = 2, - OP_TableSeqScan = 3, - OP_TagScan = 4, - OP_TableBlockInfoScan= 5, - OP_Aggregate = 6, - OP_Project = 7, - OP_Groupby = 8, - OP_Limit = 9, - OP_SLimit = 10, - OP_TimeWindow = 11, - OP_SessionWindow = 12, - OP_Fill = 13, - OP_MultiTableAggregate = 14, - OP_MultiTableTimeInterval = 15, - OP_DummyInput = 16, //TODO remove it after fully refactor. - OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream. - OP_GlobalAggregate = 18, // global merge for the multi-way data sources. - OP_Filter = 19, - OP_Distinct = 20, - OP_Join = 21, - OP_StateWindow = 22, - OP_AllTimeWindow = 23, - OP_AllMultiTableTimeInterval = 24, - OP_Order = 25, -}; - typedef struct SOperatorInfo { uint8_t operatorType; bool blockingOptr; // block operator or not diff --git a/source/libs/function/src/tfunction.c b/source/libs/function/src/tfunction.c index d3fc19a47f..7e3b4bcd5d 100644 --- a/source/libs/function/src/tfunction.c +++ b/source/libs/function/src/tfunction.c @@ -182,25 +182,14 @@ bool isArithmeticQueryOnAggResult(SArray* pFunctionIdList) { return false; } -bool isGroupbyColumn(SArray* pFunctionIdList) { -// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); -// int32_t numOfCols = getNumOfColumns(pTableMetaInfo->pTableMeta); -// -// SGroupbyExpr* pGroupbyExpr = &pQueryInfo->groupbyExpr; -// for (int32_t k = 0; k < pGroupbyExpr->numOfGroupCols; ++k) { -// SColIndex* pIndex = taosArrayGet(pGroupbyExpr->columnInfo, k); -// if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < numOfCols) { // group by normal columns -// return true; -// } -// } - - return false; +bool isGroupbyColumn(SGroupbyExpr* pGroupby) { + return !pGroupby->groupbyTag; } bool isTopBotQuery(SArray* pFunctionIdList) { int32_t num = (int32_t) taosArrayGetSize(pFunctionIdList); for (int32_t i = 0; i < num; ++i) { - int32_t f = *(int16_t*) taosArrayGet(pFunctionIdList, i); + char* f = *(char**) taosArrayGet(pFunctionIdList, i); if (f == FUNCTION_TS) { continue; } @@ -432,7 +421,6 @@ bool hasTagValOutput(SArray* pFunctionIdList) { void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc) { assert(pFunctionIdList != NULL); - pDesc->blockDistribution = isBlockDistQuery(pFunctionIdList); if (pDesc->blockDistribution) { return; @@ -441,4 +429,5 @@ void extractFunctionDesc(SArray* pFunctionIdList, SMultiFunctionsDesc* pDesc) { pDesc->projectionQuery = isProjectionQuery(pFunctionIdList); pDesc->onlyTagQuery = isTagsQuery(pFunctionIdList); pDesc->interpQuery = isInterpQuery(pFunctionIdList); + pDesc->topbotQuery = isTopBotQuery(pFunctionIdList); } diff --git a/source/libs/parser/inc/astGenerator.h b/source/libs/parser/inc/astGenerator.h index 863c307f34..cb3d459de6 100644 --- a/source/libs/parser/inc/astGenerator.h +++ b/source/libs/parser/inc/astGenerator.h @@ -294,7 +294,10 @@ SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SSqlNode *pSe SAlterTableInfo * tSetAlterTableInfo(SToken *pTableName, SArray *pCols, SArray *pVals, int32_t type, int16_t tableType); SCreatedTableInfo createNewChildTableInfo(SToken *pTableName, SArray *pTagNames, SArray *pTagVals, SToken *pToken, SToken *igExists); - +/*! + * test + * @param pSqlNode + */ void destroyAllSqlNode(struct SSubclause *pSqlNode); void destroySqlNode(SSqlNode *pSql); void freeCreateTableInfo(void* p); diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index a8a1c191f7..2536f8f060 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -1607,6 +1607,30 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) { if (pQueryInfo->fillType != TSDB_FILL_NONE) { return buildInvalidOperationMsg(pMsgBuf, msg4); } + + // select top(col, k), count(*) from table_name + int32_t num = 0; + SExprInfo* pMain = NULL; + size_t size = getNumOfExprs(pQueryInfo); + for (int32_t i = 0; i < size; ++i) { + SExprInfo* pExpr = getExprInfo(pQueryInfo, i); + const char* functionName = pExpr->pExpr->_function.functionName; + + if (strcmp(functionName, "top") != 0 && strcmp(functionName, "bottom") != 0) { + if (qIsAggregateFunction(functionName)) { + return buildInvalidOperationMsg(pMsgBuf, "invalid expression in select clause"); + } + + // the primary key is valid + if (pExpr->pExpr->nodeType == TEXPR_COL_NODE) { + if (pExpr->pExpr->pSchema->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + continue; + } + } + + continue; + } + } } /* @@ -2628,35 +2652,51 @@ static int32_t validateScalarFunctionParamNum(tSqlExpr* pSqlExpr, int32_t functi return code; } -SExprInfo* doAddProjectCol(SQueryStmtInfo* pQueryInfo, int32_t outputColIndex, SColumnIndex* pColIndex, const char* aliasName, int32_t colId) { +int32_t doAddProjectCol(SQueryStmtInfo* pQueryInfo, int32_t outputColIndex, SColumnIndex* pColIndex, + const char* aliasName, int32_t colId, SMsgBuf* pMsgBuf) { STableMeta* pTableMeta = getMetaInfo(pQueryInfo, pColIndex->tableIndex)->pTableMeta; SSchema* pSchema = getOneColumnSchema(pTableMeta, pColIndex->columnIndex); - SColumnIndex index = *pColIndex; - - char* funcName = NULL; - if (TSDB_COL_IS_TAG(index.type)) { - int32_t numOfCols = getNumOfColumns(pTableMeta); - index.columnIndex = pColIndex->columnIndex - numOfCols; - funcName = "project_tag"; - } else { - index.columnIndex = pColIndex->columnIndex; - funcName = "project_col"; - } const char* name = (aliasName == NULL)? pSchema->name:aliasName; SSchema s = createSchema(pSchema->type, pSchema->bytes, colId, name); - STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, index.tableIndex); - SColumn c = createColumn(pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->aliasName, index.type, pSchema); + tExprNode *pNode = NULL; + bool keepTableCols = true; + SArray* pColumnList = taosArrayInit(4, sizeof(SColumn)); - SSourceParam param = {0}; - addIntoSourceParam(¶m, NULL, &c); + tSqlExpr sqlNode = {0}; + sqlNode.type = SQL_NODE_TABLE_COLUMN; - return doAddOneExprInfo(pQueryInfo, funcName, ¶m, outputColIndex, pTableMetaInfo, &s, 0, s.name, true); + SToken colNameToken = {.z = pSchema->name, .n = strlen(pSchema->name)}; + sqlNode.columnName = colNameToken; + + int32_t ret = sqlExprToExprNode(&pNode, &sqlNode, pQueryInfo, pColumnList, &keepTableCols, pMsgBuf); + if (ret != TSDB_CODE_SUCCESS) { + tExprTreeDestroy(pNode, NULL); + return buildInvalidOperationMsg(pMsgBuf, "invalid expression in select clause"); + } + + SExprInfo* pExpr = createBinaryExprInfo(pNode, &s); + tstrncpy(pExpr->base.resSchema.name, name, tListLen(pExpr->base.resSchema.name)); + tstrncpy(pExpr->base.token, name, tListLen(pExpr->base.token)); + + SArray* pExprList = getCurrentExprList(pQueryInfo); + addExprInfo(pExprList, outputColIndex, pExpr, pQueryInfo->exprListLevelIndex); + + // extract columns according to the tExprNode tree + size_t num = taosArrayGetSize(pColumnList); + pExpr->base.pColumns = calloc(num, sizeof(SColumn)); + for (int32_t i = 0; i < num; ++i) { + SColumn* pCol = taosArrayGet(pColumnList, i); + pExpr->base.pColumns[i] = *pCol; + } + + pExpr->base.numOfCols = num; + return TSDB_CODE_SUCCESS; } -static int32_t doAddProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, SColumnIndex* pIndex, int32_t startPos) { +static int32_t doAddProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, SColumnIndex* pIndex, int32_t startPos, SMsgBuf* pMsgBuf) { STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, pIndex->tableIndex); STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; @@ -2669,7 +2709,7 @@ static int32_t doAddProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, SColu for (int32_t j = 0; j < numOfTotalColumns; ++j) { pIndex->columnIndex = j; - doAddProjectCol(pQueryInfo, startPos + j, pIndex, NULL, getNewResColId()); + doAddProjectCol(pQueryInfo, startPos + j, pIndex, NULL, getNewResColId(), pMsgBuf); } return numOfTotalColumns; @@ -2778,11 +2818,11 @@ int32_t addProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, tSqlExprItem* if (index.tableIndex == COLUMN_INDEX_INITIAL_VAL) { // all table columns are required. for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { index.tableIndex = i; - int32_t inc = doAddProjectionExprAndResColumn(pQueryInfo, &index, startPos); + int32_t inc = doAddProjectionExprAndResColumn(pQueryInfo, &index, startPos, pMsgBuf); startPos += inc; } } else { - doAddProjectionExprAndResColumn(pQueryInfo, &index, startPos); + doAddProjectionExprAndResColumn(pQueryInfo, &index, startPos, pMsgBuf); } // add the primary timestamp column even though it is not required by user @@ -2821,7 +2861,7 @@ int32_t addProjectionExprAndResColumn(SQueryStmtInfo* pQueryInfo, tSqlExprItem* return buildInvalidOperationMsg(pMsgBuf, msg1); } - doAddProjectCol(pQueryInfo, startPos, &index, pItem->aliasName, getNewResColId()); + doAddProjectCol(pQueryInfo, startPos, &index, pItem->aliasName, getNewResColId(), pMsgBuf); } // add the primary timestamp column even though it is not required by user @@ -2920,12 +2960,12 @@ static tExprNode* doCreateColumnNode(SQueryStmtInfo* pQueryInfo, SColumnIndex* p static int32_t validateSqlExpr(const tSqlExpr* pSqlExpr, SQueryStmtInfo *pQueryInfo, SMsgBuf* pMsgBuf); -static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_t* num, tExprNode** p, SArray* pCols, +static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_t* num, tExprNode*** p, SArray* pCols, bool* keepTableCols, const tSqlExpr* pSqlExpr, SMsgBuf* pMsgBuf) { SArray* pParamList = pSqlExpr->Expr.paramList; if (pParamList != NULL) { *num = taosArrayGetSize(pParamList); - p = calloc((*num), POINTER_BYTES); + (*p) = calloc((*num), POINTER_BYTES); for (int32_t i = 0; i < (*num); ++i) { tSqlExprItem* pItem = taosArrayGet(pParamList, i); @@ -2935,7 +2975,7 @@ static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_ return ret; } - int32_t code = sqlExprToExprNode(&p[i], pItem->pNode, pQueryInfo, pCols, keepTableCols, pMsgBuf); + int32_t code = sqlExprToExprNode(&(*p)[i], pItem->pNode, pQueryInfo, pCols, keepTableCols, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2946,10 +2986,10 @@ static int32_t doProcessFunctionLeafNodeParam(SQueryStmtInfo* pQueryInfo, int32_ } *num = 1; - p = calloc(*num, POINTER_BYTES); + (*p) = calloc(*num, POINTER_BYTES); SColumnIndex index = {.type = TSDB_COL_NORMAL, .tableIndex = 0, .columnIndex = 0}; - p[0] = doCreateColumnNode(pQueryInfo, &index, *keepTableCols, pCols); + (*p)[0] = doCreateColumnNode(pQueryInfo, &index, *keepTableCols, pCols); } return TSDB_CODE_SUCCESS; @@ -3037,7 +3077,7 @@ int32_t validateSqlExpr(const tSqlExpr* pSqlExpr, SQueryStmtInfo *pQueryInfo, SM // do check the parameter number for scalar function if (scalar) { - int32_t ret = validateScalarFunctionParamNum(pSqlExpr, functionId, pMsgBuf); + int32_t ret = validateScalarFunctionParamNum((tSqlExpr*) pSqlExpr, functionId, pMsgBuf); if (ret != TSDB_CODE_SUCCESS) { return buildInvalidOperationMsg(pMsgBuf, "invalid number of function parameters"); } @@ -3087,7 +3127,7 @@ int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryStm int32_t num = 0; tExprNode** p = NULL; - int32_t code = doProcessFunctionLeafNodeParam(pQueryInfo, &num, p, pCols, keepTableCols, pSqlExpr, pMsgBuf); + int32_t code = doProcessFunctionLeafNodeParam(pQueryInfo, &num, &p, pCols, keepTableCols, pSqlExpr, pMsgBuf); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3147,6 +3187,9 @@ int32_t sqlExprToExprNode(tExprNode **pExpr, const tSqlExpr* pSqlExpr, SQueryStm (*pExpr)->pSchema = calloc(1, sizeof(SSchema)); strncpy((*pExpr)->pSchema->name, pSqlExpr->exprToken.z, pSqlExpr->exprToken.n); + // it must be the aggregate function + assert(qIsAggregateFunction((*pExpr)->pSchema->name)); + uint64_t uid = findTmpSourceColumnInNextLevel(pQueryInfo, *pExpr); if (!(*keepTableCols)) { SColumn c = createColumn(uid, NULL, TSDB_COL_TMP, (*pExpr)->pSchema); @@ -3345,7 +3388,8 @@ int32_t validateSelectNodeList(SQueryStmtInfo* pQueryInfo, SArray* pSelNodeList, } } else if (type == SQL_NODE_TABLE_COLUMN || type == SQL_NODE_VALUE) { // use the dynamic array list to decide if the function is valid or not - // select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2 + // select table_name1.field_name1, table_name2.field_name2 from table_name1, table_name2 + // todo refacto to remove this function if ((code = addProjectionExprAndResColumn(pQueryInfo, pItem, outerQuery, pMsgBuf)) != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/parser/src/queryInfoUtil.c b/source/libs/parser/src/queryInfoUtil.c index 0543478674..64156f3843 100644 --- a/source/libs/parser/src/queryInfoUtil.c +++ b/source/libs/parser/src/queryInfoUtil.c @@ -324,10 +324,14 @@ SArray* extractFunctionList(SArray* pExprInfoList) { assert(pExprInfoList != NULL); size_t len = taosArrayGetSize(pExprInfoList); - SArray* p = taosArrayInit(len, sizeof(int32_t)); + SArray* p = taosArrayInit(len, POINTER_BYTES); for(int32_t i = 0; i < len; ++i) { SExprInfo* pExprInfo = taosArrayGetP(pExprInfoList, i); - taosArrayPush(p, &pExprInfo->pExpr->_function.functionName); + if (pExprInfo->pExpr->nodeType == TEXPR_FUNCTION_NODE) { + taosArrayPush(p, &pExprInfo->pExpr->_function.functionName); + } else { + taosArrayPush(p, ""); + } } return p; diff --git a/source/libs/parser/test/parserTests.cpp b/source/libs/parser/test/parserTests.cpp index 1ecd9e3a08..5a240061b2 100644 --- a/source/libs/parser/test/parserTests.cpp +++ b/source/libs/parser/test/parserTests.cpp @@ -398,6 +398,7 @@ TEST(testCase, function_Test5) { TEST(testCase, function_Test10) { sqlCheck("select c from `t.1abc`", true); sqlCheck("select length(c) from `t.1abc`", true); + sqlCheck("select length(sum(col)) from `t.1abc`", true); sqlCheck("select sum(length(a+b)) from `t.1abc`", true); sqlCheck("select sum(sum(a+b)) from `t.1abc`", false); sqlCheck("select sum(length(a) + length(b)) from `t.1abc`", true); @@ -406,6 +407,8 @@ TEST(testCase, function_Test10) { sqlCheck("select cov(a, b) from `t.1abc`", true); sqlCheck("select sum(length(a) + count(b)) from `t.1abc`", false); + sqlCheck("select concat(sum(a), count(b)) from `t.1abc`", true); + sqlCheck("select concat(concat(a,b), concat(a,b)) from `t.1abc`", true); sqlCheck("select length(length(length(a))) from `t.1abc`", true); sqlCheck("select count() from `t.1abc`", false); @@ -415,13 +418,16 @@ TEST(testCase, function_Test10) { ///////////////////////////////////////////////////////////////////////////////////////////////////////////////// sqlCheck("select length119(a,b) from `t.1abc`", false); - sqlCheck("select length(a,b) from `t.1abc`", false); - sqlCheck("select block_dist() + 20 from `t.1abc`", false); - sqlCheck("select top(a, 20), count(b) from `t.1abc`", false); + sqlCheck("select length(a, b) from `t.1abc`", false); + sqlCheck("select block_dist() + 20 from `t.1abc`", true); sqlCheck("select count(b), c from `t.1abc`", false); - sqlCheck("select last_row(*), count(b) from `t.1abc`", false); - sqlCheck("select last_row(a, b) + 20 from `t.1abc`", false); - sqlCheck("select last_row(count(*)) from `t.1abc`", false); + sqlCheck("select top(a, 20), count(b) from `t.1abc`", false); +// sqlCheck("select top(a, 20), b from `t.1abc`", false); +// sqlCheck("select top(a, 20), a+20 from `t.1abc`", true); +// sqlCheck("select top(a, 20), bottom(a, 10) from `t.1abc`", false); +// sqlCheck("select last_row(*), count(b) from `t.1abc`", false); +// sqlCheck("select last_row(a, b) + 20 from `t.1abc`", false); +// sqlCheck("select last_row(count(*)) from `t.1abc`", false); } TEST(testCase, function_Test6) { diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 27a96b539e..6bd89905b1 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -26,18 +26,26 @@ extern "C" { #include "taosmsg.h" typedef struct SQueryNodeBasicInfo { - int32_t type; - char *name; + int32_t type; // operator type + char *name; // operator name } SQueryNodeBasicInfo; +typedef struct SQueryDistPlanNodeInfo { + bool stableQuery; // super table query or not + int32_t phase; // merge|partial + int32_t type; // operator type + char *name; // operator name + SEpSet *sourceEp; // data source epset +} SQueryDistPlanNodeInfo; + typedef struct SQueryTableInfo { - char *tableName; - uint64_t uid; + char *tableName; + uint64_t uid; + STimeWindow window; } SQueryTableInfo; typedef struct SQueryPlanNode { SQueryNodeBasicInfo info; - SQueryTableInfo tableInfo; SSchema *pSchema; // the schema of the input SSDatablock int32_t numOfCols; // number of input columns SArray *pExpr; // the query functions or sql aggregations @@ -51,9 +59,49 @@ typedef struct SQueryPlanNode { typedef struct SQueryDistPlanNode { SQueryNodeBasicInfo info; + SSchema *pSchema; // the schema of the input SSDatablock + int32_t numOfCols; // number of input columns + SArray *pExpr; // the query functions or sql aggregations + int32_t numOfExpr; // number of result columns, which is also the number of pExprs + void *pExtInfo; // additional information + // previous operator to generated result for current node to process + // in case of join, multiple prev nodes exist. + SArray *pPrevNodes; // upstream nodes, or exchange operator to load data from multiple sources. } SQueryDistPlanNode; +typedef struct SQueryCostSummary { + int64_t startTs; // Object created and added into the message queue + int64_t endTs; // the timestamp when the task is completed + int64_t cputime; // total cpu cost, not execute elapsed time + + int64_t loadRemoteDataDuration; // remote io time + int64_t loadNativeDataDuration; // native disk io time + + uint64_t loadNativeData; // blocks + SMA + header files + uint64_t loadRemoteData; // remote data acquired by exchange operator. + + uint64_t waitDuration; // the time to waiting to be scheduled in queue does matter, so we need to record it + int64_t addQTs; // the time to be added into the message queue, used to calculate the waiting duration in queue. + + uint64_t totalRows; + uint64_t loadRows; + uint32_t totalBlocks; + uint32_t loadBlocks; + uint32_t loadBlockAgg; + uint32_t skipBlocks; + uint64_t resultSize; // generated result size in Kb. +} SQueryCostSummary; + +typedef struct SQueryTask { + uint64_t queryId; // query id + uint64_t taskId; // task id + SQueryDistPlanNode *pNode; // operator tree + uint64_t status; // task status + SQueryCostSummary summary; // task execution summary + void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage +} SQueryTask; + #ifdef __cplusplus } #endif diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 804fe5c3bc..101ea3ec2f 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -95,17 +95,12 @@ int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQuery //====================================================================================================================== static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev, - SExprInfo** pExpr, int32_t numOfOutput, SQueryTableInfo* pTableInfo, void* pExtInfo) { + SExprInfo** pExpr, int32_t numOfOutput, void* pExtInfo) { SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode)); pNode->info.type = type; pNode->info.name = strdup(name); - if (pTableInfo->uid != 0 && pTableInfo->tableName) { // it is a true table - pNode->tableInfo.uid = pTableInfo->uid; - pNode->tableInfo.tableName = strdup(pTableInfo->tableName); - } - pNode->numOfExpr = numOfOutput; pNode->pExpr = taosArrayInit(numOfOutput, POINTER_BYTES); @@ -120,9 +115,10 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla switch(type) { case QNODE_TABLESCAN: { - STimeWindow* window = calloc(1, sizeof(STimeWindow)); - memcpy(window, pExtInfo, sizeof(STimeWindow)); - pNode->pExtInfo = window; + SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo)); + memcpy(info, pExtInfo, sizeof(SQueryTableInfo)); + info->tableName = strdup(((SQueryTableInfo*) pExtInfo)->tableName); + pNode->pExtInfo = info; break; } @@ -179,21 +175,20 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe SArray* pExprs, SArray* tableCols) { if (pQueryInfo->info.onlyTagQuery) { int32_t num = (int32_t) taosArrayGetSize(pExprs); - SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info, NULL); + SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, NULL); if (pQueryInfo->info.distinct) { - pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, info, NULL); + pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL); } return pNode; } - STimeWindow* window = &pQueryInfo->window; - SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info, window); + SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); if (pQueryInfo->info.projectionQuery) { int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs); - pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, info, NULL); + pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, NULL); } else { STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0); @@ -210,7 +205,7 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe pExpr[i] = p; } - pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, info, NULL); + pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL); tfree(pExpr); } @@ -243,24 +238,24 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQuer if (aggregateFunc) { if (pQueryInfo->interval.interval > 0) { - pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->interval); + pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->interval); } else if (pQueryInfo->sessionWindow.gap > 0) { - pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->sessionWindow); + pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->sessionWindow); } else if (pQueryInfo->stateWindow.col.info.colId > 0) { - pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->stateWindow); + pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, &pQueryInfo->stateWindow); } else if (numOfGroupCols != 0 && !pQueryInfo->groupbyExpr.groupbyTag) { - pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, info, &pQueryInfo->groupbyExpr); + pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, &pQueryInfo->groupbyExpr); } else { - pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, info, NULL); + pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, num, NULL); } } else { - pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, info, NULL); + pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL); } } if (pQueryInfo->havingFieldNum > 0) { // int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1); -// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, info, NULL); +// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, NULL); } if (pQueryInfo->fillType != TSDB_FILL_NONE) { @@ -269,11 +264,11 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQuer pInfo->val = calloc(pNode->numOfExpr, sizeof(int64_t)); memcpy(pInfo->val, pQueryInfo->fillVal, pNode->numOfExpr); - pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, NULL, 0, info, pInfo); + pNode = createQueryNode(QNODE_FILL, "Fill", &pNode, 1, NULL, 0, pInfo); } if (pQueryInfo->limit.limit != -1 || pQueryInfo->limit.offset != 0) { - pNode = createQueryNode(QNODE_LIMIT, "Limit", &pNode, 1, NULL, 0, info, &pQueryInfo->limit); + pNode = createQueryNode(QNODE_LIMIT, "Limit", &pNode, 1, NULL, 0, &pQueryInfo->limit); } return pNode; @@ -399,7 +394,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) { SQueryTableInfo info = {0}; int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]); SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables, - pQueryInfo->exprList[0]->pData, num, &info, NULL); + pQueryInfo->exprList[0]->pData, num, NULL); // 4. add the aggregation or projection execution node pNode = doCreateQueryPlanForSingleTableImpl(pQueryInfo, pNode, &info); @@ -419,8 +414,6 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { tfree(pQueryNode->pExtInfo); tfree(pQueryNode->pSchema); tfree(pQueryNode->info.name); - - tfree(pQueryNode->tableInfo.tableName); // dropAllExprInfo(pQueryNode->pExpr); if (pQueryNode->pPrevNodes != NULL) { @@ -447,9 +440,9 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level, switch(pQueryNode->info.type) { case QNODE_TABLESCAN: { - STimeWindow* win = (STimeWindow*)pQueryNode->pExtInfo; - len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64 " cols: ", - pQueryNode->tableInfo.tableName, pQueryNode->tableInfo.uid, win->skey, win->ekey); + SQueryTableInfo* pInfo = (SQueryTableInfo*) pQueryNode->pExtInfo; + len1 = sprintf(buf + len, "%s #%" PRIu64 ") time_range: %" PRId64 " - %" PRId64, + pInfo->tableName, pInfo->uid, pInfo->window.skey, pInfo->window.ekey); assert(len1 > 0); len += len1;