diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 69071d3a2a..9344fd09c3 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -20,7 +20,7 @@ extern "C" { #endif -#include "nodes.h" +#include "querynodes.h" typedef struct SDatabaseOptions { int32_t numOfBlocks; @@ -49,6 +49,30 @@ typedef struct SCreateDatabaseStmt { SDatabaseOptions options; } SCreateDatabaseStmt; +typedef struct STableOptions { + int32_t keep; + int32_t ttl; + char comments[TSDB_STB_COMMENT_LEN]; +} STableOptions; + +typedef struct SColumnDefNode { + ENodeType type; + char colName[TSDB_COL_NAME_LEN]; + SDataType dataType; + char comments[TSDB_STB_COMMENT_LEN]; +} SColumnDefNode; + +typedef struct SCreateTableStmt { + ENodeType type; + char dbName[TSDB_DB_NAME_LEN]; + char tableName[TSDB_TABLE_NAME_LEN]; + bool ignoreExists; + SNodeList* pCols; + STableOptions options; +} SCreateTableStmt; + +// CREATE TABLE [IF NOT EXISTS] [db_name.]tb_name (create_definition [, create_definitionn] ...) [table_options] + #ifdef __cplusplus } #endif diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 5cae08b0af..8009f5dc11 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -65,6 +65,7 @@ typedef enum ENodeType { QUERY_NODE_TARGET, QUERY_NODE_DATABLOCK_DESC, QUERY_NODE_SLOT_DESC, + QUERY_NODE_COLUMN_DEF, // Statement nodes are used in parser and planner module. QUERY_NODE_SET_OPERATOR, @@ -72,12 +73,15 @@ typedef enum ENodeType { QUERY_NODE_SHOW_STMT, QUERY_NODE_VNODE_MODIF_STMT, QUERY_NODE_CREATE_DATABASE_STMT, + QUERY_NODE_CREATE_TABLE_STMT, // logic plan node QUERY_NODE_LOGIC_PLAN_SCAN, QUERY_NODE_LOGIC_PLAN_JOIN, QUERY_NODE_LOGIC_PLAN_AGG, QUERY_NODE_LOGIC_PLAN_PROJECT, + QUERY_NODE_LOGIC_SUBPLAN, + QUERY_NODE_LOGIC_PLAN, // physical plan node QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, @@ -89,8 +93,9 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_AGG, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, QUERY_NODE_PHYSICAL_PLAN_SORT, - - QUERY_NODE_DSINK_DISPATCH + QUERY_NODE_PHYSICAL_PLAN_DISPATCH, + QUERY_NODE_PHYSICAL_SUBPLAN, + QUERY_NODE_PHYSICAL_PLAN } ENodeType; /** diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 6690940a41..cf5d17cd74 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -65,6 +65,18 @@ typedef struct SProjectLogicNode { SNodeList* pProjections; } SProjectLogicNode; +typedef struct SSubLogicPlan { + ENodeType type; + SNodeList* pChildren; + SNodeList* pParents; + SLogicNode* pNode; +} SSubLogicPlan; + +typedef struct SQueryLogicPlan { + ENodeType type;; + SNodeList* pSubplans; +} SQueryLogicPlan; + typedef struct SSlotDescNode { ENodeType type; int16_t slotId; @@ -98,6 +110,7 @@ typedef struct SScanPhysiNode { int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC int32_t count; // repeat count int32_t reverse; // reverse scan count + char tableName[TSDB_TABLE_NAME_LEN]; } SScanPhysiNode; typedef SScanPhysiNode SSystemTableScanPhysiNode; @@ -159,6 +172,39 @@ typedef struct SDataInserterNode { char *pData; } SDataInserterNode; +typedef struct SSubplanId { + uint64_t queryId; + int32_t templateId; + int32_t subplanId; +} SSubplanId; + +typedef enum ESubplanType { + SUBPLAN_TYPE_MERGE = 1, + SUBPLAN_TYPE_PARTIAL, + SUBPLAN_TYPE_SCAN, + SUBPLAN_TYPE_MODIFY +} ESubplanType; + +typedef struct SSubplan { + ENodeType type; + SSubplanId id; // unique id of the subplan + ESubplanType subplanType; + int32_t msgType; // message type for subplan, used to denote the send message type to vnode. + int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner. + SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node + SNodeList* pChildren; // the datasource subplan,from which to fetch the result + SNodeList* pParents; // the data destination subplan, get data from current subplan + SPhysiNode* pNode; // physical plan of current subplan + SDataSinkNode* pDataSink; // data of the subplan flow into the datasink +} SSubplan; + +typedef struct SQueryPlan { + ENodeType type;; + uint64_t queryId; + int32_t numOfSubplans; + SNodeList* pSubplans; // SNodeListNode. The execution level of subplan, starting from 0. +} SQueryPlan; + #ifdef __cplusplus } #endif diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 9d316be54d..cf650f95d1 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -22,35 +22,6 @@ extern "C" { #include "plannodes.h" -#define QUERY_TYPE_MERGE 1 -#define QUERY_TYPE_PARTIAL 2 -#define QUERY_TYPE_SCAN 3 -#define QUERY_TYPE_MODIFY 4 - -typedef struct SSubplanId { - uint64_t queryId; - uint64_t templateId; - uint64_t subplanId; -} SSubplanId; - -typedef struct SSubplan { - SSubplanId id; // unique id of the subplan - int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY - int32_t msgType; // message type for subplan, used to denote the send message type to vnode. - int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner. - SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node - SArray* pChildren; // the datasource subplan,from which to fetch the result - SArray* pParents; // the data destination subplan, get data from current subplan - SPhysiNode* pNode; // physical plan of current subplan - SDataSinkNode* pDataSink; // data of the subplan flow into the datasink -} SSubplan; - -typedef struct SQueryPlan { - uint64_t queryId; - int32_t numOfSubplans; - SArray* pSubplans; // SArray*>. The execution level of subplan, starting from 0. -} SQueryPlan; - typedef struct SPlanContext { uint64_t queryId; SNode* pAstRoot; diff --git a/include/util/tjson.h b/include/util/tjson.h index 2d9f433ab2..335ff0d4ba 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -54,10 +54,12 @@ typedef int32_t (*FToJson)(const void* pObj, SJson* pJson); int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj); int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj); +int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* pArray, int32_t itemSize, int32_t num); typedef int32_t (*FToObject)(const SJson* pJson, void* pObj); int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj); +int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize); char* tjsonToString(const SJson* pJson); char* tjsonToUnformattedString(const SJson* pJson); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7c6093ebf1..5ae5cd39fb 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -762,8 +762,8 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM SVgObj *pVgroup = NULL; SQueryPlan *pPlan = qStringToQueryPlan(pTopic->physicalPlan); SArray *pArray = NULL; - SArray *inner = taosArrayGet(pPlan->pSubplans, 0); - SSubplan *plan = taosArrayGetP(inner, 0); + SNodeListNode *inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); + SSubplan *plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); SArray *unassignedVg = pSub->unassignedVg; void *pIter = NULL; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 501b476f8a..343b3a3c95 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -26,7 +26,7 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) { } int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle) { - if (QUERY_NODE_DSINK_DISPATCH == nodeType(pDataSink)) { + if (QUERY_NODE_PHYSICAL_PLAN_DISPATCH == nodeType(pDataSink)) { return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); } return TSDB_CODE_FAILED; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index b1fd4bb56f..fd54537d65 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -90,6 +90,10 @@ static char* nodeName(ENodeType type) { return "PhysiJoin"; case QUERY_NODE_PHYSICAL_PLAN_AGG: return "PhysiAgg"; + case QUERY_NODE_PHYSICAL_SUBPLAN: + return "PhysiSubplan"; + case QUERY_NODE_PHYSICAL_PLAN: + return "PhysiPlan"; default: break; } @@ -462,6 +466,164 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkSubplanIdQueryId = "QueryId"; +static const char* jkSubplanIdTemplateId = "TemplateId"; +static const char* jkSubplanIdSubplanId = "SubplanId"; + +static int32_t subplanIdToJson(const void* pObj, SJson* pJson) { + const SSubplanId* pNode = (const SSubplanId*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkSubplanIdQueryId, pNode->queryId); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkSubplanIdTemplateId, pNode->templateId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkSubplanIdSubplanId, pNode->subplanId); + } + + return code; +} + +static int32_t jsonToSubplanId(const SJson* pJson, void* pObj) { + SSubplanId* pNode = (SSubplanId*)pObj; + + int32_t code = tjsonGetUBigIntValue(pJson, jkSubplanIdQueryId, &pNode->queryId); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkSubplanIdTemplateId, &pNode->templateId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkSubplanIdSubplanId, &pNode->subplanId); + } + + return code; +} + +static const char* jkEndPointFqdn = "Fqdn"; +static const char* jkEndPointPort = "Port"; + +static int32_t epToJson(const void* pObj, SJson* pJson) { + const SEp* pNode = (const SEp*)pObj; + + int32_t code = tjsonAddStringToObject(pJson, jkEndPointFqdn, pNode->fqdn); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkEndPointPort, pNode->port); + } + + return code; +} + +static int32_t jsonToEp(const SJson* pJson, void* pObj) { + SEp* pNode = (SEp*)pObj; + + int32_t code = tjsonGetStringValue(pJson, jkEndPointFqdn, pNode->fqdn); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetSmallIntValue(pJson, jkEndPointPort, &pNode->port); + } + + return code; +} + +static const char* jkQueryNodeAddrId = "Id"; +static const char* jkQueryNodeAddrInUse = "InUse"; +static const char* jkQueryNodeAddrNumOfEps = "NumOfEps"; +static const char* jkQueryNodeAddrEps = "Eps"; + +static int32_t queryNodeAddrToJson(const void* pObj, SJson* pJson) { + const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkQueryNodeAddrId, pNode->nodeId); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkQueryNodeAddrInUse, pNode->epset.inUse); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkQueryNodeAddrNumOfEps, pNode->epset.numOfEps); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddArray(pJson, jkQueryNodeAddrEps, epToJson, pNode->epset.eps, sizeof(SEp), pNode->epset.numOfEps); + } + + return code; +} + +static int32_t jsonToQueryNodeAddr(const SJson* pJson, void* pObj) { + SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj; + + int32_t code = tjsonGetIntValue(pJson, jkQueryNodeAddrId, &pNode->nodeId); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkQueryNodeAddrInUse, &pNode->epset.inUse); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkQueryNodeAddrNumOfEps, &pNode->epset.numOfEps); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToArray(pJson, jkQueryNodeAddrEps, jsonToEp, pNode->epset.eps, sizeof(SEp)); + } + + return code; +} + +static const char* jkSubplanId = "Id"; +static const char* jkSubplanType = "SubplanType"; +static const char* jkSubplanMsgType = "MsgType"; +static const char* jkSubplanLevel = "Level"; +static const char* jkSubplanNodeAddr = "NodeAddr"; +static const char* jkSubplanRootNode = "RootNode"; +static const char* jkSubplanDataSink = "DataSink"; + +static int32_t subplanToJson(const void* pObj, SJson* pJson) { + const SSubplan* pNode = (const SSubplan*)pObj; + + int32_t code = tjsonAddObject(pJson, jkSubplanId, subplanIdToJson, &pNode->id); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkSubplanType, pNode->subplanType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkSubplanMsgType, pNode->msgType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkSubplanLevel, pNode->level); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSubplanNodeAddr, queryNodeAddrToJson, &pNode->execNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSubplanRootNode, nodeToJson, pNode->pNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSubplanDataSink, nodeToJson, pNode->pDataSink); + } + + return code; +} + +static int32_t jsonToSubplan(const SJson* pJson, void* pObj) { + SSubplan* pNode = (SSubplan*)pObj; + + int32_t code = tjsonToObject(pJson, jkSubplanId, jsonToSubplanId, &pNode->id); + if (TSDB_CODE_SUCCESS == code) { + int32_t val; + code = tjsonGetIntValue(pJson, jkSubplanType, &val); + pNode->subplanType = val; + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkSubplanMsgType, &pNode->msgType); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkSubplanLevel, &pNode->level); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkSubplanNodeAddr, jsonToQueryNodeAddr, &pNode->execNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSubplanRootNode, (SNode**)&pNode->pNode); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSubplanDataSink, (SNode**)&pNode->pDataSink); + } + + return code; +} + static const char* jkAggLogicPlanGroupKeys = "GroupKeys"; static const char* jkAggLogicPlanAggFuncs = "AggFuncs"; @@ -1064,6 +1226,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return physiJoinNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_AGG: return physiAggNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_SUBPLAN: + return subplanToJson(pObj, pJson); default: break; } @@ -1127,13 +1291,15 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiJoinNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_AGG: return jsonToPhysiAggNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_SUBPLAN: + return jsonToSubplan(pJson, pObj); default: break; } return TSDB_CODE_SUCCESS; } -static const char* jkNodeType = "Type"; +static const char* jkNodeType = "NodeType"; static const char* jkNodeName = "Name"; static int32_t nodeToJson(const void* pObj, SJson* pJson) { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index c34b0e9d72..70dcab745f 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -95,6 +95,10 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SJoinPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_AGG: return makeNode(type, sizeof(SAggPhysiNode)); + case QUERY_NODE_PHYSICAL_SUBPLAN: + return makeNode(type, sizeof(SSubplan)); + case QUERY_NODE_PHYSICAL_PLAN: + return makeNode(type, sizeof(SQueryPlan)); default: break; } diff --git a/source/libs/parser/inc/astCreateFuncs.h b/source/libs/parser/inc/astCreateFuncs.h index 5ca6b8576f..5cddd82e2c 100644 --- a/source/libs/parser/inc/astCreateFuncs.h +++ b/source/libs/parser/inc/astCreateFuncs.h @@ -32,6 +32,11 @@ typedef struct SAstCreateContext { SNode* pRootNode; } SAstCreateContext; +typedef struct STokenPair { + SToken first; + SToken second; +} STokenPair; + extern SToken nil_token; void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt); @@ -76,8 +81,6 @@ SNode* addLimitClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pLimit); SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable); SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode* pLeft, SNode* pRight); -SDatabaseOptions* createDefaultDatabaseOptions(SAstCreateContext* pCxt); - typedef enum EDatabaseOptionType { DB_OPTION_BLOCKS = 0, DB_OPTION_CACHE, @@ -99,11 +102,21 @@ typedef enum EDatabaseOptionType { DB_OPTION_MAX } EDatabaseOptionType; - +SDatabaseOptions* createDefaultDatabaseOptions(SAstCreateContext* pCxt); SDatabaseOptions* setDatabaseOption(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, EDatabaseOptionType type, const SToken* pVal); - SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pDbName, SDatabaseOptions* pOptions); +typedef enum ETableOptionType { + TABLE_OPTION_KEEP = 0, + TABLE_OPTION_TTL, + TABLE_OPTION_COMMENT, + + TABLE_OPTION_MAX +} ETableOptionType; +STableOptions* createDefaultTableOptions(SAstCreateContext* pCxt); +STableOptions* setTableOption(SAstCreateContext* pCxt, STableOptions* pOptions, ETableOptionType type, const SToken* pVal); +SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDataType dataType, const SToken* pComment); +SNode* createCreateTableStmt(SAstCreateContext* pCxt, bool ignoreExists, const STokenPair* pFullTableName, SNodeList* pCols, STableOptions* pOptions); #ifdef __cplusplus } diff --git a/source/libs/parser/inc/new_sql.y b/source/libs/parser/inc/new_sql.y index 540e45459e..0ecacc432a 100644 --- a/source/libs/parser/inc/new_sql.y +++ b/source/libs/parser/inc/new_sql.y @@ -65,7 +65,7 @@ //%right NK_BITNOT. /************************************************ create database *****************************************************/ -cmd ::= CREATE DATABASE exists_opt(A) db_name(B) db_options(C). { PARSER_TRACE; pCxt->pRootNode = createCreateDatabaseStmt(pCxt, A, &B, C);} +cmd ::= CREATE DATABASE exists_opt(A) db_name(B) db_options(C). { pCxt->pRootNode = createCreateDatabaseStmt(pCxt, A, &B, C);} %type exists_opt { bool } exists_opt(A) ::= IF NOT EXISTS. { A = true; } @@ -73,7 +73,7 @@ exists_opt(A) ::= . %type db_options { SDatabaseOptions* } %destructor db_options { tfree($$); } -db_options(A) ::= . { A = createDefaultDatabaseOptions(pCxt);} +db_options(A) ::= . { A = createDefaultDatabaseOptions(pCxt); } db_options(A) ::= db_options(B) BLOCKS NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_BLOCKS, &C); } db_options(A) ::= db_options(B) CACHE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_CACHE, &C); } db_options(A) ::= db_options(B) CACHELAST NK_INTEGER(X)(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_CACHELAST, &C); } @@ -92,6 +92,55 @@ db_options(A) ::= db_options(B) VGROUPS NK_INTEGER(C). db_options(A) ::= db_options(B) SINGLESTABLE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_SINGLESTABLE, &C); } db_options(A) ::= db_options(B) STREAMMODE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_STREAMMODE, &C); } +/************************************************ create table *******************************************************/ +cmd ::= CREATE TABLE exists_opt(A) full_table_name(B) + NK_LP column_def_list(C) NK_RP table_options(D). { pCxt->pRootNode = createCreateTableStmt(pCxt, A, &B, C, D);} + +%type full_table_name { STokenPair } +%destructor full_table_name { } +full_table_name(A) ::= NK_ID(B). { A = { .first = B, .second = nil_token}; } +full_table_name(A) ::= NK_ID(B) NK_DOT NK_ID(C). { A = { .first = B, .second = C}; } + +%type column_def_list { SNodeList* } +%destructor column_def_list { nodesDestroyList($$); } +column_def_list(A) ::= column_def(B). { A = createNodeList(pCxt, B); } +column_def_list(A) ::= column_def_list(B) NK_COMMA column_def(C). { A = addNodeToList(pCxt, B, C); } + +column_def(A) ::= column_name(B) type_name(C). { A = createColumnDefNode(pCxt, B, C, NULL); } +column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, B, C, &D); } + +%type type_name { SDataType } +%destructor type_name { } +type_name(A) ::= BOOL. { A = createDataType(TSDB_DATA_TYPE_BOOL); } +type_name(A) ::= TINYINT. { A = createDataType(TSDB_DATA_TYPE_TINYINT); } +type_name(A) ::= SMALLINT. { A = createDataType(TSDB_DATA_TYPE_SMALLINT); } +type_name(A) ::= INT. { A = createDataType(TSDB_DATA_TYPE_INT); } +type_name(A) ::= BIGINT. { A = createDataType(TSDB_DATA_TYPE_BIGINT); } +type_name(A) ::= FLOAT. { A = createDataType(TSDB_DATA_TYPE_FLOAT); } +type_name(A) ::= DOUBLE. { A = createDataType(TSDB_DATA_TYPE_DOUBLE); } +type_name(A) ::= BINARY NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_BINARY, &B); } +type_name(A) ::= TIMESTAMP. { A = createDataType(TSDB_DATA_TYPE_TIMESTAMP); } +type_name(A) ::= NCHAR NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_NCHAR, &B); } +type_name(A) ::= TINYINT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_UTINYINT); } +type_name(A) ::= SMALLINT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_USMALLINT); } +type_name(A) ::= INT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_UINT); } +type_name(A) ::= BIGINT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_UBIGINT); } +type_name(A) ::= JSON. { A = createDataType(TSDB_DATA_TYPE_JSON); } +type_name(A) ::= VARCHAR NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_VARCHAR, &B); } +type_name(A) ::= MEDIUMBLOB. { A = createDataType(TSDB_DATA_TYPE_MEDIUMBLOB); } +type_name(A) ::= BLOB. { A = createDataType(TSDB_DATA_TYPE_BLOB); } +type_name(A) ::= VARBINARY NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_VARBINARY, &B); } +type_name(A) ::= DECIMAL. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); } +type_name(A) ::= DECIMAL NK_LP NK_INTEGER NK_RP. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); } +type_name(A) ::= DECIMAL NK_LP NK_INTEGER NK_COMMA NK_INTEGER NK_RP. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); } + +%type table_options { SDatabaseOptions* } +%destructor table_options { tfree($$); } +table_options(A) ::= . { A = createDefaultTableOptions(pCxt);} +table_options(A) ::= table_options(B) COMMENT NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_COMMENT, &C); } +table_options(A) ::= table_options(B) KEEP NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_KEEP, &C); } +table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); } + //cmd ::= SHOW DATABASES. { PARSER_TRACE; createShowStmt(pCxt, SHOW_TYPE_DATABASE); } /************************************************ select *************************************************************/ diff --git a/source/libs/parser/src/astCreateFuncs.c b/source/libs/parser/src/astCreateFuncs.c index 76dc185e78..a531f3de71 100644 --- a/source/libs/parser/src/astCreateFuncs.c +++ b/source/libs/parser/src/astCreateFuncs.c @@ -35,9 +35,11 @@ SToken nil_token = { .type = TK_NIL, .n = 0, .z = NULL }; typedef SDatabaseOptions* (*FSetDatabaseOption)(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal); - static FSetDatabaseOption setDbOptionFuncs[DB_OPTION_MAX]; +typedef STableOptions* (*FSetTableOption)(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal); +static FSetTableOption setTableOptionFuncs[TABLE_OPTION_MAX]; + static SDatabaseOptions* setDbBlocks(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) { int64_t val = strtol(pVal->z, NULL, 10); if (val < TSDB_MIN_TOTAL_BLOCKS || val > TSDB_MAX_TOTAL_BLOCKS) { @@ -263,12 +265,54 @@ static void initSetDatabaseOptionFp() { setDbOptionFuncs[DB_OPTION_STREAMMODE] = setDbStreamMode; } +static STableOptions* setTableKeep(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal) { + int64_t val = strtol(pVal->z, NULL, 10); + if (val < TSDB_MIN_KEEP || val > TSDB_MAX_KEEP) { + snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, + "invalid table option keep: %d valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP); + pCxt->valid = false; + return pOptions; + } + pOptions->keep = val; + return pOptions; +} + +static STableOptions* setTableTtl(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal) { + int64_t val = strtol(pVal->z, NULL, 10); + if (val < TSDB_MIN_DB_TTL_OPTION) { + snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, + "invalid table option ttl: %d, should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION); + pCxt->valid = false; + return pOptions; + } + pOptions->ttl = val; + return pOptions; +} + +static STableOptions* setTableComment(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal) { + if (pVal->n >= sizeof(pOptions->comments)) { + snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, + "invalid table option comment, length cannot exceed %d", sizeof(pOptions->comments) - 1); + pCxt->valid = false; + return pOptions; + } + strncpy(pOptions->comments, pVal->z, pVal->n); + return pOptions; +} + +static void initSetTableOptionFp() { + setTableOptionFuncs[TABLE_OPTION_KEEP] = setTableKeep; + setTableOptionFuncs[TABLE_OPTION_TTL] = setTableTtl; + setTableOptionFuncs[TABLE_OPTION_COMMENT] = setTableComment; +} + void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt) { pCxt->pQueryCxt = pParseCxt; pCxt->notSupport = false; pCxt->valid = true; pCxt->pRootNode = NULL; initSetDatabaseOptionFp(); + initSetTableOptionFp(); } static bool checkDbName(SAstCreateContext* pCxt, const SToken* pDbName) { @@ -651,3 +695,40 @@ SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, cons pStmt->options = *pOptions; return (SNode*)pStmt; } + +STableOptions* createDefaultTableOptions(SAstCreateContext* pCxt) { + STableOptions* pOptions = calloc(1, sizeof(STableOptions)); + CHECK_OUT_OF_MEM(pOptions); + pOptions->keep = TSDB_DEFAULT_KEEP; + pOptions->ttl = TSDB_DEFAULT_DB_TTL_OPTION; + return pOptions; +} + +STableOptions* setTableOption(SAstCreateContext* pCxt, STableOptions* pOptions, ETableOptionType type, const SToken* pVal) { + return setTableOptionFuncs[type](pCxt, pOptions, pVal); +} + +SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDataType dataType, const SToken* pComment) { + SColumnDefNode* pCol = (SColumnDefNode*)nodesMakeNode(QUERY_NODE_COLUMN_DEF); + CHECK_OUT_OF_MEM(pCol); + strncpy(pCol->colName, pColName->z, pColName->n); + pCol->dataType = dataType; + if (NULL != pComment) { + strncpy(pCol->colName, pColName->z, pColName->n); + } + return (SNode*)pCol; +} + +SNode* createCreateTableStmt(SAstCreateContext* pCxt, + bool ignoreExists, const STokenPair* pFullTableName, SNodeList* pCols, STableOptions* pOptions) { + SCreateTableStmt* pStmt = (SCreateTableStmt*)nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT); + CHECK_OUT_OF_MEM(pStmt); + if (TK_NIL != pFullTableName->first.type) { + strncpy(pStmt->dbName, pFullTableName->first.z, pFullTableName->first.n); + } + strncpy(pStmt->tableName, pFullTableName->second.z, pFullTableName->second.n); + pStmt->ignoreExists = ignoreExists; + pStmt->pCols = pCols; + pStmt->options = *pOptions; + return (SNode*)pStmt; +} diff --git a/source/libs/parser/src/astParse.c b/source/libs/parser/src/astParse.c index a35f156600..7519b36861 100644 --- a/source/libs/parser/src/astParse.c +++ b/source/libs/parser/src/astParse.c @@ -32,6 +32,7 @@ static bool isCmd(const SNode* pRootNode) { } switch (nodeType(pRootNode)) { case QUERY_NODE_SELECT_STMT: + case QUERY_NODE_CREATE_TABLE_STMT: return false; default: break; @@ -74,7 +75,7 @@ int32_t doParse(SParseContext* pParseCxt, SQuery** pQuery) { } default: NewParse(pParser, t0.type, t0, &cxt); - NewParseTrace(stdout, ""); + // NewParseTrace(stdout, ""); if (!cxt.valid) { goto abort_parse; } diff --git a/source/libs/parser/src/astTranslate.c b/source/libs/parser/src/astTranslate.c index ee98a83d72..24a9468d2a 100644 --- a/source/libs/parser/src/astTranslate.c +++ b/source/libs/parser/src/astTranslate.c @@ -829,6 +829,10 @@ static int32_t translateCreateDatabase(STranslateContext* pCxt, SCreateDatabaseS return TSDB_CODE_SUCCESS; } +static int32_t translateCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) { + return TSDB_CODE_SUCCESS; +} + static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pNode)) { @@ -838,6 +842,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_CREATE_DATABASE_STMT: code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode); break; + case QUERY_NODE_CREATE_TABLE_STMT: + code = translateCreateTable(pCxt, (SCreateTableStmt*)pNode); + break; default: break; } diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index ca7e09dba1..a6117c9c9b 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -41,8 +41,7 @@ extern "C" { int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode); int32_t optimize(SPlanContext* pCxt, SLogicNode* pLogicNode); -int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode); -int32_t buildPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan); +int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan); #ifdef __cplusplus } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index e7afd2e49c..6d2205f72b 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -497,23 +497,13 @@ void qDestroySubplan(SSubplan* pSubplan) { #include "functionMgt.h" -typedef struct SSubLogicPlan { - SNode* pRoot; // SLogicNode - bool haveSuperTable; - bool haveSystemTable; -} SSubLogicPlan; - -int32_t splitLogicPlan(SSubLogicPlan* pLogicPlan) { - // todo - return TSDB_CODE_SUCCESS; -} - typedef struct SSlotIndex { int16_t dataBlockId; int16_t slotId; } SSlotIndex; typedef struct SPhysiPlanContext { + SPlanContext* pPlanCxt; int32_t errCode; int16_t nextDataBlockId; SArray* pLocationHelper; @@ -956,19 +946,94 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl return pPhyNode; } -int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) { - SPhysiPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .nextDataBlockId = 0, .pLocationHelper = taosArrayInit(32, POINTER_BYTES) }; +static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan) { + SSubplan* pSubplan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + pSubplan->pNode = createPhysiNode(pCxt, pLogicSubplan->pNode); + return pSubplan; +} + +static SQueryLogicPlan* createRawQueryLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode) { + SQueryLogicPlan* pLogicPlan = (SQueryLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN); + CHECK_ALLOC(pLogicPlan, NULL); + pLogicPlan->pSubplans = nodesMakeList(); + CHECK_ALLOC(pLogicPlan->pSubplans, pLogicPlan); + SNodeListNode* pTopSubplans = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); + CHECK_ALLOC(pTopSubplans, pLogicPlan); + if (TSDB_CODE_SUCCESS != nodesListAppend(pLogicPlan->pSubplans, (SNode*)pTopSubplans)) { + nodesDestroyNode((SNode*)pTopSubplans); + return pLogicPlan; + } + pTopSubplans->pNodeList = nodesMakeList(); + CHECK_ALLOC(pTopSubplans->pNodeList, pLogicPlan); + SSubLogicPlan* pSubplan = (SSubLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); + CHECK_ALLOC(pSubplan, pLogicPlan); + if (TSDB_CODE_SUCCESS != nodesListAppend(pTopSubplans->pNodeList, (SNode*)pSubplan)) { + nodesDestroyNode((SNode*)pSubplan); + return pLogicPlan; + } + pSubplan->pNode = (SLogicNode*)nodesCloneNode((SNode*)pLogicNode); + CHECK_ALLOC(pSubplan->pNode, pLogicPlan); + return pLogicPlan; +} + +static int32_t splitLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SQueryLogicPlan** pLogicPlan) { + SQueryLogicPlan* pPlan = createRawQueryLogicPlan(pCxt, pLogicNode); + if (TSDB_CODE_SUCCESS != pCxt->errCode) { + nodesDestroyNode((SNode*)pPlan); + return pCxt->errCode; + } + // todo split + *pLogicPlan = pPlan; + return TSDB_CODE_SUCCESS; +} + +static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) { + SQueryPlan* pQueryPlan = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); + CHECK_ALLOC(pQueryPlan, TSDB_CODE_OUT_OF_MEMORY); + *pPlan = pQueryPlan; + pQueryPlan->queryId = pCxt->pPlanCxt->queryId; + + pQueryPlan->pSubplans = nodesMakeList(); + CHECK_ALLOC(pQueryPlan->pSubplans, TSDB_CODE_OUT_OF_MEMORY); + SNode* pNode; + FOREACH(pNode, pLogicPlan->pSubplans) { + SNodeListNode* pLevelSubplans = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); + CHECK_ALLOC(pLevelSubplans, TSDB_CODE_OUT_OF_MEMORY); + if (TSDB_CODE_SUCCESS != nodesListAppend(pQueryPlan->pSubplans, (SNode*)pLevelSubplans)) { + nodesDestroyNode((SNode*)pLevelSubplans); + return TSDB_CODE_OUT_OF_MEMORY; + } + SNode* pLogicSubplan; + FOREACH(pLogicSubplan, ((SNodeListNode*)pNode)->pNodeList) { + SSubplan* pSubplan = createPhysiSubplan(pCxt, (SSubLogicPlan*)pLogicSubplan); + CHECK_ALLOC(pSubplan, TSDB_CODE_OUT_OF_MEMORY); + if (TSDB_CODE_SUCCESS != nodesListAppend(pLevelSubplans->pNodeList, (SNode*)pSubplan)) { + nodesDestroyNode((SNode*)pSubplan); + return TSDB_CODE_OUT_OF_MEMORY; + } + ++(pQueryPlan->numOfSubplans); + } + } + return TSDB_CODE_SUCCESS; +} + +int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan) { + SPhysiPlanContext cxt = { + .pPlanCxt = pCxt, + .errCode = TSDB_CODE_SUCCESS, + .nextDataBlockId = 0, + .pLocationHelper = taosArrayInit(32, POINTER_BYTES) + }; if (NULL == cxt.pLocationHelper) { return TSDB_CODE_OUT_OF_MEMORY; } - *pPhyNode = createPhysiNode(&cxt, pLogicNode); - return cxt.errCode; -} - -int32_t buildPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan) { - // split - // scale out - // maping - // create - return TSDB_CODE_SUCCESS; + SQueryLogicPlan* pLogicPlan; + int32_t code = splitLogicPlan(&cxt, pLogicNode, &pLogicPlan); + // todo scale out + // todo maping + if (TSDB_CODE_SUCCESS == code) { + code = buildPhysiPlan(&cxt, pLogicPlan, pPlan); + } + nodesDestroyNode((SNode*)pLogicPlan); + return code; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index d58f8bce15..6fc0b49fbd 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -28,7 +28,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan) { code = optimize(pCxt, pLogicNode); } if (TSDB_CODE_SUCCESS == code) { - code = buildPhysiPlan(pCxt, pLogicNode, pPlan); + code = createPhysiPlan(pCxt, pLogicNode, pPlan); } return code; } @@ -38,11 +38,11 @@ void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstrea } int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len) { - + return nodesNodeToString((const SNode*)subplan, false, str, len); } int32_t qStringToSubplan(const char* str, SSubplan** subplan) { - + return nodesStringToNode(str, (SNode**)subplan); } char* qQueryPlanToString(const SQueryPlan* pPlan) { diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 877d984ada..950d3d2827 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -70,14 +70,14 @@ protected: cout << toString((const SNode*)pLogicPlan, false) << endl; if (TEST_PHYSICAL_PLAN == target) { - SPhysiNode* pPhyPlan = nullptr; - code = createPhysiPlan(pLogicPlan, &pPhyPlan); + SQueryPlan* pPlan = nullptr; + code = createPhysiPlan(&cxt, pLogicPlan, &pPlan); if (code != TSDB_CODE_SUCCESS) { cout << "sql:[" << cxt_.pSql << "] physical plan code:" << code << ", strerror:" << tstrerror(code) << endl; return false; } cout << "unformatted physical plan : " << endl; - cout << toString((const SNode*)pPhyPlan, false) << endl; + cout << toString((const SNode*)pPlan, false) << endl; } return true; diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 42270cd645..236c427138 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -111,7 +111,7 @@ typedef struct SSchJob { void *transport; SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr SArray *levels; // Element is SQueryLevel, starting from 0. SArray - SArray *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler + SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler int32_t levelIdx; SEpSet dataSrcEps; @@ -135,9 +135,9 @@ typedef struct SSchJob { #define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) -#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) -#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) -#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->type == QUERY_TYPE_MODIFY) +#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) +#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY) +#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) @@ -145,7 +145,7 @@ typedef struct SSchJob { #define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st) #define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status) -#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY) +#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != SUBPLAN_TYPE_MODIFY) #define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob) #define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 03f81218ae..b4761a749f 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -203,8 +203,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { for (int32_t m = 0; m < pLevel->taskNum; ++m) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); SSubplan *pPlan = pTask->plan; - int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0; - int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0; + int32_t childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0; + int32_t parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0; if (childNum > 0) { if (pJob->levelIdx == pLevel->level) { @@ -220,8 +220,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } for (int32_t n = 0; n < childNum; ++n) { - SSubplan **child = taosArrayGet(pPlan->pChildren, n); - SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES); + SSubplan *child = (SSubplan*)nodesListGetNode(pPlan->pChildren, n); + SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES); if (NULL == childTask || NULL == *childTask) { SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); @@ -252,8 +252,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } for (int32_t n = 0; n < parentNum; ++n) { - SSubplan **parent = taosArrayGet(pPlan->pParents, n); - SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES); + SSubplan *parent = (SSubplan*)nodesListGetNode(pPlan->pParents, n); + SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); if (NULL == parentTask || NULL == *parentTask) { SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); @@ -312,7 +312,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t levelNum = (int32_t)taosArrayGetSize(pDag->pSubplans); + int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans); if (levelNum <= 0) { SCH_JOB_ELOG("invalid level num:%d", levelNum); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -336,7 +336,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { pJob->subPlans = pDag->pSubplans; SSchLevel level = {0}; - SArray *plans = NULL; + SNodeListNode *plans = NULL; int32_t taskNum = 0; SSchLevel *pLevel = NULL; @@ -351,13 +351,13 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { pLevel = taosArrayGet(pJob->levels, i); pLevel->level = i; - plans = taosArrayGetP(pDag->pSubplans, i); + plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, i); if (NULL == plans) { SCH_JOB_ELOG("empty level plan, level:%d", i); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - taskNum = (int32_t)taosArrayGetSize(plans); + taskNum = (int32_t)LIST_LENGTH(plans->pNodeList); if (taskNum <= 0) { SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); @@ -372,9 +372,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { } for (int32_t n = 0; n < taskNum; ++n) { - SSubplan *plan = taosArrayGetP(plans, n); + SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, n); - SCH_SET_JOB_TYPE(&pJob->attr, plan->type); + SCH_SET_JOB_TYPE(&pJob->attr, plan->subplanType); SSchTask task = {0}; SSchTask *pTask = &task; @@ -1420,18 +1420,18 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD } int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) { - if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) == 0) { + if (NULL == pDag || pDag->numOfSubplans <= 0 || LIST_LENGTH(pDag->pSubplans) == 0) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t levelNum = taosArrayGetSize(pDag->pSubplans); + int32_t levelNum = LIST_LENGTH(pDag->pSubplans); if (1 != levelNum) { qError("invalid level num: %d", levelNum); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SArray *plans = taosArrayGet(pDag->pSubplans, 0); - int32_t taskNum = taosArrayGetSize(plans); + SNodeListNode *plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, 0); + int32_t taskNum = LIST_LENGTH(plans->pNodeList); if (taskNum <= 0) { qError("invalid task num: %d", taskNum); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -1449,7 +1449,7 @@ int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) { int32_t code = 0; for (int32_t i = 0; i < taskNum; ++i) { - SSubplan *plan = taosArrayGetP(plans, i); + SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, i); tInfo.addr = plan->execNode; code = qSubPlanToString(plan, &msg, &msgLen); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index d05dd731a8..2007b6ca18 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -78,9 +78,9 @@ void schtBuildQueryDag(SQueryPlan *dag) { dag->queryId = qId; dag->numOfSubplans = 2; - dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); - SArray *scan = taosArrayInit(1, POINTER_BYTES); - SArray *merge = taosArrayInit(1, POINTER_BYTES); + dag->pSubplans = nodesMakeList(); + SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); + SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); SSubplan *scanPlan = (SSubplan *)calloc(1, sizeof(SSubplan)); SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan)); @@ -88,7 +88,7 @@ void schtBuildQueryDag(SQueryPlan *dag) { scanPlan->id.queryId = qId; scanPlan->id.templateId = 0x0000000000000002; scanPlan->id.subplanId = 0x0000000000000003; - scanPlan->type = QUERY_TYPE_SCAN; + scanPlan->subplanType = SUBPLAN_TYPE_SCAN; scanPlan->execNode.nodeId = 1; scanPlan->execNode.epset.inUse = 0; @@ -96,30 +96,30 @@ void schtBuildQueryDag(SQueryPlan *dag) { scanPlan->pChildren = NULL; scanPlan->level = 1; - scanPlan->pParents = taosArrayInit(1, POINTER_BYTES); + scanPlan->pParents = nodesMakeList(); scanPlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); scanPlan->msgType = TDMT_VND_QUERY; mergePlan->id.queryId = qId; mergePlan->id.templateId = schtMergeTemplateId; - mergePlan->id.subplanId = 0x5555555555; - mergePlan->type = QUERY_TYPE_MERGE; + mergePlan->id.subplanId = 0x5555; + mergePlan->subplanType = SUBPLAN_TYPE_MERGE; mergePlan->level = 0; mergePlan->execNode.epset.numOfEps = 0; - mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES); + mergePlan->pChildren = nodesMakeList(); mergePlan->pParents = NULL; mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); mergePlan->msgType = TDMT_VND_QUERY; - SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); - SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); + nodesListAppend(merge->pNodeList, (SNode*)mergePlan); + nodesListAppend(scan->pNodeList, (SNode*)scanPlan); - taosArrayPush(mergePlan->pChildren, &scanPlan); - taosArrayPush(scanPlan->pParents, &mergePlan); + nodesListAppend(mergePlan->pChildren, (SNode*)scanPlan); + nodesListAppend(scanPlan->pParents, (SNode*)mergePlan); - taosArrayPush(dag->pSubplans, &merge); - taosArrayPush(dag->pSubplans, &scan); + nodesListAppend(dag->pSubplans, (SNode*)merge); + nodesListAppend(dag->pSubplans, (SNode*)scan); } void schtFreeQueryDag(SQueryPlan *dag) { @@ -132,15 +132,15 @@ void schtBuildInsertDag(SQueryPlan *dag) { dag->queryId = qId; dag->numOfSubplans = 2; - dag->pSubplans = taosArrayInit(1, POINTER_BYTES); - SArray *inserta = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); + dag->pSubplans = nodesMakeList(); + SNodeListNode *inserta = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST); SSubplan *insertPlan = (SSubplan *)calloc(2, sizeof(SSubplan)); insertPlan[0].id.queryId = qId; insertPlan[0].id.templateId = 0x0000000000000003; insertPlan[0].id.subplanId = 0x0000000000000004; - insertPlan[0].type = QUERY_TYPE_MODIFY; + insertPlan[0].subplanType = SUBPLAN_TYPE_MODIFY; insertPlan[0].level = 0; insertPlan[0].execNode.nodeId = 1; @@ -156,7 +156,7 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan[1].id.queryId = qId; insertPlan[1].id.templateId = 0x0000000000000003; insertPlan[1].id.subplanId = 0x0000000000000005; - insertPlan[1].type = QUERY_TYPE_MODIFY; + insertPlan[1].subplanType = SUBPLAN_TYPE_MODIFY; insertPlan[1].level = 0; insertPlan[1].execNode.nodeId = 1; @@ -169,11 +169,11 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan[1].pDataSink = (SDataSinkNode*)calloc(1, sizeof(SDataSinkNode)); insertPlan[1].msgType = TDMT_VND_SUBMIT; - taosArrayPush(inserta, &insertPlan); + nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); insertPlan += 1; - taosArrayPush(inserta, &insertPlan); + nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); - taosArrayPush(dag->pSubplans, &inserta); + nodesListAppend(dag->pSubplans, (SNode*)inserta); } @@ -347,7 +347,7 @@ void* schtRunJobThread(void *aa) { char *dbname = "1.db1"; char *tablename = "table1"; SVgroupInfo vgInfo = {0}; - SQueryPlan dag = {0}; + SQueryPlan dag; schtInitLogFile(); @@ -517,7 +517,7 @@ TEST(queryTest, normalCase) { char *tablename = "table1"; SVgroupInfo vgInfo = {0}; SSchJob *pJob = NULL; - SQueryPlan dag = {0}; + SQueryPlan dag; schtInitLogFile(); @@ -620,7 +620,7 @@ TEST(insertTest, normalCase) { char *dbname = "1.db1"; char *tablename = "table1"; SVgroupInfo vgInfo = {0}; - SQueryPlan dag = {0}; + SQueryPlan dag; uint64_t numOfRows = 0; schtInitLogFile(); diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 4b68467450..313b879abe 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -73,6 +73,22 @@ int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj) { return tjsonAddItemToArray(pJson, pJobj); } +int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* pArray, int32_t itemSize, int32_t num) { + if (num > 0) { + SJson* pJsonArray = tjsonAddArrayToObject(pJson, pName); + if (NULL == pJsonArray) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for (size_t i = 0; i < num; ++i) { + int32_t code = tjsonAddItem(pJsonArray, func, (const char*)pArray + itemSize * i); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + } + return TSDB_CODE_SUCCESS; +} + char* tjsonToString(const SJson* pJson) { return cJSON_Print((cJSON*)pJson); } char* tjsonToUnformattedString(const SJson* pJson) { return cJSON_PrintUnformatted((cJSON*)pJson); } @@ -175,4 +191,16 @@ int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, voi return func(pJsonObj, pObj); } +int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize) { + const cJSON* jArray = tjsonGetObjectItem(pJson, pName); + int32_t size = (NULL == jArray ? 0 : tjsonGetArraySize(jArray)); + for (int32_t i = 0; i < size; ++i) { + int32_t code = func(tjsonGetArrayItem(jArray, i), (char*)pArray + itemSize * i); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + return TSDB_CODE_SUCCESS; +} + SJson* tjsonParse(const char* pStr) { return cJSON_Parse(pStr); }