[td-11818]support create multiple tables.

This commit is contained in:
Haojun Liao 2021-12-31 18:57:57 +08:00
parent df9e82ee0a
commit f0044d826f
10 changed files with 151 additions and 79 deletions

View File

@ -40,6 +40,7 @@ enum {
// the SQL below is for mgmt node // the SQL below is for mgmt node
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_STABLE, "create-stable" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" )

View File

@ -27,7 +27,7 @@ extern "C" {
#include "tname.h" #include "tname.h"
#include "tvariant.h" #include "tvariant.h"
/* /**
* The first field of a node of any type is guaranteed to be the int16_t. * The first field of a node of any type is guaranteed to be the int16_t.
* Hence the type of any node can be gotten by casting it to SQueryNode. * Hence the type of any node can be gotten by casting it to SQueryNode.
*/ */

View File

@ -202,7 +202,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
} }
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
if (TSDB_SQL_INSERT == pRequest->type) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows); return scheduleExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob, &pRequest->affectedRows);
} }
return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);

View File

@ -68,7 +68,9 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ
* @param type * @param type
* @return * @return
*/ */
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, int32_t msgBufLen); SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen);
/** /**
* Evaluate the numeric and timestamp arithmetic expression in the WHERE clause. * Evaluate the numeric and timestamp arithmetic expression in the WHERE clause.

View File

@ -326,8 +326,7 @@ typedef struct SVgroupTablesBatch {
SVgroupInfo info; SVgroupInfo info;
} SVgroupTablesBatch; } SVgroupTablesBatch;
int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len, int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* pMsgBuf, char** pOutput, int32_t* len) {
SEpSet* pEpSet) {
const char* msg1 = "invalid table name"; const char* msg1 = "invalid table name";
const char* msg2 = "tags number not matched"; const char* msg2 = "tags number not matched";
const char* msg3 = "tag value too long"; const char* msg3 = "tag value too long";
@ -359,7 +358,11 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
size_t numOfInputTag = taosArrayGetSize(pValList); size_t numOfInputTag = taosArrayGetSize(pValList);
STableMeta* pSuperTableMeta = NULL; STableMeta* pSuperTableMeta = NULL;
catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta); code = catalogGetTableMeta(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, &name, &pSuperTableMeta);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
assert(pSuperTableMeta != NULL); assert(pSuperTableMeta != NULL);
// too long tag values will return invalid sql, not be truncated automatically // too long tag values will return invalid sql, not be truncated automatically
@ -501,14 +504,6 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
req.ctbCfg.suid = pSuperTableMeta->suid; req.ctbCfg.suid = pSuperTableMeta->suid;
req.ctbCfg.pTag = row; req.ctbCfg.pTag = row;
// pEpSet->inUse = info.inUse;
// pEpSet->numOfEps = info.numOfEps;
// for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
// pEpSet->port[i] = info.epAddr[i].port;
// tstrncpy(pEpSet->fqdn[i], info.epAddr[i].fqdn, tListLen(pEpSet->fqdn[i]));
// }
// ((SMsgHead*)(*pOutput))->vgId = htonl(info.vgId);
// ((SMsgHead*)(*pOutput))->contLen = htonl(serLen);
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId)); SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &info.vgId, sizeof(info.vgId));
if (pTableBatch == NULL) { if (pTableBatch == NULL) {
SVgroupTablesBatch tBatch = {0}; SVgroupTablesBatch tBatch = {0};
@ -526,11 +521,11 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
// TODO: serialize and // TODO: serialize and
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*)); SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
SVgroupTablesBatch** ppTbBatch = NULL;
SVgroupTablesBatch* pTbBatch = NULL;
do { do {
ppTbBatch = taosHashIterate(pVgroupHashmap, ppTbBatch); pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
if (ppTbBatch == NULL) break; if (pTbBatch == NULL) break;
SVgroupTablesBatch* pTbBatch = *ppTbBatch;
int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req)); int tlen = sizeof(SMsgHead) + tSVCreateTbBatchReqSerialize(NULL, &(pTbBatch->req));
void* buf = malloc(tlen); void* buf = malloc(tlen);
@ -544,17 +539,29 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SMsgBuf* p
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req)); tSVCreateTbBatchReqSerialize(&pBuf, &(pTbBatch->req));
taosArrayPush(pBufArray, &buf); SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
pVgData->vg = pTbBatch->info;
pVgData->pData = buf;
pVgData->size = tlen;
pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray);
taosArrayPush(pBufArray, &pVgData);
} while (true); } while (true);
SInsertStmtInfo* pStmtInfo = calloc(1, sizeof(SInsertStmtInfo));
pStmtInfo->nodeType = TSDB_SQL_CREATE_TABLE;
pStmtInfo->pDataBlocks = pBufArray;
*pOutput = pStmtInfo;
*len = sizeof(SInsertStmtInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStmtInfo* pDcl, char* msgBuf, SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
int32_t msgBufLen) {
int32_t code = 0; int32_t code = 0;
SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo));
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m; SMsgBuf* pMsgBuf = &m;
@ -571,21 +578,25 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pPwd = &pUser->passwd; SToken* pPwd = &pUser->passwd;
if (pName->n >= TSDB_USER_LEN) { if (pName->n >= TSDB_USER_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg3); code = buildInvalidOperationMsg(pMsgBuf, msg3);
goto _error;
} }
if (parserValidateIdToken(pName) != TSDB_CODE_SUCCESS) { if (parserValidateIdToken(pName) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg2); code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
} }
if (pInfo->type == TSDB_SQL_CREATE_USER) { if (pInfo->type == TSDB_SQL_CREATE_USER) {
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
} else { } else {
if (pUser->type == TSDB_ALTER_USER_PASSWD) { if (pUser->type == TSDB_ALTER_USER_PASSWD) {
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
} else if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) { } else if (pUser->type == TSDB_ALTER_USER_PRIVILEGES) {
assert(pPwd->type == TSDB_DATA_TYPE_NULL); assert(pPwd->type == TSDB_DATA_TYPE_NULL);
@ -596,10 +607,12 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} else if (strncasecmp(pPrivilege->z, "normal", 4) == 0 && pPrivilege->n == 4) { } else if (strncasecmp(pPrivilege->z, "normal", 4) == 0 && pPrivilege->n == 4) {
// pCmd->count = 2; // pCmd->count = 2;
} else { } else {
return buildInvalidOperationMsg(pMsgBuf, msg4); code = buildInvalidOperationMsg(pMsgBuf, msg4);
goto _error;
} }
} else { } else {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
} }
@ -618,15 +631,18 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pPwd = &pInfo->pMiscInfo->user.passwd; SToken* pPwd = &pInfo->pMiscInfo->user.passwd;
if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) { if (parserValidatePassword(pPwd, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
if (pName->n >= TSDB_USER_LEN) { if (pName->n >= TSDB_USER_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg3); code = buildInvalidOperationMsg(pMsgBuf, msg3);
goto _error;
} }
if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) { if (parserValidateNameToken(pName) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg2); code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
} }
SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt; SCreateAcctInfo* pAcctOpt = &pInfo->pMiscInfo->acctOpt;
@ -636,7 +652,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
} else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) { } else if (strncmp(pAcctOpt->stat.z, "all", 3) == 0 && pAcctOpt->stat.n == 3) {
} else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) { } else if (strncmp(pAcctOpt->stat.z, "no", 2) == 0 && pAcctOpt->stat.n == 2) {
} else { } else {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
} }
@ -655,6 +672,10 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_SHOW: { case TSDB_SQL_SHOW: {
SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt; SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt;
code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf); code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, &pDcl->pExtension, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE) ? TDMT_VND_SHOW_TABLES : TDMT_MND_SHOW; pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE) ? TDMT_VND_SHOW_TABLES : TDMT_MND_SHOW;
break; break;
} }
@ -664,13 +685,15 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0); SToken* pToken = taosArrayGet(pInfo->pMiscInfo->a, 0);
if (parserValidateNameToken(pToken) != TSDB_CODE_SUCCESS) { if (parserValidateNameToken(pToken) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg); code = buildInvalidOperationMsg(pMsgBuf, msg);
goto _error;
} }
SName n = {0}; SName n = {0};
int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n); int32_t ret = tNameSetDbName(&n, pCtx->acctId, pToken->z, pToken->n);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg); code = buildInvalidOperationMsg(pMsgBuf, msg);
goto _error;
} }
SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg)); SUseDbMsg* pUseDbMsg = (SUseDbMsg*)calloc(1, sizeof(SUseDbMsg));
@ -689,19 +712,22 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SCreateDbInfo* pCreateDB = &(pInfo->pMiscInfo->dbOpt); SCreateDbInfo* pCreateDB = &(pInfo->pMiscInfo->dbOpt);
if (pCreateDB->dbname.n >= TSDB_DB_NAME_LEN) { if (pCreateDB->dbname.n >= TSDB_DB_NAME_LEN) {
return buildInvalidOperationMsg(pMsgBuf, msg2); code = buildInvalidOperationMsg(pMsgBuf, msg2);
goto _error;
} }
char buf[TSDB_DB_NAME_LEN] = {0}; char buf[TSDB_DB_NAME_LEN] = {0};
SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf)); SToken token = taosTokenDup(&pCreateDB->dbname, buf, tListLen(buf));
if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) { if (parserValidateNameToken(&token) != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf); SCreateDbMsg* pCreateMsg = buildCreateDbMsg(pCreateDB, pCtx, pMsgBuf);
if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) { if (doCheckDbOptions(pCreateMsg, pMsgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _error;
} }
pDcl->pMsg = (char*)pCreateMsg; pDcl->pMsg = (char*)pCreateMsg;
@ -719,7 +745,8 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
SName name = {0}; SName name = {0};
code = tNameSetDbName(&name, pCtx->acctId, dbName->z, dbName->n); code = tNameSetDbName(&name, pCtx->acctId, dbName->z, dbName->n);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return buildInvalidOperationMsg(pMsgBuf, msg1); code = buildInvalidOperationMsg(pMsgBuf, msg1);
goto _error;
} }
SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg)); SDropDbMsg* pDropDbMsg = (SDropDbMsg*)calloc(1, sizeof(SDropDbMsg));
@ -731,7 +758,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
pDcl->msgType = TDMT_MND_DROP_DB; pDcl->msgType = TDMT_MND_DROP_DB;
pDcl->msgLen = sizeof(SDropDbMsg); pDcl->msgLen = sizeof(SDropDbMsg);
pDcl->pMsg = (char*)pDropDbMsg; pDcl->pMsg = (char*)pDropDbMsg;
return TSDB_CODE_SUCCESS; break;
} }
case TSDB_SQL_CREATE_TABLE: { case TSDB_SQL_CREATE_TABLE: {
@ -739,14 +766,16 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) { if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) { if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
return code; terrno = code;
goto _error;
} }
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB; pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) { } else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet)) != if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen)) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
return code; goto _error;
} }
pDcl->msgType = TDMT_VND_CREATE_TABLE; pDcl->msgType = TDMT_VND_CREATE_TABLE;
@ -761,7 +790,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_DROP_TABLE: { case TSDB_SQL_DROP_TABLE: {
pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf); pDcl->pMsg = (char*)buildDropStableMsg(pInfo, &pDcl->msgLen, pCtx, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
code = terrno; goto _error;
} }
pDcl->msgType = TDMT_MND_DROP_STB; pDcl->msgType = TDMT_MND_DROP_STB;
@ -771,7 +800,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_CREATE_DNODE: { case TSDB_SQL_CREATE_DNODE: {
pDcl->pMsg = (char*)buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); pDcl->pMsg = (char*)buildCreateDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
code = terrno; goto _error;
} }
pDcl->msgType = TDMT_MND_CREATE_DNODE; pDcl->msgType = TDMT_MND_CREATE_DNODE;
@ -781,7 +810,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
case TSDB_SQL_DROP_DNODE: { case TSDB_SQL_DROP_DNODE: {
pDcl->pMsg = (char*)buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf); pDcl->pMsg = (char*)buildDropDnodeMsg(pInfo, &pDcl->msgLen, pMsgBuf);
if (pDcl->pMsg == NULL) { if (pDcl->pMsg == NULL) {
code = terrno; goto _error;
} }
pDcl->msgType = TDMT_MND_DROP_DNODE; pDcl->msgType = TDMT_MND_DROP_DNODE;
@ -792,5 +821,29 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
break; break;
} }
return code; return pDcl;
_error:
terrno = code;
tfree(pDcl);
return NULL;
}
SInsertStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, char* msgBuf, int32_t msgBufLen) {
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
assert(pCreateTable->type == TSQL_CREATE_CTABLE);
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m;
SInsertStmtInfo* pInsertStmt = NULL;
int32_t msgLen = 0;
int32_t code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, (char**) &pInsertStmt, &msgLen);
if (code != TSDB_CODE_SUCCESS) {
tfree(pInsertStmt);
return NULL;
}
return pInsertStmt;
} }

View File

@ -32,7 +32,7 @@ bool isInsertSql(const char* pStr, size_t length) {
} }
bool qIsDdlQuery(const SQueryNode* pQuery) { bool qIsDdlQuery(const SQueryNode* pQuery) {
return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type; return TSDB_SQL_INSERT != pQuery->type && TSDB_SQL_SELECT != pQuery->type && TSDB_SQL_CREATE_TABLE != pQuery->type;
} }
int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
@ -44,16 +44,29 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) {
} }
if (!isDqlSqlStatement(&info)) { if (!isDqlSqlStatement(&info)) {
SDclStmtInfo* pDcl = calloc(1, sizeof(SDclStmtInfo)); bool toVnode = false;
if (NULL == pDcl) { if (info.type == TSDB_SQL_CREATE_TABLE) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code. SCreateTableSql* pCreateSql = info.pCreateTableInfo;
if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) {
toVnode = true;
}
}
if (toVnode) {
SInsertStmtInfo *pInsertInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pInsertInfo == NULL) {
return terrno;
}
*pQuery = (SQueryNode*) pInsertInfo;
} else {
SDclStmtInfo* pDcl = qParserValidateDclSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen);
if (pDcl == NULL) {
return terrno; return terrno;
} }
pDcl->nodeType = info.type;
int32_t code = qParserValidateDclSqlNode(&info, &pCxt->ctx, pDcl, pCxt->pMsg, pCxt->msgLen);
if (code == TSDB_CODE_SUCCESS) {
*pQuery = (SQueryNode*)pDcl; *pQuery = (SQueryNode*)pDcl;
pDcl->nodeType = info.type;
} }
} else { } else {
SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo));

View File

@ -714,10 +714,9 @@ TEST(testCase, show_user_Test) {
SSqlInfo info1 = doGenerateAST(sql1); SSqlInfo info1 = doGenerateAST(sql1);
ASSERT_EQ(info1.valid, true); ASSERT_EQ(info1.valid, true);
SDclStmtInfo output;
SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc", .pTransporter = NULL}; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc", .pTransporter = NULL};
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len);
ASSERT_EQ(code, 0); ASSERT_NE(output, nullptr);
// convert the show command to be the select query // convert the show command to be the select query
// select name, privilege, create_time, account from information_schema.users; // select name, privilege, create_time, account from information_schema.users;
@ -735,10 +734,9 @@ TEST(testCase, create_user_Test) {
ASSERT_EQ(info1.valid, true); ASSERT_EQ(info1.valid, true);
ASSERT_EQ(isDclSqlStatement(&info1), true); ASSERT_EQ(isDclSqlStatement(&info1), true);
SDclStmtInfo output;
SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc"}; SParseBasicCtx ct= {.requestId = 1, .acctId = 1, .db = "abc"};
int32_t code = qParserValidateDclSqlNode(&info1, &ct, &output, msg, buf.len); SDclStmtInfo* output = qParserValidateDclSqlNode(&info1, &ct, msg, buf.len);
ASSERT_EQ(code, 0); ASSERT_NE(output, nullptr);
destroySqlInfo(&info1); destroySqlInfo(&info1);
} }

View File

@ -40,7 +40,7 @@ extern "C" {
#define QNODE_SESSIONWINDOW 12 #define QNODE_SESSIONWINDOW 12
#define QNODE_STATEWINDOW 13 #define QNODE_STATEWINDOW 13
#define QNODE_FILL 14 #define QNODE_FILL 14
#define QNODE_INSERT 15 #define QNODE_MODIFY 15
typedef struct SQueryDistPlanNodeInfo { typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not bool stableQuery; // super table query or not

View File

@ -37,15 +37,19 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
return 0; return 0;
} }
int32_t createInsertPlan(const SInsertStmtInfo* pInsert, SQueryPlanNode** pQueryPlan) { static int32_t createInsertPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode)); *pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES); SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
if (NULL == *pQueryPlan || NULL == blocks) { if (NULL == *pQueryPlan || NULL == blocks) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
(*pQueryPlan)->info.type = QNODE_INSERT;
(*pQueryPlan)->info.type = QNODE_MODIFY;
taosArrayAddAll(blocks, pInsert->pDataBlocks); taosArrayAddAll(blocks, pInsert->pDataBlocks);
(*pQueryPlan)->pExtInfo = blocks; (*pQueryPlan)->pExtInfo = blocks;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -62,13 +66,14 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
case TSDB_SQL_SELECT: { case TSDB_SQL_SELECT: {
return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan); return createSelectPlan((const SQueryStmtInfo*)pNode, pQueryPlan);
} }
case TSDB_SQL_INSERT: case TSDB_SQL_INSERT:
return createInsertPlan((const SInsertStmtInfo*)pNode, pQueryPlan); case TSDB_SQL_CREATE_TABLE:
return createInsertPlan(pNode, pQueryPlan);
default: default:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
return TSDB_CODE_SUCCESS;
} }
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) { int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql) {

View File

@ -34,7 +34,7 @@ static const char* gOpName[] = {
#undef INCLUDE_AS_NAME #undef INCLUDE_AS_NAME
}; };
static void* vailidPointer(void* p) { static void* validPointer(void* p) {
if (NULL == p) { if (NULL == p) {
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY); THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
} }
@ -76,7 +76,7 @@ int32_t dsinkNameToDsinkType(const char* name) {
} }
static SDataSink* initDataSink(int32_t type, int32_t size) { static SDataSink* initDataSink(int32_t type, int32_t size) {
SDataSink* sink = (SDataSink*)vailidPointer(calloc(1, size)); SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
sink->info.type = type; sink->info.type = type;
sink->info.name = dsinkTypeToDsinkName(type); sink->info.name = dsinkTypeToDsinkName(type);
return sink; return sink;
@ -121,7 +121,7 @@ static bool cloneExprArray(SArray** dst, SArray* src) {
} }
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) { static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
SPhyNode* node = (SPhyNode*)vailidPointer(calloc(1, size)); SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
node->info.type = type; node->info.type = type;
node->info.name = opTypeToOpName(type); node->info.name = opTypeToOpName(type);
if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) { if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
@ -184,7 +184,7 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable
} }
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
SSubplan* subplan = vailidPointer(calloc(1, sizeof(SSubplan))); SSubplan* subplan = validPointer(calloc(1, sizeof(SSubplan)));
subplan->id = pCxt->nextId; subplan->id = pCxt->nextId;
++(pCxt->nextId.subplanId); ++(pCxt->nextId.subplanId);
subplan->type = type; subplan->type = type;
@ -192,15 +192,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
if (NULL != pCxt->pCurrentSubplan) { if (NULL != pCxt->pCurrentSubplan) {
subplan->level = pCxt->pCurrentSubplan->level + 1; subplan->level = pCxt->pCurrentSubplan->level + 1;
if (NULL == pCxt->pCurrentSubplan->pChildern) { if (NULL == pCxt->pCurrentSubplan->pChildern) {
pCxt->pCurrentSubplan->pChildern = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); pCxt->pCurrentSubplan->pChildern = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
} }
taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan); taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
subplan->pParents = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan); taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
} }
SArray* currentLevel; SArray* currentLevel;
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) { if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
currentLevel = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
taosArrayPush(pCxt->pDag->pSubplans, &currentLevel); taosArrayPush(pCxt->pDag->pSubplans, &currentLevel);
} else { } else {
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level); currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
@ -272,7 +272,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TABLESCAN: case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode); node = createTableScanNode(pCxt, pPlanNode);
break; break;
case QNODE_INSERT: case QNODE_MODIFY:
// Insert is not an operator in a physical plan. // Insert is not an operator in a physical plan.
break; break;
default: default:
@ -306,7 +306,7 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
} }
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
if (QNODE_INSERT == pRoot->info.type) { if (QNODE_MODIFY == pRoot->info.type) {
splitInsertSubplan(pCxt, pRoot); splitInsertSubplan(pCxt, pRoot);
} else { } else {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
@ -321,12 +321,12 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
TRY(TSDB_MAX_TAG_CONDITIONS) { TRY(TSDB_MAX_TAG_CONDITIONS) {
SPlanContext context = { SPlanContext context = {
.pCatalog = pCatalog, .pCatalog = pCatalog,
.pDag = vailidPointer(calloc(1, sizeof(SQueryDag))), .pDag = validPointer(calloc(1, sizeof(SQueryDag))),
.pCurrentSubplan = NULL, .pCurrentSubplan = NULL,
.nextId = {0} // todo queryid .nextId = {0} // todo queryid
}; };
*pDag = context.pDag; *pDag = context.pDag;
context.pDag->pSubplans = vailidPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES)); context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
createSubplanByLevel(&context, pQueryNode); createSubplanByLevel(&context, pQueryNode);
} CATCH(code) { } CATCH(code) {
CLEANUP_EXECUTE(); CLEANUP_EXECUTE();