support specify tag names in create table sql

This commit is contained in:
dapan1121 2021-02-18 18:24:32 +08:00
parent b378b691bd
commit 8d75f2481a
6 changed files with 1295 additions and 883 deletions

View File

@ -6375,16 +6375,14 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
// get table meta from mnode // get table meta from mnode
code = tNameExtractFullName(&pStableMetaInfo->name, pCreateTableInfo->tagdata.name); code = tNameExtractFullName(&pStableMetaInfo->name, pCreateTableInfo->tagdata.name);
SArray* pList = pCreateTableInfo->pTagVals; SArray* pValList = pCreateTableInfo->pTagVals;
code = tscGetTableMeta(pSql, pStableMetaInfo); code = tscGetTableMeta(pSql, pStableMetaInfo);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
size_t size = taosArrayGetSize(pList); size_t valSize = taosArrayGetSize(pValList);
if (tscGetNumOfTags(pStableMetaInfo->pTableMeta) != size) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
// too long tag values will return invalid sql, not be truncated automatically // too long tag values will return invalid sql, not be truncated automatically
SSchema *pTagSchema = tscGetTableTagSchema(pStableMetaInfo->pTableMeta); SSchema *pTagSchema = tscGetTableTagSchema(pStableMetaInfo->pTableMeta);
@ -6395,36 +6393,107 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SArray* pNameList = NULL;
size_t nameSize = 0;
int32_t schemaSize = tscGetNumOfTags(pStableMetaInfo->pTableMeta);
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < size; ++i) {
SSchema* pSchema = &pTagSchema[i];
tVariantListItem* pItem = taosArrayGet(pList, i);
char tagVal[TSDB_MAX_TAGS_LEN]; if (pCreateTableInfo->pTagNames) {
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) { pNameList = pCreateTableInfo->pTagNames;
if (pItem->pVar.nLen > pSchema->bytes) { nameSize = taosArrayGetSize(pNameList);
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); if (valSize != nameSize) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
if (schemaSize < valSize) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
bool findColumnIndex = false;
for (int32_t i = 0; i < nameSize; ++i) {
SStrToken* sToken = taosArrayGet(pNameList, i);
tVariantListItem* pItem = taosArrayGet(pValList, i);
findColumnIndex = false;
// todo speedup by using hash list
for (int32_t t = 0; t < schemaSize; ++t) {
if (strncmp(sToken->z, pTagSchema[t].name, sToken->n) == 0 && strlen(pTagSchema[t].name) == sToken->n) {
SSchema* pSchema = &pTagSchema[t];
char tagVal[TSDB_MAX_TAGS_LEN];
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
if (pItem->pVar.nLen > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
ret = tVariantDump(&(pItem->pVar), tagVal, pSchema->type, true);
// check again after the convert since it may be converted from binary to nchar.
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
int16_t len = varDataTLen(tagVal);
if (len > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
if (ret != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
findColumnIndex = true;
break;
}
} }
if (!findColumnIndex) {
return tscInvalidSQLErrMsg(pCmd->payload, "invalid tag name", sToken->z);
}
}
} else {
if (schemaSize != valSize) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5);
} }
ret = tVariantDump(&(pItem->pVar), tagVal, pSchema->type, true); for (int32_t i = 0; i < valSize; ++i) {
SSchema* pSchema = &pTagSchema[i];
// check again after the convert since it may be converted from binary to nchar. tVariantListItem* pItem = taosArrayGet(pValList, i);
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
int16_t len = varDataTLen(tagVal); char tagVal[TSDB_MAX_TAGS_LEN];
if (len > pSchema->bytes) { if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
tdDestroyKVRowBuilder(&kvRowBuilder); if (pItem->pVar.nLen > pSchema->bytes) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
} }
ret = tVariantDump(&(pItem->pVar), tagVal, pSchema->type, true);
// check again after the convert since it may be converted from binary to nchar.
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
int16_t len = varDataTLen(tagVal);
if (len > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
if (ret != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
} }
if (ret != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
} }
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);

View File

@ -122,8 +122,8 @@
#define TK_UNSIGNED 103 #define TK_UNSIGNED 103
#define TK_TAGS 104 #define TK_TAGS 104
#define TK_USING 105 #define TK_USING 105
#define TK_AS 106 #define TK_COMMA 106
#define TK_COMMA 107 #define TK_AS 107
#define TK_NULL 108 #define TK_NULL 108
#define TK_SELECT 109 #define TK_SELECT 109
#define TK_UNION 110 #define TK_UNION 110
@ -228,6 +228,7 @@
#define TK_VALUES 209 #define TK_VALUES 209
#define TK_SPACE 300 #define TK_SPACE 300
#define TK_COMMENT 301 #define TK_COMMENT 301
#define TK_ILLEGAL 302 #define TK_ILLEGAL 302

View File

@ -76,6 +76,7 @@ typedef struct SQuerySQL {
typedef struct SCreatedTableInfo { typedef struct SCreatedTableInfo {
SStrToken name; // table name token SStrToken name; // table name token
SStrToken stableName; // super table name token , for using clause SStrToken stableName; // super table name token , for using clause
SArray *pTagNames; // create by using super table, tag name
SArray *pTagVals; // create by using super table, tag value SArray *pTagVals; // create by using super table, tag value
char *fullname; // table full name char *fullname; // table full name
STagData tagdata; // true tag data, super table full name is in STagData STagData tagdata; // true tag data, super table full name is in STagData
@ -246,7 +247,7 @@ SCreateTableSQL *tSetCreateSqlElems(SArray *pCols, SArray *pTags, SQuerySQL *pSe
void tSqlExprNodeDestroy(tSQLExpr *pExpr); void tSqlExprNodeDestroy(tSQLExpr *pExpr);
SAlterTableInfo * tAlterTableSqlElems(SStrToken *pTableName, SArray *pCols, SArray *pVals, int32_t type, int16_t tableTable); SAlterTableInfo * tAlterTableSqlElems(SStrToken *pTableName, SArray *pCols, SArray *pVals, int32_t type, int16_t tableTable);
SCreatedTableInfo createNewChildTableInfo(SStrToken *pTableName, SArray *pTagVals, SStrToken *pToken, SStrToken* igExists); SCreatedTableInfo createNewChildTableInfo(SStrToken *pTableName, SArray *pTagNames, SArray *pTagVals, SStrToken *pToken, SStrToken* igExists);
void destroyAllSelectClause(SSubclauseInfo *pSql); void destroyAllSelectClause(SSubclauseInfo *pSql);
void doDestroyQuerySql(SQuerySQL *pSql); void doDestroyQuerySql(SQuerySQL *pSql);

View File

@ -356,9 +356,20 @@ create_stable_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP T
create_from_stable(A) ::= ifnotexists(U) ids(V) cpxName(Z) USING ids(X) cpxName(F) TAGS LP tagitemlist(Y) RP. { create_from_stable(A) ::= ifnotexists(U) ids(V) cpxName(Z) USING ids(X) cpxName(F) TAGS LP tagitemlist(Y) RP. {
X.n += F.n; X.n += F.n;
V.n += Z.n; V.n += Z.n;
A = createNewChildTableInfo(&X, Y, &V, &U); A = createNewChildTableInfo(&X, NULL, Y, &V, &U);
} }
create_from_stable(A) ::= ifnotexists(U) ids(V) cpxName(Z) USING ids(X) cpxName(F) LP tagNamelist(P) RP TAGS LP tagitemlist(Y) RP. {
X.n += F.n;
V.n += Z.n;
A = createNewChildTableInfo(&X, P, Y, &V, &U);
}
%type tagNamelist{SArray*}
%destructor tagNamelist {taosArrayDestroy($$);}
tagNamelist(A) ::= tagNamelist(X) COMMA ids(Y). {taosArrayPush(X, &Y); A = X; }
tagNamelist(A) ::= ids(X). {A = taosArrayInit(4, sizeof(SStrToken)); taosArrayPush(A, &X);}
// create stream // create stream
// create table table_name as select count(*) from super_table_name interval(time) // create table table_name as select count(*) from super_table_name interval(time)
create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). { create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) AS select(S). {

View File

@ -496,7 +496,8 @@ static void freeVariant(void *pItem) {
} }
void freeCreateTableInfo(void* p) { void freeCreateTableInfo(void* p) {
SCreatedTableInfo* pInfo = (SCreatedTableInfo*) p; SCreatedTableInfo* pInfo = (SCreatedTableInfo*) p;
taosArrayDestroy(pInfo->pTagNames);
taosArrayDestroyEx(pInfo->pTagVals, freeVariant); taosArrayDestroyEx(pInfo->pTagVals, freeVariant);
tfree(pInfo->fullname); tfree(pInfo->fullname);
tfree(pInfo->tagdata.data); tfree(pInfo->tagdata.data);
@ -574,11 +575,12 @@ SCreateTableSQL *tSetCreateSqlElems(SArray *pCols, SArray *pTags, SQuerySQL *pSe
return pCreate; return pCreate;
} }
SCreatedTableInfo createNewChildTableInfo(SStrToken *pTableName, SArray *pTagVals, SStrToken *pToken, SStrToken* igExists) { SCreatedTableInfo createNewChildTableInfo(SStrToken *pTableName, SArray *pTagNames, SArray *pTagVals, SStrToken *pToken, SStrToken* igExists) {
SCreatedTableInfo info; SCreatedTableInfo info;
memset(&info, 0, sizeof(SCreatedTableInfo)); memset(&info, 0, sizeof(SCreatedTableInfo));
info.name = *pToken; info.name = *pToken;
info.pTagNames = pTagNames;
info.pTagVals = pTagVals; info.pTagVals = pTagVals;
info.stableName = *pTableName; info.stableName = *pTableName;
info.igExist = (igExists->n > 0)? 1:0; info.igExist = (igExists->n > 0)? 1:0;

File diff suppressed because it is too large Load Diff