Merge pull request #10503 from taosdata/feature/3.0_query_integrate_wxy

TD-13747 planner integrate
This commit is contained in:
xiao-yu-wang 2022-03-02 16:53:58 +08:00 committed by GitHub
commit bfe06b0249
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 582 additions and 121 deletions

View File

@ -20,7 +20,7 @@
extern "C" {
#endif
#include "nodes.h"
#include "querynodes.h"
typedef struct SDatabaseOptions {
int32_t numOfBlocks;
@ -49,6 +49,30 @@ typedef struct SCreateDatabaseStmt {
SDatabaseOptions options;
} SCreateDatabaseStmt;
typedef struct STableOptions {
int32_t keep;
int32_t ttl;
char comments[TSDB_STB_COMMENT_LEN];
} STableOptions;
typedef struct SColumnDefNode {
ENodeType type;
char colName[TSDB_COL_NAME_LEN];
SDataType dataType;
char comments[TSDB_STB_COMMENT_LEN];
} SColumnDefNode;
typedef struct SCreateTableStmt {
ENodeType type;
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
bool ignoreExists;
SNodeList* pCols;
STableOptions options;
} SCreateTableStmt;
// CREATE TABLE [IF NOT EXISTS] [db_name.]tb_name (create_definition [, create_definitionn] ...) [table_options]
#ifdef __cplusplus
}
#endif

View File

@ -65,6 +65,7 @@ typedef enum ENodeType {
QUERY_NODE_TARGET,
QUERY_NODE_DATABLOCK_DESC,
QUERY_NODE_SLOT_DESC,
QUERY_NODE_COLUMN_DEF,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR,
@ -72,12 +73,15 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_STMT,
QUERY_NODE_VNODE_MODIF_STMT,
QUERY_NODE_CREATE_DATABASE_STMT,
QUERY_NODE_CREATE_TABLE_STMT,
// logic plan node
QUERY_NODE_LOGIC_PLAN_SCAN,
QUERY_NODE_LOGIC_PLAN_JOIN,
QUERY_NODE_LOGIC_PLAN_AGG,
QUERY_NODE_LOGIC_PLAN_PROJECT,
QUERY_NODE_LOGIC_SUBPLAN,
QUERY_NODE_LOGIC_PLAN,
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN,
@ -89,8 +93,9 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_AGG,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE,
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_DSINK_DISPATCH
QUERY_NODE_PHYSICAL_PLAN_DISPATCH,
QUERY_NODE_PHYSICAL_SUBPLAN,
QUERY_NODE_PHYSICAL_PLAN
} ENodeType;
/**

View File

@ -65,6 +65,18 @@ typedef struct SProjectLogicNode {
SNodeList* pProjections;
} SProjectLogicNode;
typedef struct SSubLogicPlan {
ENodeType type;
SNodeList* pChildren;
SNodeList* pParents;
SLogicNode* pNode;
} SSubLogicPlan;
typedef struct SQueryLogicPlan {
ENodeType type;;
SNodeList* pSubplans;
} SQueryLogicPlan;
typedef struct SSlotDescNode {
ENodeType type;
int16_t slotId;
@ -98,6 +110,7 @@ typedef struct SScanPhysiNode {
int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t count; // repeat count
int32_t reverse; // reverse scan count
char tableName[TSDB_TABLE_NAME_LEN];
} SScanPhysiNode;
typedef SScanPhysiNode SSystemTableScanPhysiNode;
@ -159,6 +172,39 @@ typedef struct SDataInserterNode {
char *pData;
} SDataInserterNode;
typedef struct SSubplanId {
uint64_t queryId;
int32_t templateId;
int32_t subplanId;
} SSubplanId;
typedef enum ESubplanType {
SUBPLAN_TYPE_MERGE = 1,
SUBPLAN_TYPE_PARTIAL,
SUBPLAN_TYPE_SCAN,
SUBPLAN_TYPE_MODIFY
} ESubplanType;
typedef struct SSubplan {
ENodeType type;
SSubplanId id; // unique id of the subplan
ESubplanType subplanType;
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
SNodeList* pChildren; // the datasource subplan,from which to fetch the result
SNodeList* pParents; // the data destination subplan, get data from current subplan
SPhysiNode* pNode; // physical plan of current subplan
SDataSinkNode* pDataSink; // data of the subplan flow into the datasink
} SSubplan;
typedef struct SQueryPlan {
ENodeType type;;
uint64_t queryId;
int32_t numOfSubplans;
SNodeList* pSubplans; // SNodeListNode. The execution level of subplan, starting from 0.
} SQueryPlan;
#ifdef __cplusplus
}
#endif

View File

@ -22,35 +22,6 @@ extern "C" {
#include "plannodes.h"
#define QUERY_TYPE_MERGE 1
#define QUERY_TYPE_PARTIAL 2
#define QUERY_TYPE_SCAN 3
#define QUERY_TYPE_MODIFY 4
typedef struct SSubplanId {
uint64_t queryId;
uint64_t templateId;
uint64_t subplanId;
} SSubplanId;
typedef struct SSubplan {
SSubplanId id; // unique id of the subplan
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
SArray* pChildren; // the datasource subplan,from which to fetch the result
SArray* pParents; // the data destination subplan, get data from current subplan
SPhysiNode* pNode; // physical plan of current subplan
SDataSinkNode* pDataSink; // data of the subplan flow into the datasink
} SSubplan;
typedef struct SQueryPlan {
uint64_t queryId;
int32_t numOfSubplans;
SArray* pSubplans; // SArray*<SArray*<SSubplan*>>. The execution level of subplan, starting from 0.
} SQueryPlan;
typedef struct SPlanContext {
uint64_t queryId;
SNode* pAstRoot;

View File

@ -54,10 +54,12 @@ typedef int32_t (*FToJson)(const void* pObj, SJson* pJson);
int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj);
int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj);
int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* pArray, int32_t itemSize, int32_t num);
typedef int32_t (*FToObject)(const SJson* pJson, void* pObj);
int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj);
int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize);
char* tjsonToString(const SJson* pJson);
char* tjsonToUnformattedString(const SJson* pJson);

View File

@ -762,8 +762,8 @@ static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SM
SVgObj *pVgroup = NULL;
SQueryPlan *pPlan = qStringToQueryPlan(pTopic->physicalPlan);
SArray *pArray = NULL;
SArray *inner = taosArrayGet(pPlan->pSubplans, 0);
SSubplan *plan = taosArrayGetP(inner, 0);
SNodeListNode *inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
SSubplan *plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
SArray *unassignedVg = pSub->unassignedVg;
void *pIter = NULL;

View File

@ -26,7 +26,7 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg) {
}
int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHandle) {
if (QUERY_NODE_DSINK_DISPATCH == nodeType(pDataSink)) {
if (QUERY_NODE_PHYSICAL_PLAN_DISPATCH == nodeType(pDataSink)) {
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
}
return TSDB_CODE_FAILED;

View File

@ -90,6 +90,10 @@ static char* nodeName(ENodeType type) {
return "PhysiJoin";
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return "PhysiAgg";
case QUERY_NODE_PHYSICAL_SUBPLAN:
return "PhysiSubplan";
case QUERY_NODE_PHYSICAL_PLAN:
return "PhysiPlan";
default:
break;
}
@ -462,6 +466,164 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkSubplanIdQueryId = "QueryId";
static const char* jkSubplanIdTemplateId = "TemplateId";
static const char* jkSubplanIdSubplanId = "SubplanId";
static int32_t subplanIdToJson(const void* pObj, SJson* pJson) {
const SSubplanId* pNode = (const SSubplanId*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkSubplanIdQueryId, pNode->queryId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanIdTemplateId, pNode->templateId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanIdSubplanId, pNode->subplanId);
}
return code;
}
static int32_t jsonToSubplanId(const SJson* pJson, void* pObj) {
SSubplanId* pNode = (SSubplanId*)pObj;
int32_t code = tjsonGetUBigIntValue(pJson, jkSubplanIdQueryId, &pNode->queryId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanIdTemplateId, &pNode->templateId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanIdSubplanId, &pNode->subplanId);
}
return code;
}
static const char* jkEndPointFqdn = "Fqdn";
static const char* jkEndPointPort = "Port";
static int32_t epToJson(const void* pObj, SJson* pJson) {
const SEp* pNode = (const SEp*)pObj;
int32_t code = tjsonAddStringToObject(pJson, jkEndPointFqdn, pNode->fqdn);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkEndPointPort, pNode->port);
}
return code;
}
static int32_t jsonToEp(const SJson* pJson, void* pObj) {
SEp* pNode = (SEp*)pObj;
int32_t code = tjsonGetStringValue(pJson, jkEndPointFqdn, pNode->fqdn);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetSmallIntValue(pJson, jkEndPointPort, &pNode->port);
}
return code;
}
static const char* jkQueryNodeAddrId = "Id";
static const char* jkQueryNodeAddrInUse = "InUse";
static const char* jkQueryNodeAddrNumOfEps = "NumOfEps";
static const char* jkQueryNodeAddrEps = "Eps";
static int32_t queryNodeAddrToJson(const void* pObj, SJson* pJson) {
const SQueryNodeAddr* pNode = (const SQueryNodeAddr*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkQueryNodeAddrId, pNode->nodeId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryNodeAddrInUse, pNode->epset.inUse);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkQueryNodeAddrNumOfEps, pNode->epset.numOfEps);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddArray(pJson, jkQueryNodeAddrEps, epToJson, pNode->epset.eps, sizeof(SEp), pNode->epset.numOfEps);
}
return code;
}
static int32_t jsonToQueryNodeAddr(const SJson* pJson, void* pObj) {
SQueryNodeAddr* pNode = (SQueryNodeAddr*)pObj;
int32_t code = tjsonGetIntValue(pJson, jkQueryNodeAddrId, &pNode->nodeId);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkQueryNodeAddrInUse, &pNode->epset.inUse);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkQueryNodeAddrNumOfEps, &pNode->epset.numOfEps);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToArray(pJson, jkQueryNodeAddrEps, jsonToEp, pNode->epset.eps, sizeof(SEp));
}
return code;
}
static const char* jkSubplanId = "Id";
static const char* jkSubplanType = "SubplanType";
static const char* jkSubplanMsgType = "MsgType";
static const char* jkSubplanLevel = "Level";
static const char* jkSubplanNodeAddr = "NodeAddr";
static const char* jkSubplanRootNode = "RootNode";
static const char* jkSubplanDataSink = "DataSink";
static int32_t subplanToJson(const void* pObj, SJson* pJson) {
const SSubplan* pNode = (const SSubplan*)pObj;
int32_t code = tjsonAddObject(pJson, jkSubplanId, subplanIdToJson, &pNode->id);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanType, pNode->subplanType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanMsgType, pNode->msgType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanLevel, pNode->level);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSubplanNodeAddr, queryNodeAddrToJson, &pNode->execNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSubplanRootNode, nodeToJson, pNode->pNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSubplanDataSink, nodeToJson, pNode->pDataSink);
}
return code;
}
static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
SSubplan* pNode = (SSubplan*)pObj;
int32_t code = tjsonToObject(pJson, jkSubplanId, jsonToSubplanId, &pNode->id);
if (TSDB_CODE_SUCCESS == code) {
int32_t val;
code = tjsonGetIntValue(pJson, jkSubplanType, &val);
pNode->subplanType = val;
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanMsgType, &pNode->msgType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanLevel, &pNode->level);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkSubplanNodeAddr, jsonToQueryNodeAddr, &pNode->execNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSubplanRootNode, (SNode**)&pNode->pNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSubplanDataSink, (SNode**)&pNode->pDataSink);
}
return code;
}
static const char* jkAggLogicPlanGroupKeys = "GroupKeys";
static const char* jkAggLogicPlanAggFuncs = "AggFuncs";
@ -1064,6 +1226,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return physiJoinNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return physiAggNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_SUBPLAN:
return subplanToJson(pObj, pJson);
default:
break;
}
@ -1127,13 +1291,15 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToPhysiJoinNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return jsonToPhysiAggNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_SUBPLAN:
return jsonToSubplan(pJson, pObj);
default:
break;
}
return TSDB_CODE_SUCCESS;
}
static const char* jkNodeType = "Type";
static const char* jkNodeType = "NodeType";
static const char* jkNodeName = "Name";
static int32_t nodeToJson(const void* pObj, SJson* pJson) {

View File

@ -95,6 +95,10 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SJoinPhysiNode));
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return makeNode(type, sizeof(SAggPhysiNode));
case QUERY_NODE_PHYSICAL_SUBPLAN:
return makeNode(type, sizeof(SSubplan));
case QUERY_NODE_PHYSICAL_PLAN:
return makeNode(type, sizeof(SQueryPlan));
default:
break;
}

View File

@ -32,6 +32,11 @@ typedef struct SAstCreateContext {
SNode* pRootNode;
} SAstCreateContext;
typedef struct STokenPair {
SToken first;
SToken second;
} STokenPair;
extern SToken nil_token;
void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt);
@ -76,8 +81,6 @@ SNode* addLimitClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pLimit);
SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pProjectionList, SNode* pTable);
SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode* pLeft, SNode* pRight);
SDatabaseOptions* createDefaultDatabaseOptions(SAstCreateContext* pCxt);
typedef enum EDatabaseOptionType {
DB_OPTION_BLOCKS = 0,
DB_OPTION_CACHE,
@ -99,11 +102,21 @@ typedef enum EDatabaseOptionType {
DB_OPTION_MAX
} EDatabaseOptionType;
SDatabaseOptions* createDefaultDatabaseOptions(SAstCreateContext* pCxt);
SDatabaseOptions* setDatabaseOption(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, EDatabaseOptionType type, const SToken* pVal);
SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pDbName, SDatabaseOptions* pOptions);
typedef enum ETableOptionType {
TABLE_OPTION_KEEP = 0,
TABLE_OPTION_TTL,
TABLE_OPTION_COMMENT,
TABLE_OPTION_MAX
} ETableOptionType;
STableOptions* createDefaultTableOptions(SAstCreateContext* pCxt);
STableOptions* setTableOption(SAstCreateContext* pCxt, STableOptions* pOptions, ETableOptionType type, const SToken* pVal);
SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDataType dataType, const SToken* pComment);
SNode* createCreateTableStmt(SAstCreateContext* pCxt, bool ignoreExists, const STokenPair* pFullTableName, SNodeList* pCols, STableOptions* pOptions);
#ifdef __cplusplus
}

View File

@ -65,7 +65,7 @@
//%right NK_BITNOT.
/************************************************ create database *****************************************************/
cmd ::= CREATE DATABASE exists_opt(A) db_name(B) db_options(C). { PARSER_TRACE; pCxt->pRootNode = createCreateDatabaseStmt(pCxt, A, &B, C);}
cmd ::= CREATE DATABASE exists_opt(A) db_name(B) db_options(C). { pCxt->pRootNode = createCreateDatabaseStmt(pCxt, A, &B, C);}
%type exists_opt { bool }
exists_opt(A) ::= IF NOT EXISTS. { A = true; }
@ -92,6 +92,55 @@ db_options(A) ::= db_options(B) VGROUPS NK_INTEGER(C).
db_options(A) ::= db_options(B) SINGLESTABLE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_SINGLESTABLE, &C); }
db_options(A) ::= db_options(B) STREAMMODE NK_INTEGER(C). { A = setDatabaseOption(pCxt, B, DB_OPTION_STREAMMODE, &C); }
/************************************************ create table *******************************************************/
cmd ::= CREATE TABLE exists_opt(A) full_table_name(B)
NK_LP column_def_list(C) NK_RP table_options(D). { pCxt->pRootNode = createCreateTableStmt(pCxt, A, &B, C, D);}
%type full_table_name { STokenPair }
%destructor full_table_name { }
full_table_name(A) ::= NK_ID(B). { A = { .first = B, .second = nil_token}; }
full_table_name(A) ::= NK_ID(B) NK_DOT NK_ID(C). { A = { .first = B, .second = C}; }
%type column_def_list { SNodeList* }
%destructor column_def_list { nodesDestroyList($$); }
column_def_list(A) ::= column_def(B). { A = createNodeList(pCxt, B); }
column_def_list(A) ::= column_def_list(B) NK_COMMA column_def(C). { A = addNodeToList(pCxt, B, C); }
column_def(A) ::= column_name(B) type_name(C). { A = createColumnDefNode(pCxt, B, C, NULL); }
column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, B, C, &D); }
%type type_name { SDataType }
%destructor type_name { }
type_name(A) ::= BOOL. { A = createDataType(TSDB_DATA_TYPE_BOOL); }
type_name(A) ::= TINYINT. { A = createDataType(TSDB_DATA_TYPE_TINYINT); }
type_name(A) ::= SMALLINT. { A = createDataType(TSDB_DATA_TYPE_SMALLINT); }
type_name(A) ::= INT. { A = createDataType(TSDB_DATA_TYPE_INT); }
type_name(A) ::= BIGINT. { A = createDataType(TSDB_DATA_TYPE_BIGINT); }
type_name(A) ::= FLOAT. { A = createDataType(TSDB_DATA_TYPE_FLOAT); }
type_name(A) ::= DOUBLE. { A = createDataType(TSDB_DATA_TYPE_DOUBLE); }
type_name(A) ::= BINARY NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_BINARY, &B); }
type_name(A) ::= TIMESTAMP. { A = createDataType(TSDB_DATA_TYPE_TIMESTAMP); }
type_name(A) ::= NCHAR NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_NCHAR, &B); }
type_name(A) ::= TINYINT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_UTINYINT); }
type_name(A) ::= SMALLINT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_USMALLINT); }
type_name(A) ::= INT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_UINT); }
type_name(A) ::= BIGINT UNSIGNED. { A = createDataType(TSDB_DATA_TYPE_UBIGINT); }
type_name(A) ::= JSON. { A = createDataType(TSDB_DATA_TYPE_JSON); }
type_name(A) ::= VARCHAR NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_VARCHAR, &B); }
type_name(A) ::= MEDIUMBLOB. { A = createDataType(TSDB_DATA_TYPE_MEDIUMBLOB); }
type_name(A) ::= BLOB. { A = createDataType(TSDB_DATA_TYPE_BLOB); }
type_name(A) ::= VARBINARY NK_LP NK_INTEGER(B) NK_RP. { A = createVarLenDataType(TSDB_DATA_TYPE_VARBINARY, &B); }
type_name(A) ::= DECIMAL. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); }
type_name(A) ::= DECIMAL NK_LP NK_INTEGER NK_RP. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); }
type_name(A) ::= DECIMAL NK_LP NK_INTEGER NK_COMMA NK_INTEGER NK_RP. { A = createDataType(TSDB_DATA_TYPE_DECIMAL); }
%type table_options { SDatabaseOptions* }
%destructor table_options { tfree($$); }
table_options(A) ::= . { A = createDefaultTableOptions(pCxt);}
table_options(A) ::= table_options(B) COMMENT NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_COMMENT, &C); }
table_options(A) ::= table_options(B) KEEP NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_KEEP, &C); }
table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); }
//cmd ::= SHOW DATABASES. { PARSER_TRACE; createShowStmt(pCxt, SHOW_TYPE_DATABASE); }
/************************************************ select *************************************************************/

View File

@ -35,9 +35,11 @@
SToken nil_token = { .type = TK_NIL, .n = 0, .z = NULL };
typedef SDatabaseOptions* (*FSetDatabaseOption)(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal);
static FSetDatabaseOption setDbOptionFuncs[DB_OPTION_MAX];
typedef STableOptions* (*FSetTableOption)(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal);
static FSetTableOption setTableOptionFuncs[TABLE_OPTION_MAX];
static SDatabaseOptions* setDbBlocks(SAstCreateContext* pCxt, SDatabaseOptions* pOptions, const SToken* pVal) {
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_TOTAL_BLOCKS || val > TSDB_MAX_TOTAL_BLOCKS) {
@ -263,12 +265,54 @@ static void initSetDatabaseOptionFp() {
setDbOptionFuncs[DB_OPTION_STREAMMODE] = setDbStreamMode;
}
static STableOptions* setTableKeep(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal) {
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_KEEP || val > TSDB_MAX_KEEP) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid table option keep: %d valid range: [%d, %d]", val, TSDB_MIN_KEEP, TSDB_MAX_KEEP);
pCxt->valid = false;
return pOptions;
}
pOptions->keep = val;
return pOptions;
}
static STableOptions* setTableTtl(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal) {
int64_t val = strtol(pVal->z, NULL, 10);
if (val < TSDB_MIN_DB_TTL_OPTION) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid table option ttl: %d, should be greater than or equal to %d", val, TSDB_MIN_DB_TTL_OPTION);
pCxt->valid = false;
return pOptions;
}
pOptions->ttl = val;
return pOptions;
}
static STableOptions* setTableComment(SAstCreateContext* pCxt, STableOptions* pOptions, const SToken* pVal) {
if (pVal->n >= sizeof(pOptions->comments)) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen,
"invalid table option comment, length cannot exceed %d", sizeof(pOptions->comments) - 1);
pCxt->valid = false;
return pOptions;
}
strncpy(pOptions->comments, pVal->z, pVal->n);
return pOptions;
}
static void initSetTableOptionFp() {
setTableOptionFuncs[TABLE_OPTION_KEEP] = setTableKeep;
setTableOptionFuncs[TABLE_OPTION_TTL] = setTableTtl;
setTableOptionFuncs[TABLE_OPTION_COMMENT] = setTableComment;
}
void initAstCreateContext(SParseContext* pParseCxt, SAstCreateContext* pCxt) {
pCxt->pQueryCxt = pParseCxt;
pCxt->notSupport = false;
pCxt->valid = true;
pCxt->pRootNode = NULL;
initSetDatabaseOptionFp();
initSetTableOptionFp();
}
static bool checkDbName(SAstCreateContext* pCxt, const SToken* pDbName) {
@ -651,3 +695,40 @@ SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, cons
pStmt->options = *pOptions;
return (SNode*)pStmt;
}
STableOptions* createDefaultTableOptions(SAstCreateContext* pCxt) {
STableOptions* pOptions = calloc(1, sizeof(STableOptions));
CHECK_OUT_OF_MEM(pOptions);
pOptions->keep = TSDB_DEFAULT_KEEP;
pOptions->ttl = TSDB_DEFAULT_DB_TTL_OPTION;
return pOptions;
}
STableOptions* setTableOption(SAstCreateContext* pCxt, STableOptions* pOptions, ETableOptionType type, const SToken* pVal) {
return setTableOptionFuncs[type](pCxt, pOptions, pVal);
}
SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDataType dataType, const SToken* pComment) {
SColumnDefNode* pCol = (SColumnDefNode*)nodesMakeNode(QUERY_NODE_COLUMN_DEF);
CHECK_OUT_OF_MEM(pCol);
strncpy(pCol->colName, pColName->z, pColName->n);
pCol->dataType = dataType;
if (NULL != pComment) {
strncpy(pCol->colName, pColName->z, pColName->n);
}
return (SNode*)pCol;
}
SNode* createCreateTableStmt(SAstCreateContext* pCxt,
bool ignoreExists, const STokenPair* pFullTableName, SNodeList* pCols, STableOptions* pOptions) {
SCreateTableStmt* pStmt = (SCreateTableStmt*)nodesMakeNode(QUERY_NODE_CREATE_TABLE_STMT);
CHECK_OUT_OF_MEM(pStmt);
if (TK_NIL != pFullTableName->first.type) {
strncpy(pStmt->dbName, pFullTableName->first.z, pFullTableName->first.n);
}
strncpy(pStmt->tableName, pFullTableName->second.z, pFullTableName->second.n);
pStmt->ignoreExists = ignoreExists;
pStmt->pCols = pCols;
pStmt->options = *pOptions;
return (SNode*)pStmt;
}

View File

@ -32,6 +32,7 @@ static bool isCmd(const SNode* pRootNode) {
}
switch (nodeType(pRootNode)) {
case QUERY_NODE_SELECT_STMT:
case QUERY_NODE_CREATE_TABLE_STMT:
return false;
default:
break;
@ -74,7 +75,7 @@ int32_t doParse(SParseContext* pParseCxt, SQuery** pQuery) {
}
default:
NewParse(pParser, t0.type, t0, &cxt);
NewParseTrace(stdout, "");
// NewParseTrace(stdout, "");
if (!cxt.valid) {
goto abort_parse;
}

View File

@ -829,6 +829,10 @@ static int32_t translateCreateDatabase(STranslateContext* pCxt, SCreateDatabaseS
return TSDB_CODE_SUCCESS;
}
static int32_t translateCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pNode)) {
@ -838,6 +842,9 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
case QUERY_NODE_CREATE_DATABASE_STMT:
code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode);
break;
case QUERY_NODE_CREATE_TABLE_STMT:
code = translateCreateTable(pCxt, (SCreateTableStmt*)pNode);
break;
default:
break;
}

View File

@ -41,8 +41,7 @@ extern "C" {
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode);
int32_t optimize(SPlanContext* pCxt, SLogicNode* pLogicNode);
int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode);
int32_t buildPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan);
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan);
#ifdef __cplusplus
}

View File

@ -497,23 +497,13 @@ void qDestroySubplan(SSubplan* pSubplan) {
#include "functionMgt.h"
typedef struct SSubLogicPlan {
SNode* pRoot; // SLogicNode
bool haveSuperTable;
bool haveSystemTable;
} SSubLogicPlan;
int32_t splitLogicPlan(SSubLogicPlan* pLogicPlan) {
// todo
return TSDB_CODE_SUCCESS;
}
typedef struct SSlotIndex {
int16_t dataBlockId;
int16_t slotId;
} SSlotIndex;
typedef struct SPhysiPlanContext {
SPlanContext* pPlanCxt;
int32_t errCode;
int16_t nextDataBlockId;
SArray* pLocationHelper;
@ -956,19 +946,94 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl
return pPhyNode;
}
int32_t createPhysiPlan(SLogicNode* pLogicNode, SPhysiNode** pPhyNode) {
SPhysiPlanContext cxt = { .errCode = TSDB_CODE_SUCCESS, .nextDataBlockId = 0, .pLocationHelper = taosArrayInit(32, POINTER_BYTES) };
static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan) {
SSubplan* pSubplan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN);
pSubplan->pNode = createPhysiNode(pCxt, pLogicSubplan->pNode);
return pSubplan;
}
static SQueryLogicPlan* createRawQueryLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode) {
SQueryLogicPlan* pLogicPlan = (SQueryLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN);
CHECK_ALLOC(pLogicPlan, NULL);
pLogicPlan->pSubplans = nodesMakeList();
CHECK_ALLOC(pLogicPlan->pSubplans, pLogicPlan);
SNodeListNode* pTopSubplans = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
CHECK_ALLOC(pTopSubplans, pLogicPlan);
if (TSDB_CODE_SUCCESS != nodesListAppend(pLogicPlan->pSubplans, (SNode*)pTopSubplans)) {
nodesDestroyNode((SNode*)pTopSubplans);
return pLogicPlan;
}
pTopSubplans->pNodeList = nodesMakeList();
CHECK_ALLOC(pTopSubplans->pNodeList, pLogicPlan);
SSubLogicPlan* pSubplan = (SSubLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
CHECK_ALLOC(pSubplan, pLogicPlan);
if (TSDB_CODE_SUCCESS != nodesListAppend(pTopSubplans->pNodeList, (SNode*)pSubplan)) {
nodesDestroyNode((SNode*)pSubplan);
return pLogicPlan;
}
pSubplan->pNode = (SLogicNode*)nodesCloneNode((SNode*)pLogicNode);
CHECK_ALLOC(pSubplan->pNode, pLogicPlan);
return pLogicPlan;
}
static int32_t splitLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SQueryLogicPlan** pLogicPlan) {
SQueryLogicPlan* pPlan = createRawQueryLogicPlan(pCxt, pLogicNode);
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
nodesDestroyNode((SNode*)pPlan);
return pCxt->errCode;
}
// todo split
*pLogicPlan = pPlan;
return TSDB_CODE_SUCCESS;
}
static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) {
SQueryPlan* pQueryPlan = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN);
CHECK_ALLOC(pQueryPlan, TSDB_CODE_OUT_OF_MEMORY);
*pPlan = pQueryPlan;
pQueryPlan->queryId = pCxt->pPlanCxt->queryId;
pQueryPlan->pSubplans = nodesMakeList();
CHECK_ALLOC(pQueryPlan->pSubplans, TSDB_CODE_OUT_OF_MEMORY);
SNode* pNode;
FOREACH(pNode, pLogicPlan->pSubplans) {
SNodeListNode* pLevelSubplans = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
CHECK_ALLOC(pLevelSubplans, TSDB_CODE_OUT_OF_MEMORY);
if (TSDB_CODE_SUCCESS != nodesListAppend(pQueryPlan->pSubplans, (SNode*)pLevelSubplans)) {
nodesDestroyNode((SNode*)pLevelSubplans);
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pLogicSubplan;
FOREACH(pLogicSubplan, ((SNodeListNode*)pNode)->pNodeList) {
SSubplan* pSubplan = createPhysiSubplan(pCxt, (SSubLogicPlan*)pLogicSubplan);
CHECK_ALLOC(pSubplan, TSDB_CODE_OUT_OF_MEMORY);
if (TSDB_CODE_SUCCESS != nodesListAppend(pLevelSubplans->pNodeList, (SNode*)pSubplan)) {
nodesDestroyNode((SNode*)pSubplan);
return TSDB_CODE_OUT_OF_MEMORY;
}
++(pQueryPlan->numOfSubplans);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan) {
SPhysiPlanContext cxt = {
.pPlanCxt = pCxt,
.errCode = TSDB_CODE_SUCCESS,
.nextDataBlockId = 0,
.pLocationHelper = taosArrayInit(32, POINTER_BYTES)
};
if (NULL == cxt.pLocationHelper) {
return TSDB_CODE_OUT_OF_MEMORY;
}
*pPhyNode = createPhysiNode(&cxt, pLogicNode);
return cxt.errCode;
SQueryLogicPlan* pLogicPlan;
int32_t code = splitLogicPlan(&cxt, pLogicNode, &pLogicPlan);
// todo scale out
// todo maping
if (TSDB_CODE_SUCCESS == code) {
code = buildPhysiPlan(&cxt, pLogicPlan, pPlan);
}
int32_t buildPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan) {
// split
// scale out
// maping
// create
return TSDB_CODE_SUCCESS;
nodesDestroyNode((SNode*)pLogicPlan);
return code;
}

View File

@ -28,7 +28,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan) {
code = optimize(pCxt, pLogicNode);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildPhysiPlan(pCxt, pLogicNode, pPlan);
code = createPhysiPlan(pCxt, pLogicNode, pPlan);
}
return code;
}
@ -38,11 +38,11 @@ void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstrea
}
int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
return nodesNodeToString((const SNode*)subplan, false, str, len);
}
int32_t qStringToSubplan(const char* str, SSubplan** subplan) {
return nodesStringToNode(str, (SNode**)subplan);
}
char* qQueryPlanToString(const SQueryPlan* pPlan) {

View File

@ -70,14 +70,14 @@ protected:
cout << toString((const SNode*)pLogicPlan, false) << endl;
if (TEST_PHYSICAL_PLAN == target) {
SPhysiNode* pPhyPlan = nullptr;
code = createPhysiPlan(pLogicPlan, &pPhyPlan);
SQueryPlan* pPlan = nullptr;
code = createPhysiPlan(&cxt, pLogicPlan, &pPlan);
if (code != TSDB_CODE_SUCCESS) {
cout << "sql:[" << cxt_.pSql << "] physical plan code:" << code << ", strerror:" << tstrerror(code) << endl;
return false;
}
cout << "unformatted physical plan : " << endl;
cout << toString((const SNode*)pPhyPlan, false) << endl;
cout << toString((const SNode*)pPlan, false) << endl;
}
return true;

View File

@ -111,7 +111,7 @@ typedef struct SSchJob {
void *transport;
SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr
SArray *levels; // Element is SQueryLevel, starting from 0. SArray<SSchLevel>
SArray *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
SNodeList *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
int32_t levelIdx;
SEpSet dataSrcEps;
@ -135,9 +135,9 @@ typedef struct SSchJob {
#define SCH_TASK_READY_TO_LUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)
#define SCH_TASK_NO_NEED_DROP(task) ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
@ -145,7 +145,7 @@ typedef struct SSchJob {
#define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st)
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY)
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)

View File

@ -203,8 +203,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SSubplan *pPlan = pTask->plan;
int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0;
int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0;
int32_t childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
int32_t parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;
if (childNum > 0) {
if (pJob->levelIdx == pLevel->level) {
@ -220,8 +220,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for (int32_t n = 0; n < childNum; ++n) {
SSubplan **child = taosArrayGet(pPlan->pChildren, n);
SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
SSubplan *child = (SSubplan*)nodesListGetNode(pPlan->pChildren, n);
SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) {
SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
@ -252,8 +252,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
for (int32_t n = 0; n < parentNum; ++n) {
SSubplan **parent = taosArrayGet(pPlan->pParents, n);
SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES);
SSubplan *parent = (SSubplan*)nodesListGetNode(pPlan->pParents, n);
SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
if (NULL == parentTask || NULL == *parentTask) {
SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
@ -312,7 +312,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t levelNum = (int32_t)taosArrayGetSize(pDag->pSubplans);
int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
if (levelNum <= 0) {
SCH_JOB_ELOG("invalid level num:%d", levelNum);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -336,7 +336,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
pJob->subPlans = pDag->pSubplans;
SSchLevel level = {0};
SArray *plans = NULL;
SNodeListNode *plans = NULL;
int32_t taskNum = 0;
SSchLevel *pLevel = NULL;
@ -351,13 +351,13 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
pLevel = taosArrayGet(pJob->levels, i);
pLevel->level = i;
plans = taosArrayGetP(pDag->pSubplans, i);
plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, i);
if (NULL == plans) {
SCH_JOB_ELOG("empty level plan, level:%d", i);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
taskNum = (int32_t)taosArrayGetSize(plans);
taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
if (taskNum <= 0) {
SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
@ -372,9 +372,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
}
for (int32_t n = 0; n < taskNum; ++n) {
SSubplan *plan = taosArrayGetP(plans, n);
SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, n);
SCH_SET_JOB_TYPE(&pJob->attr, plan->type);
SCH_SET_JOB_TYPE(&pJob->attr, plan->subplanType);
SSchTask task = {0};
SSchTask *pTask = &task;
@ -1420,18 +1420,18 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD
}
int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) == 0) {
if (NULL == pDag || pDag->numOfSubplans <= 0 || LIST_LENGTH(pDag->pSubplans) == 0) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t levelNum = taosArrayGetSize(pDag->pSubplans);
int32_t levelNum = LIST_LENGTH(pDag->pSubplans);
if (1 != levelNum) {
qError("invalid level num: %d", levelNum);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SArray *plans = taosArrayGet(pDag->pSubplans, 0);
int32_t taskNum = taosArrayGetSize(plans);
SNodeListNode *plans = (SNodeListNode*)nodesListGetNode(pDag->pSubplans, 0);
int32_t taskNum = LIST_LENGTH(plans->pNodeList);
if (taskNum <= 0) {
qError("invalid task num: %d", taskNum);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -1449,7 +1449,7 @@ int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks) {
int32_t code = 0;
for (int32_t i = 0; i < taskNum; ++i) {
SSubplan *plan = taosArrayGetP(plans, i);
SSubplan *plan = (SSubplan*)nodesListGetNode(plans->pNodeList, i);
tInfo.addr = plan->execNode;
code = qSubPlanToString(plan, &msg, &msgLen);

View File

@ -78,9 +78,9 @@ void schtBuildQueryDag(SQueryPlan *dag) {
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
SArray *scan = taosArrayInit(1, POINTER_BYTES);
SArray *merge = taosArrayInit(1, POINTER_BYTES);
dag->pSubplans = nodesMakeList();
SNodeListNode *scan = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
SNodeListNode *merge = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
SSubplan *scanPlan = (SSubplan *)calloc(1, sizeof(SSubplan));
SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan));
@ -88,7 +88,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
scanPlan->id.queryId = qId;
scanPlan->id.templateId = 0x0000000000000002;
scanPlan->id.subplanId = 0x0000000000000003;
scanPlan->type = QUERY_TYPE_SCAN;
scanPlan->subplanType = SUBPLAN_TYPE_SCAN;
scanPlan->execNode.nodeId = 1;
scanPlan->execNode.epset.inUse = 0;
@ -96,30 +96,30 @@ void schtBuildQueryDag(SQueryPlan *dag) {
scanPlan->pChildren = NULL;
scanPlan->level = 1;
scanPlan->pParents = taosArrayInit(1, POINTER_BYTES);
scanPlan->pParents = nodesMakeList();
scanPlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode));
scanPlan->msgType = TDMT_VND_QUERY;
mergePlan->id.queryId = qId;
mergePlan->id.templateId = schtMergeTemplateId;
mergePlan->id.subplanId = 0x5555555555;
mergePlan->type = QUERY_TYPE_MERGE;
mergePlan->id.subplanId = 0x5555;
mergePlan->subplanType = SUBPLAN_TYPE_MERGE;
mergePlan->level = 0;
mergePlan->execNode.epset.numOfEps = 0;
mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES);
mergePlan->pChildren = nodesMakeList();
mergePlan->pParents = NULL;
mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode));
mergePlan->msgType = TDMT_VND_QUERY;
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
nodesListAppend(merge->pNodeList, (SNode*)mergePlan);
nodesListAppend(scan->pNodeList, (SNode*)scanPlan);
taosArrayPush(mergePlan->pChildren, &scanPlan);
taosArrayPush(scanPlan->pParents, &mergePlan);
nodesListAppend(mergePlan->pChildren, (SNode*)scanPlan);
nodesListAppend(scanPlan->pParents, (SNode*)mergePlan);
taosArrayPush(dag->pSubplans, &merge);
taosArrayPush(dag->pSubplans, &scan);
nodesListAppend(dag->pSubplans, (SNode*)merge);
nodesListAppend(dag->pSubplans, (SNode*)scan);
}
void schtFreeQueryDag(SQueryPlan *dag) {
@ -132,15 +132,15 @@ void schtBuildInsertDag(SQueryPlan *dag) {
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
SArray *inserta = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
dag->pSubplans = nodesMakeList();
SNodeListNode *inserta = (SNodeListNode*)nodesMakeNode(QUERY_NODE_NODE_LIST);
SSubplan *insertPlan = (SSubplan *)calloc(2, sizeof(SSubplan));
insertPlan[0].id.queryId = qId;
insertPlan[0].id.templateId = 0x0000000000000003;
insertPlan[0].id.subplanId = 0x0000000000000004;
insertPlan[0].type = QUERY_TYPE_MODIFY;
insertPlan[0].subplanType = SUBPLAN_TYPE_MODIFY;
insertPlan[0].level = 0;
insertPlan[0].execNode.nodeId = 1;
@ -156,7 +156,7 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan[1].id.queryId = qId;
insertPlan[1].id.templateId = 0x0000000000000003;
insertPlan[1].id.subplanId = 0x0000000000000005;
insertPlan[1].type = QUERY_TYPE_MODIFY;
insertPlan[1].subplanType = SUBPLAN_TYPE_MODIFY;
insertPlan[1].level = 0;
insertPlan[1].execNode.nodeId = 1;
@ -169,11 +169,11 @@ void schtBuildInsertDag(SQueryPlan *dag) {
insertPlan[1].pDataSink = (SDataSinkNode*)calloc(1, sizeof(SDataSinkNode));
insertPlan[1].msgType = TDMT_VND_SUBMIT;
taosArrayPush(inserta, &insertPlan);
nodesListAppend(inserta->pNodeList, (SNode*)insertPlan);
insertPlan += 1;
taosArrayPush(inserta, &insertPlan);
nodesListAppend(inserta->pNodeList, (SNode*)insertPlan);
taosArrayPush(dag->pSubplans, &inserta);
nodesListAppend(dag->pSubplans, (SNode*)inserta);
}
@ -347,7 +347,7 @@ void* schtRunJobThread(void *aa) {
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
SQueryPlan dag = {0};
SQueryPlan dag;
schtInitLogFile();
@ -517,7 +517,7 @@ TEST(queryTest, normalCase) {
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
SSchJob *pJob = NULL;
SQueryPlan dag = {0};
SQueryPlan dag;
schtInitLogFile();
@ -620,7 +620,7 @@ TEST(insertTest, normalCase) {
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
SQueryPlan dag = {0};
SQueryPlan dag;
uint64_t numOfRows = 0;
schtInitLogFile();

View File

@ -73,6 +73,22 @@ int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj) {
return tjsonAddItemToArray(pJson, pJobj);
}
int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* pArray, int32_t itemSize, int32_t num) {
if (num > 0) {
SJson* pJsonArray = tjsonAddArrayToObject(pJson, pName);
if (NULL == pJsonArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (size_t i = 0; i < num; ++i) {
int32_t code = tjsonAddItem(pJsonArray, func, (const char*)pArray + itemSize * i);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
}
return TSDB_CODE_SUCCESS;
}
char* tjsonToString(const SJson* pJson) { return cJSON_Print((cJSON*)pJson); }
char* tjsonToUnformattedString(const SJson* pJson) { return cJSON_PrintUnformatted((cJSON*)pJson); }
@ -175,4 +191,16 @@ int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, voi
return func(pJsonObj, pObj);
}
int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize) {
const cJSON* jArray = tjsonGetObjectItem(pJson, pName);
int32_t size = (NULL == jArray ? 0 : tjsonGetArraySize(jArray));
for (int32_t i = 0; i < size; ++i) {
int32_t code = func(tjsonGetArrayItem(jArray, i), (char*)pArray + itemSize * i);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
SJson* tjsonParse(const char* pStr) { return cJSON_Parse(pStr); }