Merge pull request #10559 from taosdata/feature/3.0_query_integrate_wxy
TD-13706 integration create table stmt and insert stmt
This commit is contained in:
commit
be73f15a17
|
@ -85,54 +85,55 @@
|
|||
#define TK_DECIMAL 67
|
||||
#define TK_SHOW 68
|
||||
#define TK_DATABASES 69
|
||||
#define TK_NK_FLOAT 70
|
||||
#define TK_NK_BOOL 71
|
||||
#define TK_NK_VARIABLE 72
|
||||
#define TK_BETWEEN 73
|
||||
#define TK_IS 74
|
||||
#define TK_NULL 75
|
||||
#define TK_NK_LT 76
|
||||
#define TK_NK_GT 77
|
||||
#define TK_NK_LE 78
|
||||
#define TK_NK_GE 79
|
||||
#define TK_NK_NE 80
|
||||
#define TK_NK_EQ 81
|
||||
#define TK_LIKE 82
|
||||
#define TK_MATCH 83
|
||||
#define TK_NMATCH 84
|
||||
#define TK_IN 85
|
||||
#define TK_FROM 86
|
||||
#define TK_AS 87
|
||||
#define TK_JOIN 88
|
||||
#define TK_ON 89
|
||||
#define TK_INNER 90
|
||||
#define TK_SELECT 91
|
||||
#define TK_DISTINCT 92
|
||||
#define TK_WHERE 93
|
||||
#define TK_PARTITION 94
|
||||
#define TK_BY 95
|
||||
#define TK_SESSION 96
|
||||
#define TK_STATE_WINDOW 97
|
||||
#define TK_INTERVAL 98
|
||||
#define TK_SLIDING 99
|
||||
#define TK_FILL 100
|
||||
#define TK_VALUE 101
|
||||
#define TK_NONE 102
|
||||
#define TK_PREV 103
|
||||
#define TK_LINEAR 104
|
||||
#define TK_NEXT 105
|
||||
#define TK_GROUP 106
|
||||
#define TK_HAVING 107
|
||||
#define TK_ORDER 108
|
||||
#define TK_SLIMIT 109
|
||||
#define TK_SOFFSET 110
|
||||
#define TK_LIMIT 111
|
||||
#define TK_OFFSET 112
|
||||
#define TK_ASC 113
|
||||
#define TK_DESC 114
|
||||
#define TK_NULLS 115
|
||||
#define TK_FIRST 116
|
||||
#define TK_LAST 117
|
||||
#define TK_TABLES 70
|
||||
#define TK_NK_FLOAT 71
|
||||
#define TK_NK_BOOL 72
|
||||
#define TK_NK_VARIABLE 73
|
||||
#define TK_BETWEEN 74
|
||||
#define TK_IS 75
|
||||
#define TK_NULL 76
|
||||
#define TK_NK_LT 77
|
||||
#define TK_NK_GT 78
|
||||
#define TK_NK_LE 79
|
||||
#define TK_NK_GE 80
|
||||
#define TK_NK_NE 81
|
||||
#define TK_NK_EQ 82
|
||||
#define TK_LIKE 83
|
||||
#define TK_MATCH 84
|
||||
#define TK_NMATCH 85
|
||||
#define TK_IN 86
|
||||
#define TK_FROM 87
|
||||
#define TK_AS 88
|
||||
#define TK_JOIN 89
|
||||
#define TK_ON 90
|
||||
#define TK_INNER 91
|
||||
#define TK_SELECT 92
|
||||
#define TK_DISTINCT 93
|
||||
#define TK_WHERE 94
|
||||
#define TK_PARTITION 95
|
||||
#define TK_BY 96
|
||||
#define TK_SESSION 97
|
||||
#define TK_STATE_WINDOW 98
|
||||
#define TK_INTERVAL 99
|
||||
#define TK_SLIDING 100
|
||||
#define TK_FILL 101
|
||||
#define TK_VALUE 102
|
||||
#define TK_NONE 103
|
||||
#define TK_PREV 104
|
||||
#define TK_LINEAR 105
|
||||
#define TK_NEXT 106
|
||||
#define TK_GROUP 107
|
||||
#define TK_HAVING 108
|
||||
#define TK_ORDER 109
|
||||
#define TK_SLIMIT 110
|
||||
#define TK_SOFFSET 111
|
||||
#define TK_LIMIT 112
|
||||
#define TK_OFFSET 113
|
||||
#define TK_ASC 114
|
||||
#define TK_DESC 115
|
||||
#define TK_NULLS 116
|
||||
#define TK_FIRST 117
|
||||
#define TK_LAST 118
|
||||
|
||||
#define TK_SPACE 300
|
||||
#define TK_NK_COMMENT 301
|
||||
|
|
|
@ -74,7 +74,8 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_CREATE_DATABASE_STMT,
|
||||
QUERY_NODE_CREATE_TABLE_STMT,
|
||||
QUERY_NODE_USE_DATABASE_STMT,
|
||||
QUERY_NODE_SHOW_DATABASE_STMT, // temp
|
||||
QUERY_NODE_SHOW_DATABASES_STMT, // temp
|
||||
QUERY_NODE_SHOW_TABLES_STMT, // temp
|
||||
|
||||
// logic plan node
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN,
|
||||
|
@ -128,6 +129,7 @@ void nodesDestroyNode(SNodeptr pNode);
|
|||
|
||||
SNodeList* nodesMakeList();
|
||||
int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode);
|
||||
int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode);
|
||||
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc);
|
||||
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
|
||||
SNodeptr nodesListGetNode(SNodeList* pList, int32_t index);
|
||||
|
|
|
@ -43,6 +43,7 @@ typedef struct SScanLogicNode {
|
|||
SLogicNode node;
|
||||
SNodeList* pScanCols;
|
||||
struct STableMeta* pMeta;
|
||||
SVgroupsInfo* pVgroupList;
|
||||
EScanType scanType;
|
||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
||||
STimeWindow scanRange;
|
||||
|
@ -84,7 +85,6 @@ typedef struct SSubLogicPlan {
|
|||
SNodeList* pChildren;
|
||||
SNodeList* pParents;
|
||||
SLogicNode* pNode;
|
||||
SQueryNodeAddr execNode;
|
||||
ESubplanType subplanType;
|
||||
int32_t level;
|
||||
} SSubLogicPlan;
|
||||
|
|
|
@ -123,6 +123,7 @@ struct STableMeta;
|
|||
typedef struct SRealTableNode {
|
||||
STableNode table; // QUERY_NODE_REAL_TABLE
|
||||
struct STableMeta* pMeta;
|
||||
SVgroupsInfo* pVgroupList;
|
||||
} SRealTableNode;
|
||||
|
||||
typedef struct STempTableNode {
|
||||
|
|
|
@ -38,8 +38,9 @@ typedef struct SParseContext {
|
|||
typedef struct SCmdMsgInfo {
|
||||
int16_t msgType;
|
||||
SEpSet epSet;
|
||||
char* pMsg;
|
||||
void* pMsg;
|
||||
int32_t msgLen;
|
||||
void* pExtension; // todo remove it soon
|
||||
} SCmdMsgInfo;
|
||||
|
||||
typedef struct SQuery {
|
||||
|
@ -50,6 +51,7 @@ typedef struct SQuery {
|
|||
int32_t numOfResCols;
|
||||
SSchema* pResSchema;
|
||||
SCmdMsgInfo* pCmdMsg;
|
||||
int32_t msgType;
|
||||
} SQuery;
|
||||
|
||||
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
|
||||
|
|
|
@ -28,7 +28,7 @@ typedef struct SPlanContext {
|
|||
} SPlanContext;
|
||||
|
||||
// Create the physical plan for the query, according to the AST.
|
||||
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan);
|
||||
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList);
|
||||
|
||||
// Set datasource of this subplan, multiple calls may be made to a subplan.
|
||||
// @subplan subplan to be schedule
|
||||
|
|
|
@ -176,6 +176,14 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
|
||||
|
||||
if (pMsgInfo->msgType == TDMT_VND_SHOW_TABLES) {
|
||||
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
|
||||
if (pShowReqInfo->pArray == NULL) {
|
||||
pShowReqInfo->currentIndex = 0; // set the first vnode/ then iterate the next vnode
|
||||
pShowReqInfo->pArray = pMsgInfo->pExtension;
|
||||
}
|
||||
}
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
|
||||
|
||||
|
@ -183,10 +191,10 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pDag, SArray* pNodeList) {
|
||||
pRequest->type = pQuery->sqlNodeType;
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
|
||||
pRequest->type = pQuery->msgType;
|
||||
SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot };
|
||||
int32_t code = qCreateQueryPlan(&cxt, pDag);
|
||||
int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
@ -219,7 +227,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
|||
return pRequest->code;
|
||||
}
|
||||
|
||||
if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
|
||||
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
|
||||
pRequest->body.resInfo.numOfRows = res.numOfRows;
|
||||
|
||||
if (pRequest->body.queryJob != 0) {
|
||||
|
|
|
@ -160,7 +160,6 @@ static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode
|
|||
|
||||
static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) {
|
||||
COPY_NODE_FIELD(pNode);
|
||||
COPY_SCALAR_FIELD(execNode);
|
||||
COPY_SCALAR_FIELD(subplanType);
|
||||
return (SNode*)pDst;
|
||||
}
|
||||
|
|
|
@ -80,8 +80,10 @@ const char* nodesNodeName(ENodeType type) {
|
|||
return "CreateTableStmt";
|
||||
case QUERY_NODE_USE_DATABASE_STMT:
|
||||
return "UseDatabaseStmt";
|
||||
case QUERY_NODE_SHOW_DATABASE_STMT:
|
||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||
return "ShowDatabaseStmt";
|
||||
case QUERY_NODE_SHOW_TABLES_STMT:
|
||||
return "ShowTablesStmt";
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
return "LogicScan";
|
||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||
|
@ -1322,7 +1324,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
|||
case QUERY_NODE_CREATE_DATABASE_STMT:
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
case QUERY_NODE_USE_DATABASE_STMT:
|
||||
case QUERY_NODE_SHOW_DATABASE_STMT:
|
||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||
case QUERY_NODE_SHOW_TABLES_STMT:
|
||||
break;
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
return logicScanNodeToJson(pObj, pJson);
|
||||
|
|
|
@ -86,7 +86,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SCreateTableStmt));
|
||||
case QUERY_NODE_USE_DATABASE_STMT:
|
||||
return makeNode(type, sizeof(SUseDatabaseStmt));
|
||||
case QUERY_NODE_SHOW_DATABASE_STMT:
|
||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||
case QUERY_NODE_SHOW_TABLES_STMT:
|
||||
return makeNode(type, sizeof(SNode));;
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
return makeNode(type, sizeof(SScanLogicNode));
|
||||
|
@ -202,6 +203,17 @@ int32_t nodesListAppend(SNodeList* pList, SNodeptr pNode) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t nodesListStrictAppend(SNodeList* pList, SNodeptr pNode) {
|
||||
if (NULL == pNode) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
int32_t code = nodesListAppend(pList, pNode);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode(pNode);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc) {
|
||||
if (NULL == pTarget || NULL == pSrc) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -101,8 +101,8 @@ cmd ::= CREATE TABLE exists_opt(A) full_table_name(B)
|
|||
|
||||
%type full_table_name { STokenPair }
|
||||
%destructor full_table_name { }
|
||||
full_table_name(A) ::= NK_ID(B). { STokenPair t = { .first = B, .second = nil_token}; A = t; }
|
||||
full_table_name(A) ::= NK_ID(B) NK_DOT NK_ID(C). { STokenPair t = { .first = B, .second = C}; A = t; }
|
||||
full_table_name(A) ::= NK_ID(B). { STokenPair t = { .first = nil_token, .second = B }; A = t; }
|
||||
full_table_name(A) ::= NK_ID(B) NK_DOT NK_ID(C). { STokenPair t = { .first = B, .second = C }; A = t; }
|
||||
|
||||
%type column_def_list { SNodeList* }
|
||||
%destructor column_def_list { nodesDestroyList($$); }
|
||||
|
@ -146,7 +146,8 @@ table_options(A) ::= table_options(B) KEEP NK_INTEGER(C).
|
|||
table_options(A) ::= table_options(B) TTL NK_INTEGER(C). { A = setTableOption(pCxt, B, TABLE_OPTION_TTL, &C); }
|
||||
|
||||
/************************************************ show ***************************************************************/
|
||||
cmd ::= SHOW DATABASES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DATABASE_STMT); }
|
||||
cmd ::= SHOW DATABASES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_DATABASES_STMT); }
|
||||
cmd ::= SHOW TABLES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TABLES_STMT); }
|
||||
|
||||
/************************************************ select *************************************************************/
|
||||
cmd ::= query_expression(A). { PARSER_TRACE; pCxt->pRootNode = A; }
|
||||
|
|
|
@ -714,18 +714,18 @@ SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDat
|
|||
strncpy(pCol->colName, pColName->z, pColName->n);
|
||||
pCol->dataType = dataType;
|
||||
if (NULL != pComment) {
|
||||
strncpy(pCol->colName, pColName->z, pColName->n);
|
||||
strncpy(pCol->comments, pComment->z, pComment->n);
|
||||
}
|
||||
return (SNode*)pCol;
|
||||
}
|
||||
|
||||
SDataType createDataType(uint8_t type) {
|
||||
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = 0 };
|
||||
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = tDataTypes[type].bytes };
|
||||
return dt;
|
||||
}
|
||||
|
||||
SDataType createVarLenDataType(uint8_t type, const SToken* pLen) {
|
||||
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = 0 };
|
||||
SDataType dt = { .type = type, .precision = 0, .scale = 0, .bytes = tDataTypes[type].bytes };
|
||||
return dt;
|
||||
}
|
||||
|
||||
|
@ -751,7 +751,7 @@ SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName) {
|
|||
}
|
||||
|
||||
SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type) {
|
||||
SNode* pStmt = nodesMakeNode(QUERY_NODE_SHOW_DATABASE_STMT);;
|
||||
SNode* pStmt = nodesMakeNode(type);;
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
return pStmt;
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ static void setQuery(SAstCreateContext* pCxt, SQuery* pQuery) {
|
|||
pQuery->haveResultSet = false;
|
||||
pQuery->directRpc = true;
|
||||
}
|
||||
pQuery->msgType = (QUERY_NODE_CREATE_TABLE_STMT == type ? TDMT_VND_CREATE_TABLE : TDMT_VND_QUERY);
|
||||
}
|
||||
|
||||
int32_t doParse(SParseContext* pParseCxt, SQuery** pQuery) {
|
||||
|
|
|
@ -561,6 +561,29 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t setTableVgroupList(STranslateContext *pCxt, SName* name, SVgroupsInfo **pVgList) {
|
||||
SArray* vgroupList = NULL;
|
||||
int32_t code = catalogGetTableDistVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &(pCxt->pParseCxt->mgmtEpSet), name, &vgroupList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
size_t vgroupNum = taosArrayGetSize(vgroupList);
|
||||
|
||||
SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum);
|
||||
vgList->numOfVgroups = vgroupNum;
|
||||
|
||||
for (int32_t i = 0; i < vgroupNum; ++i) {
|
||||
SVgroupInfo *vg = taosArrayGet(vgroupList, i);
|
||||
vgList->vgroups[i] = *vg;
|
||||
}
|
||||
|
||||
*pVgList = vgList;
|
||||
taosArrayDestroy(vgroupList);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pTable)) {
|
||||
|
@ -572,6 +595,10 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
|
|||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName);
|
||||
}
|
||||
code = setTableVgroupList(pCxt, &name, &(pRealTable->pVgroupList));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
code = addNamespace(pCxt, pRealTable);
|
||||
break;
|
||||
}
|
||||
|
@ -852,11 +879,7 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateShow(STranslateContext* pCxt) {
|
||||
static int32_t translateShowDatabases(STranslateContext* pCxt) {
|
||||
SShowReq showReq = { .type = TSDB_MGMT_TABLE_DB };
|
||||
|
||||
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
|
||||
|
@ -875,6 +898,34 @@ static int32_t translateShow(STranslateContext* pCxt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateShowTables(STranslateContext* pCxt) {
|
||||
SName name = {0};
|
||||
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
|
||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db));
|
||||
char dbFname[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(&name, dbFname);
|
||||
|
||||
SArray* array = NULL;
|
||||
int32_t code = catalogGetDBVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, dbFname, false, &array);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
SVgroupInfo* info = taosArrayGet(array, 0);
|
||||
pShowReq->head.vgId = htonl(info->vgId);
|
||||
|
||||
pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo));
|
||||
if (NULL== pCxt->pCmdMsg) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCxt->pCmdMsg->epSet = info->epset;
|
||||
pCxt->pCmdMsg->msgType = TDMT_VND_SHOW_TABLES;
|
||||
pCxt->pCmdMsg->msgLen = sizeof(SVShowTablesReq);
|
||||
pCxt->pCmdMsg->pMsg = pShowReq;
|
||||
pCxt->pCmdMsg->pExtension = array;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pNode)) {
|
||||
|
@ -887,10 +938,11 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
|||
case QUERY_NODE_USE_DATABASE_STMT:
|
||||
code = translateUseDatabase(pCxt, (SUseDatabaseStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_SHOW_DATABASE_STMT:
|
||||
code = translateShow(pCxt);
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
code = translateCreateTable(pCxt, (SCreateTableStmt*)pNode);
|
||||
case QUERY_NODE_SHOW_DATABASES_STMT:
|
||||
code = translateShowDatabases(pCxt);
|
||||
break;
|
||||
case QUERY_NODE_SHOW_TABLES_STMT:
|
||||
code = translateShowTables(pCxt);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
@ -942,9 +994,10 @@ typedef struct SVgroupTablesBatch {
|
|||
SVgroupInfo info;
|
||||
} SVgroupTablesBatch;
|
||||
|
||||
static void toSchema(const SColumnNode* pCol, SSchema* pSchema) {
|
||||
pSchema->type = pCol->node.resType.type;
|
||||
pSchema->bytes = pCol->node.resType.bytes;
|
||||
static void toSchema(const SColumnDefNode* pCol, int32_t colId, SSchema* pSchema) {
|
||||
pSchema->colId = colId;
|
||||
pSchema->type = pCol->dataType.type;
|
||||
pSchema->bytes = pCol->dataType.bytes;
|
||||
strcpy(pSchema->name, pCol->colName);
|
||||
}
|
||||
|
||||
|
@ -960,7 +1013,8 @@ static int32_t doBuildSingleTableBatchReq(SName* pTableName, SNodeList* pColumns
|
|||
SNode* pCol;
|
||||
int32_t index = 0;
|
||||
FOREACH(pCol, pColumns) {
|
||||
toSchema((SColumnNode*)pCol, req.ntbCfg.pSchema + index++);
|
||||
toSchema((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index);
|
||||
++index;
|
||||
}
|
||||
|
||||
pBatch->info = *pVgroupInfo;
|
||||
|
@ -1018,7 +1072,11 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
|||
SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot;
|
||||
|
||||
SName tableName = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
|
||||
strcpy(tableName.dbname, pStmt->dbName);
|
||||
if ('\0' == pStmt->dbName[0]) {
|
||||
strcpy(tableName.dbname, pCxt->pParseCxt->db);
|
||||
} else {
|
||||
strcpy(tableName.dbname, pStmt->dbName);
|
||||
}
|
||||
strcpy(tableName.tname, pStmt->tableName);
|
||||
SVgroupInfo info = {0};
|
||||
catalogGetTableHashVgroup(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, &tableName, &info);
|
||||
|
|
|
@ -159,12 +159,9 @@ static int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pPar
|
|||
}
|
||||
|
||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
|
||||
SName name = {0};
|
||||
createSName(&name, pTname, pCxt->pComCxt, &pCxt->msg);
|
||||
|
||||
char tableName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
tNameExtractFullName(&name, tableName);
|
||||
SParseContext* pBasicCtx = pCxt->pComCxt;
|
||||
SName name = {0};
|
||||
createSName(&name, pTname, pBasicCtx, &pCxt->msg);
|
||||
CHECK_CODE(catalogGetTableMeta(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &pCxt->pTableMeta));
|
||||
SVgroupInfo vg;
|
||||
CHECK_CODE(catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, &name, &vg));
|
||||
|
@ -939,6 +936,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
|
|||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
*pQuery = calloc(1, sizeof(SQuery));
|
||||
if (NULL == *pQuery) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
(*pQuery)->directRpc = false;
|
||||
(*pQuery)->haveResultSet = false;
|
||||
(*pQuery)->msgType = TDMT_VND_SUBMIT;
|
||||
(*pQuery)->pRoot = (SNode*)context.pOutput;
|
||||
context.pOutput->payloadType = PAYLOAD_TYPE_KV;
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -88,7 +88,7 @@ static SKeyword keywordTable[] = {
|
|||
// {"SCORES", TK_SCORES},
|
||||
// {"GRANTS", TK_GRANTS},
|
||||
// {"DOT", TK_DOT},
|
||||
// {"TABLES", TK_TABLES},
|
||||
{"TABLES", TK_TABLES},
|
||||
// {"STABLES", TK_STABLES},
|
||||
{"VGROUPS", TK_VGROUPS},
|
||||
// {"DROP", TK_DROP},
|
||||
|
|
|
@ -50,7 +50,7 @@ extern "C" {
|
|||
|
||||
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode);
|
||||
int32_t optimize(SPlanContext* pCxt, SLogicNode* pLogicNode);
|
||||
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan);
|
||||
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan, SArray* pExecNodeList);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -128,6 +128,7 @@ static SLogicNode* createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
|
|||
pScan->node.id = pCxt->planNodeId++;
|
||||
|
||||
pScan->pMeta = pRealTable->pMeta;
|
||||
pScan->pVgroupList = pRealTable->pVgroupList;
|
||||
|
||||
// set columns to scan
|
||||
SNodeList* pCols = NULL;
|
||||
|
|
|
@ -27,6 +27,7 @@ typedef struct SPhysiPlanContext {
|
|||
int32_t errCode;
|
||||
int16_t nextDataBlockId;
|
||||
SArray* pLocationHelper;
|
||||
SArray* pExecNodeList;
|
||||
} SPhysiPlanContext;
|
||||
|
||||
static int32_t getSlotKey(SNode* pNode, char* pKey) {
|
||||
|
@ -185,11 +186,41 @@ static int32_t setSlotOutput(SPhysiPlanContext* pCxt, SNodeList* pTargets, SData
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SNodeptr createPrimaryKeyCol(SPhysiPlanContext* pCxt, uint64_t tableId) {
|
||||
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
CHECK_ALLOC(pCol, NULL);
|
||||
pCol->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
pCol->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
|
||||
pCol->tableId = tableId;
|
||||
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
pCol->colType = COLUMN_TYPE_COLUMN;
|
||||
return pCol;
|
||||
}
|
||||
|
||||
static int32_t addPrimaryKeyCol(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode) {
|
||||
if (NULL == pScanPhysiNode->pScanCols) {
|
||||
pScanPhysiNode->pScanCols = nodesMakeList();
|
||||
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
|
||||
CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid)));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pScanPhysiNode->pScanCols) {
|
||||
if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid)));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode, SScanPhysiNode* pScanPhysiNode) {
|
||||
if (NULL != pScanLogicNode->pScanCols) {
|
||||
pScanPhysiNode->pScanCols = nodesCloneList(pScanLogicNode->pScanCols);
|
||||
CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CHECK_CODE(addPrimaryKeyCol(pCxt, pScanPhysiNode), TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
||||
// Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t
|
||||
CHECK_CODE(addDataBlockDesc(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc), TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
||||
|
@ -206,6 +237,11 @@ static int32_t initScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanL
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) {
|
||||
pNodeAddr->nodeId = vg->vgId;
|
||||
pNodeAddr->epset = vg->epset;
|
||||
}
|
||||
|
||||
static SPhysiNode* createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
|
||||
STagScanPhysiNode* pTagScan = (STagScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
|
||||
CHECK_ALLOC(pTagScan, NULL);
|
||||
|
@ -213,21 +249,23 @@ static SPhysiNode* createTagScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNod
|
|||
return (SPhysiNode*)pTagScan;
|
||||
}
|
||||
|
||||
static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
|
||||
static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) {
|
||||
STableScanPhysiNode* pTableScan = (STableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
|
||||
CHECK_ALLOC(pTableScan, NULL);
|
||||
CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan), (SPhysiNode*)pTableScan);
|
||||
pTableScan->scanFlag = pScanLogicNode->scanFlag;
|
||||
pTableScan->scanRange = pScanLogicNode->scanRange;
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
||||
return (SPhysiNode*)pTableScan;
|
||||
}
|
||||
|
||||
static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) {
|
||||
static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) {
|
||||
switch (pScanLogicNode->scanType) {
|
||||
case SCAN_TYPE_TAG:
|
||||
return createTagScanPhysiNode(pCxt, pScanLogicNode);
|
||||
case SCAN_TYPE_TABLE:
|
||||
return createTableScanPhysiNode(pCxt, pScanLogicNode);
|
||||
return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode);
|
||||
case SCAN_TYPE_STABLE:
|
||||
case SCAN_TYPE_STREAM:
|
||||
break;
|
||||
|
@ -428,13 +466,13 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC
|
|||
return (SPhysiNode*)pProject;
|
||||
}
|
||||
|
||||
static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPlan) {
|
||||
static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SLogicNode* pLogicPlan) {
|
||||
SNodeList* pChildren = nodesMakeList();
|
||||
CHECK_ALLOC(pChildren, NULL);
|
||||
|
||||
SNode* pLogicChild;
|
||||
FOREACH(pLogicChild, pLogicPlan->pChildren) {
|
||||
SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, (SLogicNode*)pLogicChild);
|
||||
SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, pSubplan, (SLogicNode*)pLogicChild);
|
||||
if (TSDB_CODE_SUCCESS != nodesListAppend(pChildren, pChildPhyNode)) {
|
||||
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
|
||||
nodesDestroyList(pChildren);
|
||||
|
@ -445,7 +483,7 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicPl
|
|||
SPhysiNode* pPhyNode = NULL;
|
||||
switch (nodeType(pLogicPlan)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||
pPhyNode = createScanPhysiNode(pCxt, (SScanLogicNode*)pLogicPlan);
|
||||
pPhyNode = createScanPhysiNode(pCxt, pSubplan, (SScanLogicNode*)pLogicPlan);
|
||||
break;
|
||||
case QUERY_NODE_LOGIC_PLAN_JOIN:
|
||||
pPhyNode = createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicPlan);
|
||||
|
@ -493,25 +531,17 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog
|
|||
SVnodeModifLogicNode* pModif = (SVnodeModifLogicNode*)pLogicSubplan->pNode;
|
||||
pSubplan->pDataSink = createDataInserter(pCxt, pModif->pVgDataBlocks);
|
||||
pSubplan->msgType = pModif->msgType;
|
||||
pSubplan->execNode.epset = pModif->pVgDataBlocks->vg.epset;
|
||||
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
||||
} else {
|
||||
pSubplan->pNode = createPhysiNode(pCxt, pLogicSubplan->pNode);
|
||||
pSubplan->pNode = createPhysiNode(pCxt, pSubplan, pLogicSubplan->pNode);
|
||||
pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode);
|
||||
pSubplan->msgType = TDMT_VND_QUERY;
|
||||
}
|
||||
pSubplan->subplanType = pLogicSubplan->subplanType;
|
||||
return pSubplan;
|
||||
}
|
||||
|
||||
static int32_t strictListAppend(SNodeList* pList, SNodeptr pNode) {
|
||||
if (NULL == pNode) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
int32_t code = nodesListAppend(pList, pNode);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode(pNode);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t splitLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubLogicPlan** pSubLogicPlan) {
|
||||
*pSubLogicPlan = (SSubLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
|
||||
CHECK_ALLOC(*pSubLogicPlan, TSDB_CODE_OUT_OF_MEMORY);
|
||||
|
@ -529,7 +559,7 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l
|
|||
if (level >= LIST_LENGTH(pSubplans)) {
|
||||
pGroup = nodesMakeNode(QUERY_NODE_NODE_LIST);
|
||||
CHECK_ALLOC(pGroup, TSDB_CODE_OUT_OF_MEMORY);
|
||||
CHECK_CODE(strictListAppend(pSubplans, pGroup), TSDB_CODE_OUT_OF_MEMORY);
|
||||
CHECK_CODE(nodesListStrictAppend(pSubplans, pGroup), TSDB_CODE_OUT_OF_MEMORY);
|
||||
} else {
|
||||
pGroup = nodesListGetNode(pSubplans, level);
|
||||
}
|
||||
|
@ -537,7 +567,7 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l
|
|||
pGroup->pNodeList = nodesMakeList();
|
||||
CHECK_ALLOC(pGroup->pNodeList, TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CHECK_CODE(strictListAppend(pGroup->pNodeList, pSubplan), TSDB_CODE_OUT_OF_MEMORY);
|
||||
CHECK_CODE(nodesListStrictAppend(pGroup->pNodeList, pSubplan), TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSrc, int32_t level) {
|
||||
|
@ -562,7 +592,6 @@ static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int3
|
|||
SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level);
|
||||
CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY);
|
||||
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i);
|
||||
pNewSubplan->execNode.epset = blocks->vg.epset;
|
||||
((SVnodeModifLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = blocks;
|
||||
CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans));
|
||||
}
|
||||
|
@ -639,12 +668,13 @@ static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPl
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan) {
|
||||
int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan, SArray* pExecNodeList) {
|
||||
SPhysiPlanContext cxt = {
|
||||
.pPlanCxt = pCxt,
|
||||
.errCode = TSDB_CODE_SUCCESS,
|
||||
.nextDataBlockId = 0,
|
||||
.pLocationHelper = taosArrayInit(32, POINTER_BYTES)
|
||||
.pLocationHelper = taosArrayInit(32, POINTER_BYTES),
|
||||
.pExecNodeList = pExecNodeList
|
||||
};
|
||||
if (NULL == cxt.pLocationHelper) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -21,14 +21,14 @@ int32_t optimize(SPlanContext* pCxt, SLogicNode* pLogicNode) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan) {
|
||||
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) {
|
||||
SLogicNode* pLogicNode = NULL;
|
||||
int32_t code = createLogicPlan(pCxt, &pLogicNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = optimize(pCxt, pLogicNode);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createPhysiPlan(pCxt, pLogicNode, pPlan);
|
||||
code = createPhysiPlan(pCxt, pLogicNode, pPlan, pExecNodeList);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ protected:
|
|||
|
||||
if (TEST_PHYSICAL_PLAN == target) {
|
||||
SQueryPlan* pPlan = nullptr;
|
||||
code = createPhysiPlan(&cxt, pLogicPlan, &pPlan);
|
||||
code = createPhysiPlan(&cxt, pLogicPlan, &pPlan, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
cout << "sql:[" << cxt_.pSql << "] physical plan code:" << code << ", strerror:" << tstrerror(code) << endl;
|
||||
return false;
|
||||
|
|
Loading…
Reference in New Issue