From f0044d826f1d5814d106bfa4cb3529f36f8b5b76 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 31 Dec 2021 18:57:57 +0800 Subject: [PATCH] [td-11818]support create multiple tables. --- include/common/tmsgtype.h | 3 +- include/libs/parser/parsenodes.h | 4 +- source/client/src/clientImpl.c | 2 +- source/libs/parser/inc/parserInt.h | 4 +- source/libs/parser/src/dCDAstProcess.c | 139 ++++++++++++++++-------- source/libs/parser/src/parser.c | 29 +++-- source/libs/parser/test/parserTests.cpp | 10 +- source/libs/planner/inc/plannerInt.h | 2 +- source/libs/planner/src/logicPlan.c | 15 ++- source/libs/planner/src/physicalPlan.c | 22 ++-- 10 files changed, 151 insertions(+), 79 deletions(-) diff --git a/include/common/tmsgtype.h b/include/common/tmsgtype.h index 8e7ad87a0a..ebbf99b942 100644 --- a/include/common/tmsgtype.h +++ b/include/common/tmsgtype.h @@ -40,8 +40,9 @@ enum { // the SQL below is for mgmt node TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_STABLE, "create-stable" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" ) - TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" ) + TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_FUNCTION, "drop-function" ) diff --git a/include/libs/parser/parsenodes.h b/include/libs/parser/parsenodes.h index 041adbb582..b326ac032c 100644 --- a/include/libs/parser/parsenodes.h +++ b/include/libs/parser/parsenodes.h @@ -27,7 +27,7 @@ extern "C" { #include "tname.h" #include "tvariant.h" -/* +/** * The first field of a node of any type is guaranteed to be the int16_t. * Hence the type of any node can be gotten by casting it to SQueryNode. */ @@ -157,7 +157,7 @@ typedef struct SVgDataBlocks { typedef struct SInsertStmtInfo { int16_t nodeType; SArray* pDataBlocks; // data block for each vgroup, SArray. - int8_t schemaAttache; // denote if submit block is built with table schema or not + int8_t schemaAttache; // denote if submit block is built with table schema or not uint8_t payloadType; // EPayloadType. 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert uint32_t insertType; // insert data from [file|sql statement| bound statement] const char* sql; // current sql statement position diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 050e763919..b0b2c57ee4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -202,7 +202,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) { } int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { - if (TSDB_SQL_INSERT == pRequest->type) { + if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); } return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); diff --git a/source/libs/parser/inc/parserInt.h b/source/libs/parser/inc/parserInt.h index 4bbe6ab907..346bd0cbe4 100644 --- a/source/libs/parser/inc/parserInt.h +++ b/source/libs/parser/inc/parserInt.h @@ -68,7 +68,9 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ * @param type * @return */ -int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, int32_t msgBufLen); +SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); + +SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen); /** * Evaluate the numeric and timestamp arithmetic expression in the WHERE clause. diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index ecea48f583..76e26c159b 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -326,8 +326,7 @@ typedef struct SVgroupTablesBatch { SVgroupInfo info; } SVgroupTablesBatch; -int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len, - SEpSet* pEpSet) { +int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) { const char* msg1 = "invalid table name"; const char* msg2 = "tags number not matched"; const char* msg3 = "tag value too long"; @@ -359,7 +358,11 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p size_t numOfInputTag = taosArrayGetSize(pValList); STableMeta* pSuperTableMeta = NULL; - catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); + code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + assert(pSuperTableMeta != NULL); // too long tag values will return invalid sql, not be truncated automatically @@ -501,14 +504,6 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p req.ctbCfg.suid = pSuperTableMeta->suid; req.ctbCfg.pTag = row; - // pEpSet->inUse = info.inUse; - // pEpSet->numOfEps = info.numOfEps; - // for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { - // pEpSet->port[i] = info.epAddr[i].port; - // tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i])); - // } - // ((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId); - // ((SMsgHead*)(*pOutput))->contLen = htonl(serLen); SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId)); if (pTableBatch == NULL) { SVgroupTablesBatch tBatch = {0}; @@ -525,12 +520,12 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p } // TODO: serialize and - SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); - SVgroupTablesBatch** ppTbBatch = NULL; + SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); + + SVgroupTablesBatch* pTbBatch = NULL; do { - ppTbBatch = taosHashIterate(pVgroupHashmap, ppTbBatch); - if (ppTbBatch == NULL) break; - SVgroupTablesBatch* pTbBatch = *ppTbBatch; + pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch); + if (pTbBatch == NULL) break; int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req)); void* buf = malloc(tlen); @@ -544,17 +539,29 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req)); - taosArrayPush(pBufArray, &buf); + SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks)); + pVgData->vg = pTbBatch->info; + pVgData->pData = buf; + pVgData->size = tlen; + pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray); + taosArrayPush(pBufArray, &pVgData); } while (true); + SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo)); + pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE; + pStmtInfo->pDataBlocks = pBufArray; + *pOutput = pStmtInfo; + *len = sizeof(SInsertStmtInfo); + return TSDB_CODE_SUCCESS; } -int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, - int32_t msgBufLen) { +SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { int32_t code = 0; + SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo)); + SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf* pMsgBuf = &m; @@ -571,21 +578,25 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SToken* pPwd = &pUser->passwd; if (pName->n >= TSDB_USER_LEN) { - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } if (parserValidateIdToken(pName) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } if (pInfo->type == TSDB_SQL_CREATE_USER) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } } else { if (pUser->type == TSDB_ALTER_USER_PASSWD) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } } else if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) { assert(pPwd->type == TSDB_DATA_TYPE_NULL); @@ -596,10 +607,12 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm } else if (strncasecmp(pPrivilege->z, "normal", 4) == 0 && pPrivilege->n == 4) { // pCmd->count = 2; } else { - return buildInvalidOperationMsg(pMsgBuf, msg4); + code = buildInvalidOperationMsg(pMsgBuf, msg4); + goto _error; } } else { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } } @@ -618,15 +631,18 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SToken* pPwd = &pInfo->pMiscInfo->user.passwd; if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } if (pName->n >= TSDB_USER_LEN) { - return buildInvalidOperationMsg(pMsgBuf, msg3); + code = buildInvalidOperationMsg(pMsgBuf, msg3); + goto _error; } if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt; @@ -636,7 +652,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) { } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) { } else { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } } @@ -655,6 +672,10 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_SHOW: { SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt; code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE) ? TDMT_VND_SHOW_TABLES : TDMT_MND_SHOW; break; } @@ -664,13 +685,15 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0); if (parserValidateNameToken(pToken) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg); + code = buildInvalidOperationMsg(pMsgBuf, msg); + goto _error; } SName n = {0}; int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n); if (ret != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg); + code = buildInvalidOperationMsg(pMsgBuf, msg); + goto _error; } SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg)); @@ -689,19 +712,22 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SCreateDbInfo* pCreateDB = &(pInfo->pMiscInfo->dbOpt); if (pCreateDB->dbname.n >= TSDB_DB_NAME_LEN) { - return buildInvalidOperationMsg(pMsgBuf, msg2); + code = buildInvalidOperationMsg(pMsgBuf, msg2); + goto _error; } char buf[TSDB_DB_NAME_LEN] = {0}; SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf)); if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf); if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_TSC_INVALID_OPERATION; + code = TSDB_CODE_TSC_INVALID_OPERATION; + goto _error; } pDcl->pMsg = (char*)pCreateMsg; @@ -719,7 +745,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm SName name = {0}; code = tNameSetDbName(&name, pCtx->acctId, dbName->z, dbName->n); if (code != TSDB_CODE_SUCCESS) { - return buildInvalidOperationMsg(pMsgBuf, msg1); + code = buildInvalidOperationMsg(pMsgBuf, msg1); + goto _error; } SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg)); @@ -731,7 +758,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm pDcl->msgType = TDMT_MND_DROP_DB; pDcl->msgLen = sizeof(SDropDbMsg); pDcl->pMsg = (char*)pDropDbMsg; - return TSDB_CODE_SUCCESS; + break; } case TSDB_SQL_CREATE_TABLE: { @@ -739,14 +766,16 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) { if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) { - return code; + terrno = code; + goto _error; } + pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB; } else if (pCreateTable->type == TSQL_CREATE_CTABLE) { - if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet)) != + if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen)) != TSDB_CODE_SUCCESS) { - return code; + goto _error; } pDcl->msgType = TDMT_VND_CREATE_TABLE; @@ -761,7 +790,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_DROP_TABLE: { pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); if (pDcl->pMsg == NULL) { - code = terrno; + goto _error; } pDcl->msgType = TDMT_MND_DROP_STB; @@ -771,7 +800,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_CREATE_DNODE: { pDcl->pMsg = (char*)buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); if (pDcl->pMsg == NULL) { - code = terrno; + goto _error; } pDcl->msgType = TDMT_MND_CREATE_DNODE; @@ -781,7 +810,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm case TSDB_SQL_DROP_DNODE: { pDcl->pMsg = (char*)buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); if (pDcl->pMsg == NULL) { - code = terrno; + goto _error; } pDcl->msgType = TDMT_MND_DROP_DNODE; @@ -792,5 +821,29 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm break; } - return code; + return pDcl; + + _error: + terrno = code; + tfree(pDcl); + return NULL; } + +SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) { + SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo; + assert(pCreateTable->type == TSQL_CREATE_CTABLE); + + SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; + SMsgBuf* pMsgBuf = &m; + + SInsertStmtInfo* pInsertStmt = NULL; + + int32_t msgLen = 0; + int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen); + if (code != TSDB_CODE_SUCCESS) { + tfree(pInsertStmt); + return NULL; + } + + return pInsertStmt; +} \ No newline at end of file diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 710cf4b5d0..1b4d05808c 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -32,7 +32,7 @@ bool isInsertSql(const char* pStr, size_t length) { } bool qIsDdlQuery(const SQueryNode* pQuery) { - return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type; + return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type && TSDB_SQL_CREATE_TABLE != pQuery->type; } int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { @@ -44,16 +44,29 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { } if (!isDqlSqlStatement(&info)) { - SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo)); - if (NULL == pDcl) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code. - return terrno; + bool toVnode = false; + if (info.type == TSDB_SQL_CREATE_TABLE) { + SCreateTableSql* pCreateSql = info.pCreateTableInfo; + if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) { + toVnode = true; + } } - pDcl->nodeType = info.type; - int32_t code = qParserValidateDclSqlNode(&info, &pCxt->ctx, pDcl, pCxt->pMsg, pCxt->msgLen); - if (code == TSDB_CODE_SUCCESS) { + if (toVnode) { + SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); + if (pInsertInfo == NULL) { + return terrno; + } + + *pQuery = (SQueryNode*) pInsertInfo; + } else { + SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); + if (pDcl == NULL) { + return terrno; + } + *pQuery = (SQueryNode*)pDcl; + pDcl->nodeType = info.type; } } else { SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); diff --git a/source/libs/parser/test/parserTests.cpp b/source/libs/parser/test/parserTests.cpp index a67a9a8be8..fe430c5f5e 100644 --- a/source/libs/parser/test/parserTests.cpp +++ b/source/libs/parser/test/parserTests.cpp @@ -714,10 +714,9 @@ TEST(testCase, show_user_Test) { SSqlInfo info1 = doGenerateAST(sql1); ASSERT_EQ(info1.valid, true); - SDclStmtInfo output; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc", .pTransporter = NULL}; - int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); - ASSERT_EQ(code, 0); + SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len); + ASSERT_NE(output, nullptr); // convert the show command to be the select query // select name, privilege, create_time, account from information_schema.users; @@ -735,10 +734,9 @@ TEST(testCase, create_user_Test) { ASSERT_EQ(info1.valid, true); ASSERT_EQ(isDclSqlStatement(&info1), true); - SDclStmtInfo output; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc"}; - int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); - ASSERT_EQ(code, 0); + SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len); + ASSERT_NE(output, nullptr); destroySqlInfo(&info1); } \ No newline at end of file diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 35c6d59ffe..a68102ea6e 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -40,7 +40,7 @@ extern "C" { #define QNODE_SESSIONWINDOW 12 #define QNODE_STATEWINDOW 13 #define QNODE_FILL 14 -#define QNODE_INSERT 15 +#define QNODE_MODIFY 15 typedef struct SQueryDistPlanNodeInfo { bool stableQuery; // super table query or not diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index 136073aa60..5f95f86d4a 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -37,15 +37,19 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) { return 0; } -int32_t createInsertPlan(const SInsertStmtInfo* pInsert, SQueryPlanNode** pQueryPlan) { +static int32_t createInsertPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) { + SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode; + *pQueryPlan = calloc(1, sizeof(SQueryPlanNode)); SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES); if (NULL == *pQueryPlan || NULL == blocks) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - (*pQueryPlan)->info.type = QNODE_INSERT; + + (*pQueryPlan)->info.type = QNODE_MODIFY; taosArrayAddAll(blocks, pInsert->pDataBlocks); (*pQueryPlan)->pExtInfo = blocks; + return TSDB_CODE_SUCCESS; } @@ -62,13 +66,14 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) { case TSDB_SQL_SELECT: { return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan); } + case TSDB_SQL_INSERT: - return createInsertPlan((const SInsertStmtInfo*)pNode, pQueryPlan); + case TSDB_SQL_CREATE_TABLE: + return createInsertPlan(pNode, pQueryPlan); + default: return TSDB_CODE_FAILED; } - - return TSDB_CODE_SUCCESS; } int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 97c9cec7c7..978b1554f3 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -34,7 +34,7 @@ static const char* gOpName[] = { #undef INCLUDE_AS_NAME }; -static void* vailidPointer(void* p) { +static void* validPointer(void* p) { if (NULL == p) { THROW(TSDB_CODE_TSC_OUT_OF_MEMORY); } @@ -76,7 +76,7 @@ int32_t dsinkNameToDsinkType(const char* name) { } static SDataSink* initDataSink(int32_t type, int32_t size) { - SDataSink* sink = (SDataSink*)vailidPointer(calloc(1, size)); + SDataSink* sink = (SDataSink*)validPointer(calloc(1, size)); sink->info.type = type; sink->info.name = dsinkTypeToDsinkName(type); return sink; @@ -121,7 +121,7 @@ static bool cloneExprArray(SArray** dst, SArray* src) { } static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) { - SPhyNode* node = (SPhyNode*)vailidPointer(calloc(1, size)); + SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size)); node->info.type = type; node->info.name = opTypeToOpName(type); if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) { @@ -184,7 +184,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable } static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { - SSubplan* subplan = vailidPointer(calloc(1, sizeof(SSubplan))); + SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan))); subplan->id = pCxt->nextId; ++(pCxt->nextId.subplanId); subplan->type = type; @@ -192,15 +192,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { if (NULL != pCxt->pCurrentSubplan) { subplan->level = pCxt->pCurrentSubplan->level + 1; if (NULL == pCxt->pCurrentSubplan->pChildern) { - pCxt->pCurrentSubplan->pChildern = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + pCxt->pCurrentSubplan->pChildern = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); } taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan); - subplan->pParents = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan); } SArray* currentLevel; if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) { - currentLevel = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); taosArrayPush(pCxt->pDag->pSubplans, ¤tLevel); } else { currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level); @@ -272,7 +272,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { case QNODE_TABLESCAN: node = createTableScanNode(pCxt, pPlanNode); break; - case QNODE_INSERT: + case QNODE_MODIFY: // Insert is not an operator in a physical plan. break; default: @@ -306,7 +306,7 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { } static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { - if (QNODE_INSERT == pRoot->info.type) { + if (QNODE_MODIFY == pRoot->info.type) { splitInsertSubplan(pCxt, pRoot); } else { SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); @@ -321,12 +321,12 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD TRY(TSDB_MAX_TAG_CONDITIONS) { SPlanContext context = { .pCatalog = pCatalog, - .pDag = vailidPointer(calloc(1, sizeof(SQueryDag))), + .pDag = validPointer(calloc(1, sizeof(SQueryDag))), .pCurrentSubplan = NULL, .nextId = {0} // todo queryid }; *pDag = context.pDag; - context.pDag->pSubplans = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); + context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); createSubplanByLevel(&context, pQueryNode); } CATCH(code) { CLEANUP_EXECUTE();