From 3936128742e8f468e99c6d7119d3ebe5ee561159 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 14 Dec 2021 07:37:54 -0500 Subject: [PATCH 1/2] TD-12034 Physical plan code. --- include/libs/planner/planner.h | 1 - source/libs/planner/inc/plannerInt.h | 38 ++++++------ source/libs/planner/src/physicalPlan.c | 80 +++++++++++++++++++++++--- source/libs/planner/src/planner.c | 31 +++------- 4 files changed, 101 insertions(+), 49 deletions(-) diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 8f217a0deb..be00ed65f0 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -54,7 +54,6 @@ enum OPERATOR_TYPE_E { }; struct SEpSet; -struct SQueryPlanNode; struct SPhyNode; struct SQueryStmtInfo; diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 2231c93362..6dcd19782c 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -25,18 +25,20 @@ extern "C" { #include "planner.h" #include "taosmsg.h" -enum LOGIC_PLAN_E { - LP_SCAN = 1, - LP_SESSION = 2, - LP_STATE = 3, - LP_INTERVAL = 4, - LP_FILL = 5, - LP_AGG = 6, - LP_JOIN = 7, - LP_PROJECT = 8, - LP_DISTINCT = 9, - LP_ORDER = 10 -}; +#define QNODE_TAGSCAN 1 +#define QNODE_TABLESCAN 2 +#define QNODE_PROJECT 3 +#define QNODE_AGGREGATE 4 +#define QNODE_GROUPBY 5 +#define QNODE_LIMIT 6 +#define QNODE_JOIN 7 +#define QNODE_DISTINCT 8 +#define QNODE_SORT 9 +#define QNODE_UNION 10 +#define QNODE_TIMEWINDOW 11 +#define QNODE_SESSIONWINDOW 12 +#define QNODE_STATEWINDOW 13 +#define QNODE_FILL 14 typedef struct SQueryNodeBasicInfo { int32_t type; // operator type @@ -64,10 +66,10 @@ typedef struct SQueryPlanNode { SArray *pExpr; // the query functions or sql aggregations int32_t numOfExpr; // number of result columns, which is also the number of pExprs void *pExtInfo; // additional information - // previous operator to generated result for current node to process + // children operator to generated result for current node to process // in case of join, multiple prev nodes exist. - SArray *pPrevNodes; // upstream nodes - struct SQueryPlanNode *nextNode; + SArray *pChildren; // upstream nodes + struct SQueryPlanNode *pParent; } SQueryPlanNode; typedef SSchema SSlotSchema; @@ -86,11 +88,13 @@ typedef struct SPhyNode { // children plan to generated result for current node to process // in case of join, multiple plan nodes exist. SArray *pChildren; + struct SPhyNode *pParent; } SPhyNode; typedef struct SScanPhyNode { - SPhyNode node; - uint64_t uid; // unique id of the table + SPhyNode node; + STimeWindow window; + uint64_t uid; // unique id of the table } SScanPhyNode; typedef SScanPhyNode STagScanPhyNode; diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 2bdc159af8..c4564a9a09 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -15,20 +15,84 @@ #include "plannerInt.h" +// typedef struct SQueryPlanNode { +// void *pExtInfo; // additional information +// SArray *pPrevNodes; // children +// struct SQueryPlanNode *nextNode; // parent +// } SQueryPlanNode; + +// typedef struct SSubplan { +// int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN +// SArray *pDatasource; // the datasource subplan,from which to fetch the result +// struct SPhyNode *pNode; // physical plan of current subplan +// } SSubplan; + +// typedef struct SQueryDag { +// SArray **pSubplans; +// } SQueryDag; + +// typedef struct SScanPhyNode { +// SPhyNode node; +// STimeWindow window; +// uint64_t uid; // unique id of the table +// } SScanPhyNode; + +// typedef SScanPhyNode STagScanPhyNode; + +void fillDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) { + dataBlockSchema->index = 0; // todo + SWAP(dataBlockSchema->pSchema, pPlanNode->pSchema, SSchema*); + dataBlockSchema->numOfCols = pPlanNode->numOfCols; +} + +void fillPhyNode(SQueryPlanNode* pPlanNode, int32_t type, const char* name, SPhyNode* node) { + node->info.type = type; + node->info.name = name; + SWAP(node->pTargets, pPlanNode->pExpr, SArray*); + fillDataBlockSchema(pPlanNode, &(node->targetSchema)); +} + +SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) { + STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode)); + fillPhyNode(pPlanNode, OP_TagScan, "TagScan", (SPhyNode*)node); + return (SPhyNode*)node; +} + SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) { - return NULL; + STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode)); + fillPhyNode(pPlanNode, OP_TableScan, "SingleTableScan", (SPhyNode*)node); + return (SPhyNode*)node; } -SPhyNode* createPhyNode(SQueryPlanNode* node) { - switch (node->info.type) { - case LP_SCAN: - return createScanNode(node); +SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) { + SPhyNode* node = NULL; + switch (pPlanNode->info.type) { + case QNODE_TAGSCAN: + node = createTagScanNode(pPlanNode); + break; + case QNODE_TABLESCAN: + node = createScanNode(pPlanNode); + break; + default: + assert(false); } - return NULL; + if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) { + node->pChildren = taosArrayInit(4, POINTER_BYTES); + size_t size = taosArrayGetSize(pPlanNode->pChildren); + for(int32_t i = 0; i < size; ++i) { + SPhyNode* child = createPhyNode(taosArrayGet(pPlanNode->pChildren, i)); + child->pParent = node; + taosArrayPush(node->pChildren, &child); + } + } + return node; } -SPhyNode* createSubplan(SQueryPlanNode* pSubquery) { - return NULL; +SSubplan* createSubplan(SQueryPlanNode* pSubquery) { + SSubplan* subplan = calloc(1, sizeof(SSubplan)); + subplan->pNode = createPhyNode(pSubquery); + // todo + return subplan; } int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) { diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index e54b847230..b18dd29257 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -18,21 +18,6 @@ #include "parser.h" #include "plannerInt.h" -#define QNODE_TAGSCAN 1 -#define QNODE_TABLESCAN 2 -#define QNODE_PROJECT 3 -#define QNODE_AGGREGATE 4 -#define QNODE_GROUPBY 5 -#define QNODE_LIMIT 6 -#define QNODE_JOIN 7 -#define QNODE_DISTINCT 8 -#define QNODE_SORT 9 -#define QNODE_UNION 10 -#define QNODE_TIMEWINDOW 11 -#define QNODE_SESSIONWINDOW 12 -#define QNODE_STATEWINDOW 13 -#define QNODE_FILL 14 - typedef struct SFillEssInfo { int32_t fillType; // fill type int64_t *val; // fill value @@ -104,9 +89,9 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla taosArrayPush(pNode->pExpr, &pExpr[i]); } - pNode->pPrevNodes = taosArrayInit(4, POINTER_BYTES); + pNode->pChildren = taosArrayInit(4, POINTER_BYTES); for(int32_t i = 0; i < numOfPrev; ++i) { - taosArrayPush(pNode->pPrevNodes, &prev[i]); + taosArrayPush(pNode->pChildren, &prev[i]); } switch(type) { @@ -386,14 +371,14 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) { tfree(pQueryNode->info.name); // dropAllExprInfo(pQueryNode->pExpr); - if (pQueryNode->pPrevNodes != NULL) { - int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pPrevNodes); + if (pQueryNode->pChildren != NULL) { + int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren); for(int32_t i = 0; i < size; ++i) { - SQueryPlanNode* p = taosArrayGetP(pQueryNode->pPrevNodes, i); + SQueryPlanNode* p = taosArrayGetP(pQueryNode->pChildren, i); doDestroyQueryNode(p); } - taosArrayDestroy(pQueryNode->pPrevNodes); + taosArrayDestroy(pQueryNode->pChildren); } tfree(pQueryNode); @@ -607,8 +592,8 @@ int32_t printExprInfo(const char* buf, const SQueryPlanNode* pQueryNode, int32_t int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t level, int32_t totalLen) { int32_t len = doPrintPlan(buf, pQueryNode, level, totalLen); - for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pPrevNodes); ++i) { - SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pPrevNodes, i); + for(int32_t i = 0; i < taosArrayGetSize(pQueryNode->pChildren); ++i) { + SQueryPlanNode* p1 = taosArrayGetP(pQueryNode->pChildren, i); int32_t len1 = queryPlanToStringImpl(buf, p1, level + 1, len); len = len1; } From c84b6d09e497442c3a0e940fbb410915b7791f0b Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 15 Dec 2021 22:18:51 -0500 Subject: [PATCH 2/2] TD-12035 Scan subquery physical plan code. --- include/libs/parser/parser.h | 12 +- include/libs/planner/planner.h | 104 ++++++++++++----- include/libs/planner/plannerOp.h | 48 ++++++++ include/util/tarray.h | 2 +- source/libs/parser/src/insertParser.c | 52 +++++---- source/libs/parser/test/CMakeLists.txt | 11 +- source/libs/parser/test/insertTest.cpp | 21 ++++ source/libs/planner/inc/plannerInt.h | 51 +-------- source/libs/planner/src/physicalPlan.c | 150 +++++++++++++++++-------- source/libs/planner/src/planner.c | 3 +- source/libs/scheduler/CMakeLists.txt | 2 +- source/util/src/tarray.c | 2 +- 12 files changed, 295 insertions(+), 163 deletions(-) create mode 100644 include/libs/planner/plannerOp.h diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 2f152c3e2b..d65b5ab570 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -132,13 +132,15 @@ struct SInsertStmtInfo; bool qIsInsertSql(const char* pStr, size_t length); typedef struct SParseContext { + const char* pAcctId; + const char* pDbname; + void *pRpc; + const char* pClusterId; + const SEpSet* pEpSet; + int64_t id; // query id, generated by uuid generator + int8_t schemaAttached; // denote if submit block is built with table schema or not const char* pSql; // sql string size_t sqlLen; // length of the sql string - int64_t id; // operator id, generated by uuid generator - const char* pDbname; - const SEpSet* pEpSet; - int8_t schemaAttached; // denote if submit block is built with table schema or not - char* pMsg; // extended error message if exists to help avoid the problem in sql statement. int32_t msgLen; // max length of the msg } SParseContext; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index be00ed65f0..844757eeb5 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -20,51 +20,92 @@ extern "C" { #endif +#include "taosmsg.h" + #define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_PARTIAL 2 #define QUERY_TYPE_SCAN 3 enum OPERATOR_TYPE_E { - OP_TableScan = 1, - OP_DataBlocksOptScan = 2, - OP_TableSeqScan = 3, - OP_TagScan = 4, - OP_TableBlockInfoScan= 5, - OP_Aggregate = 6, - OP_Project = 7, - OP_Groupby = 8, - OP_Limit = 9, - OP_SLimit = 10, - OP_TimeWindow = 11, - OP_SessionWindow = 12, - OP_StateWindow = 22, - OP_Fill = 13, - OP_MultiTableAggregate = 14, - OP_MultiTableTimeInterval = 15, -// OP_DummyInput = 16, //TODO remove it after fully refactor. -// OP_MultiwayMergeSort = 17, // multi-way data merge into one input stream. -// OP_GlobalAggregate = 18, // global merge for the multi-way data sources. - OP_Filter = 19, - OP_Distinct = 20, - OP_Join = 21, - OP_AllTimeWindow = 23, - OP_AllMultiTableTimeInterval = 24, - OP_Order = 25, - OP_Exchange = 26, + OP_Unknown, +#define INCLUDE_AS_ENUM +#include "plannerOp.h" +#undef INCLUDE_AS_ENUM + OP_TotalNum }; struct SEpSet; -struct SPhyNode; struct SQueryStmtInfo; +typedef SSchema SSlotSchema; + +typedef struct SDataBlockSchema { + SSlotSchema *pSchema; + int32_t numOfCols; // number of columns +} SDataBlockSchema; + +typedef struct SQueryNodeBasicInfo { + int32_t type; // operator type + const char *name; // operator name +} SQueryNodeBasicInfo; + +typedef struct SPhyNode { + SQueryNodeBasicInfo info; + SArray *pTargets; // target list to be computed or scanned at this node + SArray *pConditions; // implicitly-ANDed qual conditions + SDataBlockSchema targetSchema; + // children plan to generated result for current node to process + // in case of join, multiple plan nodes exist. + SArray *pChildren; + struct SPhyNode *pParent; +} SPhyNode; + +typedef struct SScanPhyNode { + SPhyNode node; + uint64_t uid; // unique id of the table + int8_t tableType; +} SScanPhyNode; + +typedef SScanPhyNode SSystemTableScanPhyNode; +typedef SScanPhyNode STagScanPhyNode; + +typedef struct STableScanPhyNode { + SScanPhyNode scan; + uint8_t scanFlag; // denotes reversed scan of data or not + STimeWindow window; + SArray *pTagsConditions; // implicitly-ANDed tag qual conditions +} STableScanPhyNode; + +typedef STableScanPhyNode STableSeqScanPhyNode; + +typedef struct SProjectPhyNode { + SPhyNode node; +} SProjectPhyNode; + +typedef struct SExchangePhyNode { + SPhyNode node; + uint64_t templateId; + SArray *pSourceEpSet; // SEpSet +} SExchangePhyNode; + +typedef struct SSubplanId { + uint64_t queryId; + uint64_t templateId; + uint64_t subplanId; +} SSubplanId; + typedef struct SSubplan { - int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN - SArray *pDatasource; // the datasource subplan,from which to fetch the result - struct SPhyNode *pNode; // physical plan of current subplan + SSubplanId id; // unique id of the subplan + int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN + int32_t level; // the execution level of current subplan, starting from 0. + SEpSet execEpSet; // for the scan sub plan, the optional execution node + SArray *pChildern; // the datasource subplan,from which to fetch the result + SArray *pParents; // the data destination subplan, get data from current subplan + SPhyNode *pNode; // physical plan of current subplan } SSubplan; typedef struct SQueryDag { - SArray **pSubplans; + SArray *pSubplans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. } SQueryDag; /** @@ -74,6 +115,7 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str); + /** * Convert to subplan to string for the scheduler to send to the executor */ diff --git a/include/libs/planner/plannerOp.h b/include/libs/planner/plannerOp.h new file mode 100644 index 0000000000..27c7c534a2 --- /dev/null +++ b/include/libs/planner/plannerOp.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#if defined(INCLUDE_AS_ENUM) // enum define mode + #undef OP_ENUM_MACRO + #define OP_ENUM_MACRO(op) OP_##op, +#elif defined(INCLUDE_AS_NAME) // comment define mode + #undef OP_ENUM_MACRO + #define OP_ENUM_MACRO(op) #op, +#else + #error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME +#endif + +OP_ENUM_MACRO(TableScan) +OP_ENUM_MACRO(DataBlocksOptScan) +OP_ENUM_MACRO(TableSeqScan) +OP_ENUM_MACRO(TagScan) +OP_ENUM_MACRO(TableBlockInfoScan) +OP_ENUM_MACRO(Aggregate) +OP_ENUM_MACRO(Project) +OP_ENUM_MACRO(Groupby) +OP_ENUM_MACRO(Limit) +OP_ENUM_MACRO(SLimit) +OP_ENUM_MACRO(TimeWindow) +OP_ENUM_MACRO(SessionWindow) +OP_ENUM_MACRO(StateWindow) +OP_ENUM_MACRO(Fill) +OP_ENUM_MACRO(MultiTableAggregate) +OP_ENUM_MACRO(MultiTableTimeInterval) +OP_ENUM_MACRO(Filter) +OP_ENUM_MACRO(Distinct) +OP_ENUM_MACRO(Join) +OP_ENUM_MACRO(AllTimeWindow) +OP_ENUM_MACRO(AllMultiTableTimeInterval) +OP_ENUM_MACRO(Order) +OP_ENUM_MACRO(Exchange) diff --git a/include/util/tarray.h b/include/util/tarray.h index e0f14dcd25..f7c72add01 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -41,7 +41,7 @@ typedef struct SArray { * @param elemSize * @return */ -void* taosArrayInit(size_t size, size_t elemSize); +SArray* taosArrayInit(size_t size, size_t elemSize); /** * diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index fa59bc6ca7..97102c5b00 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -71,8 +71,7 @@ typedef struct SInsertParseContext { const char* pSql; SMsgBuf msg; struct SCatalog* pCatalog; - SMetaData meta; // need release - const STableMeta* pTableMeta; + STableMeta tableMeta; SHashObj* pTableBlockHashObj; // data block for each table. need release int32_t totalNum; SInsertStmtInfo* pOutput; @@ -165,29 +164,29 @@ static int32_t skipInsertInto(SInsertParseContext* pCxt) { return TSDB_CODE_SUCCESS; } -static int32_t buildTableName(SInsertParseContext* pCxt, SToken* pStname, SArray* tableNameList) { +static int32_t buildName(SInsertParseContext* pCxt, SToken* pStname, char* fullDbName, char* tableName) { if (parserValidateIdToken(pStname) != TSDB_CODE_SUCCESS) { return buildSyntaxErrMsg(&pCxt->msg, "invalid table name", pStname->z); } - SName name = {0}; - strcpy(name.dbname, pCxt->pComCxt->pDbname); - strncpy(name.tname, pStname->z, pStname->n); - taosArrayPush(tableNameList, &name); - + char* p = strnchr(pStname->z, TS_PATH_DELIMITER[0], pStname->n, false); + if (NULL != p) { // db.table + strcpy(fullDbName, pCxt->pComCxt->pAcctId); + fullDbName[strlen(pCxt->pComCxt->pAcctId)] = TS_PATH_DELIMITER[0]; + strncpy(fullDbName, pStname->z, p - pStname->z); + strncpy(tableName, p + 1, pStname->n - (p - pStname->z) - 1); + } else { + snprintf(fullDbName, TSDB_FULL_DB_NAME_LEN, "%s.%s", pCxt->pComCxt->pAcctId, pCxt->pComCxt->pDbname); + strncpy(tableName, pStname->z, pStname->n); + } return TSDB_CODE_SUCCESS; } -static int32_t buildMetaReq(SInsertParseContext* pCxt, SToken* pStname, SCatalogReq* pMetaReq) { - pMetaReq->pTableName = taosArrayInit(4, sizeof(SName)); - return buildTableName(pCxt, pStname, pMetaReq->pTableName); -} - static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) { - SCatalogReq req; - CHECK_CODE(buildMetaReq(pCxt, pTname, &req)); - CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, NULL, NULL, NULL, &pCxt->meta)); //TODO - pCxt->pTableMeta = (STableMeta*)taosArrayGetP(pCxt->meta.pTableMeta, 0); + char fullDbName[TSDB_FULL_DB_NAME_LEN] = {0}; + char tableName[TSDB_TABLE_NAME_LEN] = {0}; + CHECK_CODE(buildName(pCxt, pTname, fullDbName, tableName)); + CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, pCxt->pComCxt->pRpc, pCxt->pComCxt->pEpSet, fullDbName, &pCxt->tableMeta)); return TSDB_CODE_SUCCESS; } @@ -646,13 +645,13 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) NEXT_TOKEN(pCxt->pSql, sToken); CHECK_CODE(getTableMeta(pCxt, &sToken)); - if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) { + if (TSDB_SUPER_TABLE != pCxt->tableMeta.tableType) { return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed"); } - SSchema* pTagsSchema = getTableTagSchema(pCxt->pTableMeta); + SSchema* pTagsSchema = getTableTagSchema(&pCxt->tableMeta); SParsedDataColInfo spd = {0}; - setBoundColumnInfo(&spd, pTagsSchema, getNumOfTags(pCxt->pTableMeta)); + setBoundColumnInfo(&spd, pTagsSchema, getNumOfTags(&pCxt->tableMeta)); // pSql -> [(tag1_name, ...)] TAGS (tag1_value, ...) NEXT_TOKEN(pCxt->pSql, sToken); @@ -669,7 +668,7 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) if (TK_LP != sToken.type) { return buildSyntaxErrMsg(&pCxt->msg, "( is expected", sToken.z); } - CHECK_CODE(parseTagsClause(pCxt, &spd, pTagsSchema, getTableInfo(pCxt->pTableMeta).precision)); + CHECK_CODE(parseTagsClause(pCxt, &spd, pTagsSchema, getTableInfo(&pCxt->tableMeta).precision)); return TSDB_CODE_SUCCESS; } @@ -811,12 +810,12 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) { } STableDataBlocks *dataBuf = NULL; - CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, - sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta, &dataBuf, NULL)); + CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, pCxt->tableMeta.uid, TSDB_DEFAULT_PAYLOAD_SIZE, + sizeof(SSubmitBlk), getTableInfo(&pCxt->tableMeta).rowSize, &pCxt->tableMeta, &dataBuf, NULL)); if (TK_LP == sToken.type) { // pSql -> field1_name, ...) - CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo)); + CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(&pCxt->tableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo)); NEXT_TOKEN(pCxt->pSql, sToken); } @@ -862,18 +861,17 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) { .pSql = pContext->pSql, .msg = {.buf = pContext->pMsg, .len = pContext->msgLen}, .pCatalog = NULL, - .pTableMeta = NULL, + .tableMeta = {0}, .pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false), .totalNum = 0, .pOutput = *pInfo }; - CHECK_CODE(catalogGetHandle(NULL, &context.pCatalog)); //TODO - if (NULL == context.pTableBlockHashObj) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + CHECK_CODE(catalogGetHandle(pContext->pClusterId, &context.pCatalog)); CHECK_CODE(skipInsertInto(&context)); CHECK_CODE(parseInsertBody(&context)); diff --git a/source/libs/parser/test/CMakeLists.txt b/source/libs/parser/test/CMakeLists.txt index 4b9e586be3..03b76152da 100644 --- a/source/libs/parser/test/CMakeLists.txt +++ b/source/libs/parser/test/CMakeLists.txt @@ -6,13 +6,16 @@ SET(CMAKE_CXX_STANDARD 11) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(parserTest ${SOURCE_LIST}) -TARGET_LINK_LIBRARIES( - parserTest - PUBLIC os util common parser catalog transport gtest function planner query -) TARGET_INCLUDE_DIRECTORIES( parserTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/parser/" PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/parser/inc" ) + +TARGET_LINK_LIBRARIES( + parserTest + PUBLIC os util common parser catalog transport gtest function planner query +) + +TARGET_LINK_OPTIONS(parserTest PRIVATE -Wl,-wrap,malloc) diff --git a/source/libs/parser/test/insertTest.cpp b/source/libs/parser/test/insertTest.cpp index 9cf48da4eb..5877adf41c 100644 --- a/source/libs/parser/test/insertTest.cpp +++ b/source/libs/parser/test/insertTest.cpp @@ -27,6 +27,27 @@ namespace { } } +extern "C" { + +#include + +void *__real_malloc(size_t); + +void *__wrap_malloc(size_t c) { + // printf("My MALLOC called: %d\n", c); + // void *array[32]; + // int size = backtrace(array, 32); + // char **symbols = backtrace_symbols(array, size); + // for (int i = 0; i < size; ++i) { + // cout << symbols[i] << endl; + // } + // free(symbols); + + return __real_malloc(c); +} + +} + // syntax: // INSERT INTO // tb_name diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 6dcd19782c..6c65e4810d 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -23,6 +23,7 @@ extern "C" { #include "common.h" #include "tarray.h" #include "planner.h" +#include "parser.h" #include "taosmsg.h" #define QNODE_TAGSCAN 1 @@ -40,11 +41,6 @@ extern "C" { #define QNODE_STATEWINDOW 13 #define QNODE_FILL 14 -typedef struct SQueryNodeBasicInfo { - int32_t type; // operator type - char *name; // operator name -} SQueryNodeBasicInfo; - typedef struct SQueryDistPlanNodeInfo { bool stableQuery; // super table query or not int32_t phase; // merge|partial @@ -54,8 +50,9 @@ typedef struct SQueryDistPlanNodeInfo { } SQueryDistPlanNodeInfo; typedef struct SQueryTableInfo { - char *tableName; - uint64_t uid; + char *tableName; // to be deleted + uint64_t uid; // to be deleted + STableMetaInfo* pMeta; STimeWindow window; } SQueryTableInfo; @@ -72,46 +69,6 @@ typedef struct SQueryPlanNode { struct SQueryPlanNode *pParent; } SQueryPlanNode; -typedef SSchema SSlotSchema; - -typedef struct SDataBlockSchema { - int32_t index; - SSlotSchema *pSchema; - int32_t numOfCols; // number of columns -} SDataBlockSchema; - -typedef struct SPhyNode { - SQueryNodeBasicInfo info; - SArray *pTargets; // target list to be computed or scanned at this node - SArray *pConditions; // implicitly-ANDed qual conditions - SDataBlockSchema targetSchema; - // children plan to generated result for current node to process - // in case of join, multiple plan nodes exist. - SArray *pChildren; - struct SPhyNode *pParent; -} SPhyNode; - -typedef struct SScanPhyNode { - SPhyNode node; - STimeWindow window; - uint64_t uid; // unique id of the table -} SScanPhyNode; - -typedef SScanPhyNode STagScanPhyNode; - -typedef SScanPhyNode SSystemTableScanPhyNode; - -typedef struct SMultiTableScanPhyNode { - SScanPhyNode scan; - SArray *pTagsConditions; // implicitly-ANDed tag qual conditions -} SMultiTableScanPhyNode; - -typedef SMultiTableScanPhyNode SMultiTableSeqScanPhyNode; - -typedef struct SProjectPhyNode { - SPhyNode node; -} SProjectPhyNode; - /** * Optimize the query execution plan, currently not implement yet. * @param pQueryNode diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index c4564a9a09..e7acb12bc0 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -14,64 +14,107 @@ */ #include "plannerInt.h" +#include "parser.h" -// typedef struct SQueryPlanNode { -// void *pExtInfo; // additional information -// SArray *pPrevNodes; // children -// struct SQueryPlanNode *nextNode; // parent -// } SQueryPlanNode; +static const char* gOpName[] = { + "Unknown", +#define INCLUDE_AS_NAME +#include "plannerOp.h" +#undef INCLUDE_AS_NAME +}; -// typedef struct SSubplan { -// int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN -// SArray *pDatasource; // the datasource subplan,from which to fetch the result -// struct SPhyNode *pNode; // physical plan of current subplan -// } SSubplan; +typedef struct SPlanContext { + struct SCatalog* pCatalog; + struct SQueryDag* pDag; + SSubplan* pCurrentSubplan; + SSubplanId nextId; +} SPlanContext; -// typedef struct SQueryDag { -// SArray **pSubplans; -// } SQueryDag; - -// typedef struct SScanPhyNode { -// SPhyNode node; -// STimeWindow window; -// uint64_t uid; // unique id of the table -// } SScanPhyNode; - -// typedef SScanPhyNode STagScanPhyNode; - -void fillDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) { - dataBlockSchema->index = 0; // todo +static void toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) { SWAP(dataBlockSchema->pSchema, pPlanNode->pSchema, SSchema*); dataBlockSchema->numOfCols = pPlanNode->numOfCols; } -void fillPhyNode(SQueryPlanNode* pPlanNode, int32_t type, const char* name, SPhyNode* node) { +static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) { + SPhyNode* node = (SPhyNode*)calloc(1, size); node->info.type = type; - node->info.name = name; + node->info.name = gOpName[type]; SWAP(node->pTargets, pPlanNode->pExpr, SArray*); - fillDataBlockSchema(pPlanNode, &(node->targetSchema)); + toDataBlockSchema(pPlanNode, &(node->targetSchema)); } -SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) { - STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode)); - fillPhyNode(pPlanNode, OP_TagScan, "TagScan", (SPhyNode*)node); - return (SPhyNode*)node; +static SPhyNode* createTagScanNode(SQueryPlanNode* pPlanNode) { + return initPhyNode(pPlanNode, OP_TagScan, sizeof(STagScanPhyNode)); } -SPhyNode* createScanNode(SQueryPlanNode* pPlanNode) { - STagScanPhyNode* node = calloc(1, sizeof(STagScanPhyNode)); - fillPhyNode(pPlanNode, OP_TableScan, "SingleTableScan", (SPhyNode*)node); - return (SPhyNode*)node; +static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { + SSubplan* subplan = calloc(1, sizeof(SSubplan)); + subplan->id = pCxt->nextId; + ++(pCxt->nextId.subplanId); + subplan->type = type; + subplan->level = 0; + if (NULL != pCxt->pCurrentSubplan) { + subplan->level = pCxt->pCurrentSubplan->level + 1; + if (NULL == pCxt->pCurrentSubplan->pChildern) { + pCxt->pCurrentSubplan->pChildern = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + } + taosArrayPush(pCxt->pCurrentSubplan->pChildern, subplan); + subplan->pParents = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + taosArrayPush(subplan->pParents, pCxt->pCurrentSubplan); + } + pCxt->pCurrentSubplan = subplan; + return subplan; } -SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) { +static uint8_t getScanFlag(SQueryPlanNode* pPlanNode) { + // todo + return MASTER_SCAN; +} + +static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { + STableScanPhyNode* node = (STableScanPhyNode*)initPhyNode(pPlanNode, OP_TableScan, sizeof(STableScanPhyNode)); + node->scan.uid = pTable->pMeta->pTableMeta->uid; + node->scan.tableType = pTable->pMeta->pTableMeta->tableType; + node->scanFlag = getScanFlag(pPlanNode); + node->window = pTable->window; + // todo tag cond +} + +static void vgroupToEpSet(const SVgroupMsg* vg, SEpSet* epSet) { + // todo +} + +static void splitSubplanBySTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { + SVgroupsInfo* vgroupList = pTable->pMeta->vgroupList; + for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) { + SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); + vgroupToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet); + subplan->pNode = createTableScanNode(pCxt, pPlanNode, pTable); + // todo reset pCxt->pCurrentSubplan + } +} + +static SPhyNode* createExchangeNode() { + +} + +static SPhyNode* createScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { + SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo; + if (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType) { + splitSubplanBySTable(pCxt, pPlanNode, pTable); + return createExchangeNode(pCxt, pTable); + } + return createTableScanNode(pCxt, pPlanNode, pTable); +} + +static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { SPhyNode* node = NULL; switch (pPlanNode->info.type) { case QNODE_TAGSCAN: node = createTagScanNode(pPlanNode); break; case QNODE_TABLESCAN: - node = createScanNode(pPlanNode); + node = createScanNode(pCxt, pPlanNode); break; default: assert(false); @@ -80,7 +123,7 @@ SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) { node->pChildren = taosArrayInit(4, POINTER_BYTES); size_t size = taosArrayGetSize(pPlanNode->pChildren); for(int32_t i = 0; i < size; ++i) { - SPhyNode* child = createPhyNode(taosArrayGet(pPlanNode->pChildren, i)); + SPhyNode* child = createPhyNode(pCxt, taosArrayGet(pPlanNode->pChildren, i)); child->pParent = node; taosArrayPush(node->pChildren, &child); } @@ -88,13 +131,30 @@ SPhyNode* createPhyNode(SQueryPlanNode* pPlanNode) { return node; } -SSubplan* createSubplan(SQueryPlanNode* pSubquery) { - SSubplan* subplan = calloc(1, sizeof(SSubplan)); - subplan->pNode = createPhyNode(pSubquery); - // todo - return subplan; +static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { + SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); + subplan->pNode = createPhyNode(pCxt, pRoot); + SArray* l0 = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + taosArrayPush(l0, &subplan); + taosArrayPush(pCxt->pDag->pSubplans, &l0); + // todo deal subquery } -int32_t createDag(struct SQueryPlanNode* pQueryNode, struct SEpSet* pQnode, struct SQueryDag** pDag) { - return 0; +int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag) { + SPlanContext context = { + .pCatalog = pCatalog, + .pDag = calloc(1, sizeof(SQueryDag)), + .pCurrentSubplan = NULL + }; + if (NULL == context.pDag) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + context.pDag->pSubplans = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); + createSubplanByLevel(&context, pQueryNode); + *pDag = context.pDag; + return TSDB_CODE_SUCCESS; +} + +int32_t subPlanToString(struct SSubplan *pPhyNode, char** str) { + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index b18dd29257..19aac36e78 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -95,6 +95,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla } switch(type) { + case QNODE_TAGSCAN: case QNODE_TABLESCAN: { SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo)); memcpy(info, pExtInfo, sizeof(SQueryTableInfo)); @@ -162,7 +163,7 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe SArray* pExprs, SArray* tableCols) { if (pQueryInfo->info.onlyTagQuery) { int32_t num = (int32_t) taosArrayGetSize(pExprs); - SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, NULL); + SQueryPlanNode* pNode = createQueryNode(QNODE_TAGSCAN, "TableTagScan", NULL, 0, pExprs->pData, num, info); if (pQueryInfo->info.distinct) { pNode = createQueryNode(QNODE_DISTINCT, "Distinct", &pNode, 1, pExprs->pData, num, NULL); diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index 770a6b02c2..fd00085381 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,5 +9,5 @@ target_include_directories( target_link_libraries( scheduler - PRIVATE os util planner + PRIVATE os util planner common ) \ No newline at end of file diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 581a797343..cc8d6646b6 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -17,7 +17,7 @@ #include "tarray.h" #include "talgo.h" -void* taosArrayInit(size_t size, size_t elemSize) { +SArray* taosArrayInit(size_t size, size_t elemSize) { assert(elemSize > 0); if (size < TARRAY_MIN_SIZE) {