TD-13852 create stable and create subtable
This commit is contained in:
parent
08fb912aad
commit
14e5d6aab3
|
@ -78,7 +78,7 @@ typedef struct SCreateTableStmt {
|
|||
STableOptions options;
|
||||
} SCreateTableStmt;
|
||||
|
||||
typedef struct SCreateSubTableStmt {
|
||||
typedef struct SCreateSubTableClause {
|
||||
ENodeType type;
|
||||
char dbName[TSDB_DB_NAME_LEN];
|
||||
char tableName[TSDB_TABLE_NAME_LEN];
|
||||
|
@ -87,7 +87,7 @@ typedef struct SCreateSubTableStmt {
|
|||
bool ignoreExists;
|
||||
SNodeList* pSpecificTags;
|
||||
SNodeList* pValsOfTags;
|
||||
} SCreateSubTableStmt;
|
||||
} SCreateSubTableClause;
|
||||
|
||||
typedef struct SCreateMultiTableStmt {
|
||||
ENodeType type;
|
||||
|
|
|
@ -73,7 +73,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_VNODE_MODIF_STMT,
|
||||
QUERY_NODE_CREATE_DATABASE_STMT,
|
||||
QUERY_NODE_CREATE_TABLE_STMT,
|
||||
QUERY_NODE_CREATE_SUBTABLE_STMT,
|
||||
QUERY_NODE_CREATE_SUBTABLE_CLAUSE,
|
||||
QUERY_NODE_CREATE_MULTI_TABLE_STMT,
|
||||
QUERY_NODE_USE_DATABASE_STMT,
|
||||
QUERY_NODE_SHOW_DATABASES_STMT, // temp
|
||||
|
|
|
@ -46,7 +46,6 @@ typedef struct SCmdMsgInfo {
|
|||
typedef struct SQuery {
|
||||
bool directRpc;
|
||||
bool haveResultSet;
|
||||
ENodeType sqlNodeType;
|
||||
SNode* pRoot;
|
||||
int32_t numOfResCols;
|
||||
SSchema* pResSchema;
|
||||
|
|
|
@ -469,6 +469,8 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260A)
|
||||
#define TSDB_CODE_PAR_NOT_SELECTED_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260B)
|
||||
#define TSDB_CODE_PAR_NOT_SINGLE_GROUP TAOS_DEF_ERROR_CODE(0, 0x260C)
|
||||
#define TSDB_CODE_PAR_TAGS_NOT_MATCHED TAOS_DEF_ERROR_CODE(0, 0x260D)
|
||||
#define TSDB_CODE_PAR_INVALID_TAG_NAME TAOS_DEF_ERROR_CODE(0, 0x260E)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -84,8 +84,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
|
|||
return makeNode(type, sizeof(SCreateDatabaseStmt));
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
return makeNode(type, sizeof(SCreateTableStmt));
|
||||
case QUERY_NODE_CREATE_SUBTABLE_STMT:
|
||||
return makeNode(type, sizeof(SCreateSubTableStmt));
|
||||
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE:
|
||||
return makeNode(type, sizeof(SCreateSubTableClause));
|
||||
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
|
||||
return makeNode(type, sizeof(SCreateMultiTableStmt));
|
||||
case QUERY_NODE_USE_DATABASE_STMT:
|
||||
|
|
|
@ -121,7 +121,7 @@ SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDat
|
|||
SDataType createDataType(uint8_t type);
|
||||
SDataType createVarLenDataType(uint8_t type, const SToken* pLen);
|
||||
SNode* createCreateTableStmt(SAstCreateContext* pCxt, bool ignoreExists, const STokenPair* pFullTableName, SNodeList* pCols, SNodeList* pTags, STableOptions* pOptions);
|
||||
SNode* createCreateSubTableStmt(SAstCreateContext* pCxt, bool ignoreExists,
|
||||
SNode* createCreateSubTableClause(SAstCreateContext* pCxt, bool ignoreExists,
|
||||
const STokenPair* pFullTableName, const STokenPair* pUseFullTableName, SNodeList* pSpecificTags, SNodeList* pValsOfTags);
|
||||
SNode* createCreateMultiTableStmt(SAstCreateContext* pCxt, SNodeList* pSubTables);
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ multi_create_clause(A) ::= multi_create_clause(B) create_subtable_clause(C).
|
|||
|
||||
create_subtable_clause(A) ::=
|
||||
exists_opt(B) full_table_name(C) USING full_table_name(D)
|
||||
specific_tags_opt(E) TAGS NK_LP literal_list(F) NK_RP. { A = createCreateSubTableStmt(pCxt, B, &C, &D, E, F); }
|
||||
specific_tags_opt(E) TAGS NK_LP literal_list(F) NK_RP. { A = createCreateSubTableClause(pCxt, B, &C, &D, E, F); }
|
||||
|
||||
%type specific_tags_opt { SNodeList* }
|
||||
%destructor specific_tags_opt { nodesDestroyList($$); }
|
||||
|
|
|
@ -755,9 +755,9 @@ SNode* createCreateTableStmt(SAstCreateContext* pCxt,
|
|||
return (SNode*)pStmt;
|
||||
}
|
||||
|
||||
SNode* createCreateSubTableStmt(SAstCreateContext* pCxt, bool ignoreExists,
|
||||
SNode* createCreateSubTableClause(SAstCreateContext* pCxt, bool ignoreExists,
|
||||
const STokenPair* pFullTableName, const STokenPair* pUseFullTableName, SNodeList* pSpecificTags, SNodeList* pValsOfTags) {
|
||||
SCreateSubTableStmt* pStmt = nodesMakeNode(QUERY_NODE_CREATE_SUBTABLE_STMT);
|
||||
SCreateSubTableClause* pStmt = nodesMakeNode(QUERY_NODE_CREATE_SUBTABLE_CLAUSE);
|
||||
CHECK_OUT_OF_MEM(pStmt);
|
||||
if (TK_NIL != pFullTableName->first.type) {
|
||||
strncpy(pStmt->dbName, pFullTableName->first.z, pFullTableName->first.n);
|
||||
|
|
|
@ -68,6 +68,10 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
|||
return "Not SELECTed expression";
|
||||
case TSDB_CODE_PAR_NOT_SINGLE_GROUP:
|
||||
return "Not a single-group group function";
|
||||
case TSDB_CODE_PAR_TAGS_NOT_MATCHED:
|
||||
return "tags number not matched";
|
||||
case TSDB_CODE_PAR_INVALID_TAG_NAME:
|
||||
return "invalid tag name : %s";
|
||||
case TSDB_CODE_OUT_OF_MEMORY:
|
||||
return "Out of memory";
|
||||
default:
|
||||
|
@ -672,7 +676,7 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro
|
|||
SNode* pExpr = ((SOrderByExprNode*)pNode)->pExpr;
|
||||
if (QUERY_NODE_VALUE == nodeType(pExpr)) {
|
||||
SValueNode* pVal = (SValueNode*)pExpr;
|
||||
if (!translateValue(pCxt, pVal)) {
|
||||
if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) {
|
||||
return pCxt->errCode;
|
||||
}
|
||||
int32_t pos = getPositionValue(pVal);
|
||||
|
@ -853,7 +857,7 @@ static int32_t columnNodeToField(SNodeList* pList, SArray** pArray) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateCreateSTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
|
||||
static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
|
||||
SMCreateStbReq createReq = {0};
|
||||
createReq.igExists = pStmt->ignoreExists;
|
||||
columnNodeToField(pStmt->pCols, &createReq.pColumns);
|
||||
|
@ -962,7 +966,7 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
|
|||
code = translateCreateDatabase(pCxt, (SCreateDatabaseStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
code = translateCreateSTable(pCxt, (SCreateTableStmt*)pNode);
|
||||
code = translateCreateSuperTable(pCxt, (SCreateTableStmt*)pNode);
|
||||
break;
|
||||
case QUERY_NODE_USE_DATABASE_STMT:
|
||||
code = translateUseDatabase(pCxt, (SUseDatabaseStmt*)pNode);
|
||||
|
@ -1030,15 +1034,16 @@ static void toSchema(const SColumnDefNode* pCol, int32_t colId, SSchema* pSchema
|
|||
strcpy(pSchema->name, pCol->colName);
|
||||
}
|
||||
|
||||
static int32_t doBuildSingleTableBatchReq(SName* pTableName, SNodeList* pColumns, SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
|
||||
static int32_t buildNormalTableBatchReq(
|
||||
const char* pTableName, const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
|
||||
SVCreateTbReq req = {0};
|
||||
req.type = TD_NORMAL_TABLE;
|
||||
req.name = strdup(tNameGetTableName(pTableName));
|
||||
|
||||
req.name = strdup(pTableName);
|
||||
req.ntbCfg.nCols = LIST_LENGTH(pColumns);
|
||||
int32_t num = req.ntbCfg.nCols;
|
||||
|
||||
req.ntbCfg.pSchema = calloc(num, sizeof(SSchema));
|
||||
req.ntbCfg.pSchema = calloc(req.ntbCfg.nCols, sizeof(SSchema));
|
||||
if (NULL == req.name || NULL == req.ntbCfg.pSchema) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
SNode* pCol;
|
||||
int32_t index = 0;
|
||||
FOREACH(pCol, pColumns) {
|
||||
|
@ -1048,34 +1053,36 @@ static int32_t doBuildSingleTableBatchReq(SName* pTableName, SNodeList* pColumns
|
|||
|
||||
pBatch->info = *pVgroupInfo;
|
||||
pBatch->req.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
|
||||
if (pBatch->req.pArray == NULL) {
|
||||
if (NULL == pBatch->req.pArray) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosArrayPush(pBatch->req.pArray, &req);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t serializeVgroupTablesBatchImpl(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
|
||||
static int32_t serializeVgroupTablesBatch(SVgroupTablesBatch* pTbBatch, SArray* pBufArray) {
|
||||
int tlen = sizeof(SMsgHead) + tSerializeSVCreateTbBatchReq(NULL, &(pTbBatch->req));
|
||||
void* buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
if (NULL == buf) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(pTbBatch->info.vgId);
|
||||
((SMsgHead*)buf)->contLen = htonl(tlen);
|
||||
|
||||
void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
tSerializeSVCreateTbBatchReq(&pBuf, &(pTbBatch->req));
|
||||
|
||||
SVgDataBlocks* pVgData = calloc(1, sizeof(SVgDataBlocks));
|
||||
if (NULL == pVgData) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pVgData->vg = pTbBatch->info;
|
||||
pVgData->pData = buf;
|
||||
pVgData->size = tlen;
|
||||
pVgData->numOfTables = (int32_t) taosArrayGetSize(pTbBatch->req.pArray);
|
||||
|
||||
taosArrayPush(pBufArray, &pVgData);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
|
||||
|
@ -1096,43 +1103,288 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) {
|
|||
taosArrayDestroy(pTbBatch->req.pArray);
|
||||
}
|
||||
|
||||
static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
if (QUERY_NODE_CREATE_TABLE_STMT == nodeType(pQuery->pRoot) && NULL == ((SCreateTableStmt*)pQuery->pRoot)->pTags) {
|
||||
SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot;
|
||||
static int32_t getTableHashVgroup(SParseContext* pCxt, const char* pDbName, const char* pTableName, SVgroupInfo* pInfo) {
|
||||
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId };
|
||||
strcpy(name.dbname, pDbName);
|
||||
strcpy(name.tname, pTableName);
|
||||
return catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, &name, pInfo);
|
||||
}
|
||||
|
||||
SName tableName = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
|
||||
strcpy(tableName.dbname, pStmt->dbName);
|
||||
strcpy(tableName.tname, pStmt->tableName);
|
||||
SVgroupInfo info = {0};
|
||||
catalogGetTableHashVgroup(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, &tableName, &info);
|
||||
static int32_t rewriteToVnodeModifOpStmt(SQuery* pQuery, SArray* pBufArray) {
|
||||
SVnodeModifOpStmt* pNewStmt = nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||
if (pNewStmt == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pNewStmt->sqlNodeType = nodeType(pQuery->pRoot);
|
||||
pNewStmt->pDataBlocks = pBufArray;
|
||||
nodesDestroyNode(pQuery->pRoot);
|
||||
pQuery->pRoot = (SNode*)pNewStmt;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SVgroupTablesBatch tbatch = {0};
|
||||
int32_t code = doBuildSingleTableBatchReq(&tableName, pStmt->pCols, &info, &tbatch);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
static int32_t buildCreateTableDataBlock(const SCreateTableStmt* pStmt, const SVgroupInfo* pInfo, SArray** pBufArray) {
|
||||
SVgroupTablesBatch tbatch = {0};
|
||||
int32_t code = buildNormalTableBatchReq(pStmt->tableName, pStmt->pCols, pInfo, &tbatch);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pBufArray = taosArrayInit(1, POINTER_BYTES);
|
||||
if (NULL == pBufArray) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = serializeVgroupTablesBatch(&tbatch, *pBufArray);
|
||||
}
|
||||
destroyCreateTbReqBatch(&tbatch);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot;
|
||||
|
||||
SVgroupInfo info = {0};
|
||||
int32_t code = getTableHashVgroup(pCxt->pParseCxt, pStmt->dbName, pStmt->tableName, &info);
|
||||
SArray* pBufArray;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCreateTableDataBlock(pStmt, &info, &pBufArray);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = rewriteToVnodeModifOpStmt(pQuery, pBufArray);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static void addCreateTbReqIntoVgroup(SHashObj* pVgroupHashmap, const char* pTableName, SKVRow row, uint64_t suid, SVgroupInfo* pVgInfo) {
|
||||
struct SVCreateTbReq req = {0};
|
||||
req.type = TD_CHILD_TABLE;
|
||||
req.name = strdup(pTableName);
|
||||
req.ctbCfg.suid = suid;
|
||||
req.ctbCfg.pTag = row;
|
||||
|
||||
SVgroupTablesBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
||||
if (pTableBatch == NULL) {
|
||||
SVgroupTablesBatch tBatch = {0};
|
||||
tBatch.info = *pVgInfo;
|
||||
|
||||
tBatch.req.pArray = taosArrayInit(4, sizeof(struct SVCreateTbReq));
|
||||
taosArrayPush(tBatch.req.pArray, &req);
|
||||
|
||||
taosHashPut(pVgroupHashmap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &tBatch, sizeof(tBatch));
|
||||
} else { // add to the correct vgroup
|
||||
taosArrayPush(pTableBatch->req.pArray, &req);
|
||||
}
|
||||
}
|
||||
|
||||
static void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
|
||||
pVal->nType = pNode->node.resType.type;
|
||||
pVal->nLen = pNode->node.resType.bytes;
|
||||
switch (pNode->node.resType.type) {
|
||||
case TSDB_DATA_TYPE_NULL:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
pVal->i = pNode->datum.b;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
pVal->i = pNode->datum.i;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
pVal->u = pNode->datum.u;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
pVal->d = pNode->datum.d;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
pVal->pz = pNode->datum.p;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
// todo
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema, SKVRowBuilder* pBuilder) {
|
||||
if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) {
|
||||
return pCxt->errCode;
|
||||
}
|
||||
SVariant var;
|
||||
valueNodeToVariant(pVal, &var);
|
||||
char tagVal[TSDB_MAX_TAGS_LEN] = {0};
|
||||
int32_t code = taosVariantDump(&var, tagVal, pSchema->type, true);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tdAddColToKVRow(pBuilder, pSchema->colId, pSchema->type, tagVal);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableClause* pStmt, STableMeta* pSuperTableMeta, SKVRowBuilder* pBuilder) {
|
||||
int32_t numOfTags = getNumOfTags(pSuperTableMeta);
|
||||
if (LIST_LENGTH(pStmt->pValsOfTags) != LIST_LENGTH(pStmt->pSpecificTags) || numOfTags < LIST_LENGTH(pStmt->pValsOfTags)) {
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
|
||||
}
|
||||
|
||||
SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta);
|
||||
SNode* pTag, *pVal;
|
||||
FORBOTH(pTag, pStmt->pSpecificTags, pVal, pStmt->pValsOfTags) {
|
||||
SColumnNode* pCol = (SColumnNode*)pTag;
|
||||
SSchema* pSchema = NULL;
|
||||
for (int32_t i = 0; i < numOfTags; ++i) {
|
||||
if (0 == strcmp(pCol->colName, pTagSchema[i].name)) {
|
||||
pSchema = pTagSchema + i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (NULL == pSchema) {
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_INVALID_TAG_NAME, pCol->colName);
|
||||
}
|
||||
int32_t code = addValToKVRow(pCxt, (SValueNode*)pVal, pSchema, pBuilder);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SArray* pBufArray = taosArrayInit(1, POINTER_BYTES);
|
||||
if (pBufArray == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
serializeVgroupTablesBatchImpl(&tbatch, pBufArray);
|
||||
destroyCreateTbReqBatch(&tbatch);
|
||||
|
||||
SVnodeModifOpStmt* pNewStmt = nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT);
|
||||
if (pNewStmt == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pNewStmt->sqlNodeType = nodeType(pQuery->pRoot);
|
||||
pNewStmt->pDataBlocks = pBufArray;
|
||||
pQuery->sqlNodeType = nodeType(pQuery->pRoot);
|
||||
nodesDestroyNode(pQuery->pRoot);
|
||||
pQuery->pRoot = (SNode*)pNewStmt;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClause* pStmt, STableMeta* pSuperTableMeta, SKVRowBuilder* pBuilder) {
|
||||
if (getNumOfTags(pSuperTableMeta) != LIST_LENGTH(pStmt->pValsOfTags)) {
|
||||
return generateSyntaxErrMsg(pCxt, TSDB_CODE_PAR_TAGS_NOT_MATCHED);
|
||||
}
|
||||
|
||||
SSchema* pTagSchema = getTableTagSchema(pSuperTableMeta);
|
||||
SNode* pVal;
|
||||
int32_t index = 0;
|
||||
FOREACH(pVal, pStmt->pValsOfTags) {
|
||||
int32_t code = addValToKVRow(pCxt, (SValueNode*)pVal, pTagSchema + index++, pBuilder);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt, SHashObj* pVgroupHashmap) {
|
||||
SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId };
|
||||
strcpy(name.dbname, pStmt->useDbName);
|
||||
strcpy(name.tname, pStmt->useTableName);
|
||||
STableMeta* pSuperTableMeta = NULL;
|
||||
int32_t code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, &name, &pSuperTableMeta);
|
||||
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tdInitKVRowBuilder(&kvRowBuilder);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (NULL != pStmt->pSpecificTags) {
|
||||
code = buildKVRowForBindTags(pCxt, pStmt, pSuperTableMeta, &kvRowBuilder);
|
||||
} else {
|
||||
code = buildKVRowForAllTags(pCxt, pStmt, pSuperTableMeta, &kvRowBuilder);
|
||||
}
|
||||
}
|
||||
|
||||
SKVRow row = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
row = tdGetKVRowFromBuilder(&kvRowBuilder);
|
||||
if (NULL == row) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tdSortKVRowByColIdx(row);
|
||||
}
|
||||
}
|
||||
|
||||
SVgroupInfo info = {0};
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getTableHashVgroup(pCxt->pParseCxt, pStmt->dbName, pStmt->tableName, &info);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
addCreateTbReqIntoVgroup(pVgroupHashmap, pStmt->tableName, row, pSuperTableMeta->uid, &info);
|
||||
}
|
||||
|
||||
tfree(pSuperTableMeta);
|
||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||
return code;
|
||||
}
|
||||
|
||||
static SArray* serializeVgroupsTablesBatch(SHashObj* pVgroupHashmap) {
|
||||
SArray* pBufArray = taosArrayInit(taosHashGetSize(pVgroupHashmap), sizeof(void*));
|
||||
if (NULL == pBufArray) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SVgroupTablesBatch* pTbBatch = NULL;
|
||||
do {
|
||||
pTbBatch = taosHashIterate(pVgroupHashmap, pTbBatch);
|
||||
if (pTbBatch == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
serializeVgroupTablesBatch(pTbBatch, pBufArray);
|
||||
destroyCreateTbReqBatch(pTbBatch);
|
||||
} while (true);
|
||||
|
||||
return pBufArray;
|
||||
}
|
||||
|
||||
static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
SCreateMultiTableStmt* pStmt = (SCreateMultiTableStmt*)pQuery->pRoot;
|
||||
|
||||
SHashObj* pVgroupHashmap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (NULL == pVgroupHashmap) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pStmt->pSubTables) {
|
||||
code = rewriteCreateSubTable(pCxt, (SCreateSubTableClause*)pNode, pVgroupHashmap);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
SArray* pBufArray = serializeVgroupsTablesBatch(pVgroupHashmap);
|
||||
if (NULL == pBufArray) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
taosHashCleanup(pVgroupHashmap);
|
||||
|
||||
return rewriteToVnodeModifOpStmt(pQuery, pBufArray);
|
||||
}
|
||||
|
||||
static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pQuery->pRoot)) {
|
||||
case QUERY_NODE_CREATE_TABLE_STMT:
|
||||
if (NULL == ((SCreateTableStmt*)pQuery->pRoot)->pTags) {
|
||||
code = rewriteCreateTable(pCxt, pQuery);
|
||||
}
|
||||
break;
|
||||
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
|
||||
code = rewriteCreateMultiTable(pCxt, pQuery);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(pQuery->pRoot)) {
|
||||
|
|
|
@ -1918,7 +1918,7 @@ static YYACTIONTYPE yy_reduce(
|
|||
yymsp[-1].minor.yy182 = yylhsminor.yy182;
|
||||
break;
|
||||
case 27: /* create_subtable_clause ::= exists_opt full_table_name USING full_table_name specific_tags_opt TAGS NK_LP literal_list NK_RP */
|
||||
{ yylhsminor.yy210 = createCreateSubTableStmt(pCxt, yymsp[-8].minor.yy187, &yymsp[-7].minor.yy341, &yymsp[-5].minor.yy341, yymsp[-4].minor.yy182, yymsp[-1].minor.yy182); }
|
||||
{ yylhsminor.yy210 = createCreateSubTableClause(pCxt, yymsp[-8].minor.yy187, &yymsp[-7].minor.yy341, &yymsp[-5].minor.yy341, yymsp[-4].minor.yy182, yymsp[-1].minor.yy182); }
|
||||
yymsp[-8].minor.yy210 = yylhsminor.yy210;
|
||||
break;
|
||||
case 28: /* specific_tags_opt ::= */
|
||||
|
|
|
@ -101,6 +101,7 @@ static SKeyword keywordTable[] = {
|
|||
{"SMA", TK_SMA},
|
||||
{"SMALLINT", TK_SMALLINT},
|
||||
{"SOFFSET", TK_SOFFSET},
|
||||
{"STABLE", TK_STABLE},
|
||||
{"STATE_WINDOW", TK_STATE_WINDOW},
|
||||
{"STREAM_MODE", TK_STREAM_MODE},
|
||||
{"TABLE", TK_TABLE},
|
||||
|
@ -176,7 +177,6 @@ static SKeyword keywordTable[] = {
|
|||
// {"CTIME", TK_CTIME},
|
||||
// {"LP", TK_LP},
|
||||
// {"RP", TK_RP},
|
||||
|
||||
// {"COMMA", TK_COMMA},
|
||||
// {"EVERY", TK_EVERY},
|
||||
// {"VARIABLE", TK_VARIABLE},
|
||||
|
@ -224,7 +224,6 @@ static SKeyword keywordTable[] = {
|
|||
// {"VIEW", TK_VIEW},
|
||||
// {"SEMI", TK_SEMI},
|
||||
// {"TBNAME", TK_TBNAME},
|
||||
// {"STABLE", TK_STABLE},
|
||||
// {"VNODES", TK_VNODES},
|
||||
// {"PARTITIONS", TK_PARTITIONS},
|
||||
// {"TOPIC", TK_TOPIC},
|
||||
|
|
|
@ -346,5 +346,16 @@ TEST_F(ParserTest, createTable) {
|
|||
"if not exists test.t3 using test.st1 (tc1, tc2, tc3) tags(3, 2.0, 'abc', TIMESTAMP '2022-3-6 11:20:23')"
|
||||
);
|
||||
ASSERT_TRUE(run());
|
||||
|
||||
bind("create stable t1(ts timestamp, c1 int) TAGS(id int)");
|
||||
ASSERT_TRUE(run());
|
||||
|
||||
bind("create stable if not exists test.t1("
|
||||
"ts TIMESTAMP, c1 INT, c2 INT UNSIGNED, c3 BIGINT, c4 BIGINT UNSIGNED, c5 FLOAT, c6 DOUBLE, c7 BINARY(20), c8 SMALLINT, "
|
||||
"c9 SMALLINT UNSIGNED COMMENT 'test column comment', c10 TINYINT, c11 TINYINT UNSIGNED, c12 BOOL, c13 NCHAR(30), c14 JSON, c15 VARCHAR(50)) "
|
||||
"TAGS (tsa TIMESTAMP, a1 INT, a2 INT UNSIGNED, a3 BIGINT, a4 BIGINT UNSIGNED, a5 FLOAT, a6 DOUBLE, a7 BINARY(20), a8 SMALLINT, "
|
||||
"a9 SMALLINT UNSIGNED COMMENT 'test column comment', a10 TINYINT, a11 TINYINT UNSIGNED, a12 BOOL, a13 NCHAR(30), a14 JSON, a15 VARCHAR(50)) "
|
||||
"KEEP 100 TTL 100 COMMENT 'test create table' SMA(c1, c2, c3)"
|
||||
);
|
||||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
|
@ -349,11 +349,15 @@ static SLogicNode* createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
|
|||
return pRoot;
|
||||
}
|
||||
|
||||
static int32_t getMsgType(ENodeType sqlType) {
|
||||
return (QUERY_NODE_CREATE_TABLE_STMT == sqlType || QUERY_NODE_CREATE_MULTI_TABLE_STMT == sqlType) ? TDMT_VND_CREATE_TABLE : TDMT_VND_SUBMIT;
|
||||
}
|
||||
|
||||
static SLogicNode* createVnodeModifLogicNode(SLogicPlanContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||
SVnodeModifLogicNode* pModif = (SVnodeModifLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_VNODE_MODIF);
|
||||
CHECK_ALLOC(pModif, NULL);
|
||||
pModif->pDataBlocks = pStmt->pDataBlocks;
|
||||
pModif->msgType = (QUERY_NODE_CREATE_TABLE_STMT == pStmt->sqlNodeType ? TDMT_VND_CREATE_TABLE : TDMT_VND_SUBMIT);
|
||||
pModif->msgType = getMsgType(pStmt->sqlNodeType);
|
||||
return (SLogicNode*)pModif;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue