fix: error in schemaless
This commit is contained in:
parent
947f39523e
commit
3f398de926
|
@ -85,8 +85,11 @@ typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
|
|||
|
||||
typedef enum {
|
||||
SCHEMA_ACTION_NULL,
|
||||
SCHEMA_ACTION_COLUMN,
|
||||
SCHEMA_ACTION_TAG
|
||||
SCHEMA_ACTION_CREATE_STABLE,
|
||||
SCHEMA_ACTION_ADD_COLUMN,
|
||||
SCHEMA_ACTION_ADD_TAG,
|
||||
SCHEMA_ACTION_CHANGE_COLUMN_SIZE,
|
||||
SCHEMA_ACTION_CHANGE_TAG_SIZE,
|
||||
} ESchemaAction;
|
||||
|
||||
typedef struct {
|
||||
|
@ -219,7 +222,7 @@ static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const
|
|||
|
||||
static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSmlKv *kv, bool isTag,
|
||||
ESchemaAction *action, SSmlHandle *info) {
|
||||
uint16_t *index = (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen);
|
||||
uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
|
||||
if (index) {
|
||||
if (colField[*index].type != kv->type) {
|
||||
uError("SML:0x%" PRIx64 " point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id,
|
||||
|
@ -232,16 +235,16 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm
|
|||
(colField[*index].type == TSDB_DATA_TYPE_NCHAR &&
|
||||
((colField[*index].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE < kv->length))) {
|
||||
if (isTag) {
|
||||
*action = SCHEMA_ACTION_TAG;
|
||||
*action = SCHEMA_ACTION_CHANGE_TAG_SIZE;
|
||||
} else {
|
||||
*action = SCHEMA_ACTION_COLUMN;
|
||||
*action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (isTag) {
|
||||
*action = SCHEMA_ACTION_TAG;
|
||||
*action = SCHEMA_ACTION_ADD_TAG;
|
||||
} else {
|
||||
*action = SCHEMA_ACTION_COLUMN;
|
||||
*action = SCHEMA_ACTION_ADD_COLUMN;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
@ -310,9 +313,31 @@ static int32_t getBytes(uint8_t type, int32_t length){
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray* results, int32_t numOfCols, bool isTag) {
|
||||
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
|
||||
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
|
||||
ESchemaAction action = SCHEMA_ACTION_NULL;
|
||||
smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, &action, info);
|
||||
if(action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_ADD_TAG){
|
||||
SField field = {0};
|
||||
field.type = kv->type;
|
||||
field.bytes = getBytes(kv->type, kv->length);
|
||||
memcpy(field.name, kv->key, kv->keyLen);
|
||||
taosArrayPush(results, &field);
|
||||
}else if(action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE){
|
||||
uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
|
||||
uint16_t newIndex = *index;
|
||||
if(isTag) newIndex -= numOfCols;
|
||||
SField *field = (SField *)taosArrayGet(results, newIndex);
|
||||
field->bytes = getBytes(kv->type, kv->length);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
//static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
|
||||
// int32_t colVer, int32_t tagVer, int8_t source, uint64_t suid){
|
||||
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *sTableData,
|
||||
static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray* pColumns, SArray* pTags,
|
||||
STableMeta *pTableMeta, ESchemaAction action){
|
||||
|
||||
SRequestObj* pRequest = NULL;
|
||||
|
@ -320,6 +345,12 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *s
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SCmdMsgInfo pCmdMsg = {0};
|
||||
|
||||
// put front for free
|
||||
pReq.numOfColumns = taosArrayGetSize(pColumns);
|
||||
pReq.pColumns = pColumns;
|
||||
pReq.numOfTags = taosArrayGetSize(pTags);
|
||||
pReq.pTags = pTags;
|
||||
|
||||
code = buildRequest(info->taos->id, "", 0, NULL, false, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
|
@ -330,91 +361,41 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SSmlSTableMeta *s
|
|||
goto end;
|
||||
}
|
||||
|
||||
if (action == SCHEMA_ACTION_NULL){
|
||||
if (action == SCHEMA_ACTION_CREATE_STABLE){
|
||||
pReq.colVer = 1;
|
||||
pReq.tagVer = 1;
|
||||
pReq.suid = 0;
|
||||
pReq.source = TD_REQ_FROM_APP;
|
||||
} else if (action == SCHEMA_ACTION_TAG){
|
||||
} else if (action == SCHEMA_ACTION_ADD_TAG || action == SCHEMA_ACTION_CHANGE_TAG_SIZE){
|
||||
pReq.colVer = pTableMeta->sversion;
|
||||
pReq.tagVer = pTableMeta->tversion + 1;
|
||||
pReq.suid = pTableMeta->uid;
|
||||
pReq.source = TD_REQ_FROM_TAOX;
|
||||
} else if (action == SCHEMA_ACTION_COLUMN){
|
||||
} else if (action == SCHEMA_ACTION_ADD_COLUMN || action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE){
|
||||
pReq.colVer = pTableMeta->sversion + 1;
|
||||
pReq.tagVer = pTableMeta->tversion;
|
||||
pReq.suid = pTableMeta->uid;
|
||||
pReq.source = TD_REQ_FROM_TAOX;
|
||||
}
|
||||
|
||||
if (pReq.numOfTags == 0){
|
||||
pReq.numOfTags = 1;
|
||||
SField field = {0};
|
||||
field.type = TSDB_DATA_TYPE_NCHAR;
|
||||
field.bytes = 1;
|
||||
strcpy(field.name, tsSmlTagName);
|
||||
taosArrayPush(pReq.pTags, &field);
|
||||
}
|
||||
|
||||
pReq.commentLen = -1;
|
||||
pReq.igExists = true;
|
||||
tNameExtractFullName(pName, pReq.name);
|
||||
|
||||
if(action == SCHEMA_ACTION_NULL || action == SCHEMA_ACTION_COLUMN){
|
||||
pReq.numOfColumns = taosArrayGetSize(sTableData->cols);
|
||||
pReq.pColumns = taosArrayInit(pReq.numOfColumns, sizeof(SField));
|
||||
for (int i = 0; i < pReq.numOfColumns; i++) {
|
||||
SSmlKv *kv = (SSmlKv *)taosArrayGetP(sTableData->cols, i);
|
||||
SField field = {0};
|
||||
field.type = kv->type;
|
||||
field.bytes = getBytes(kv->type, kv->length);
|
||||
memcpy(field.name, kv->key, kv->keyLen);
|
||||
taosArrayPush(pReq.pColumns, &field);
|
||||
}
|
||||
}else if (action == SCHEMA_ACTION_TAG){
|
||||
pReq.numOfColumns = pTableMeta->tableInfo.numOfColumns;
|
||||
pReq.pColumns = taosArrayInit(pReq.numOfColumns, sizeof(SField));
|
||||
for (int i = 0; i < pReq.numOfColumns; i++) {
|
||||
SSchema *s = &pTableMeta->schema[i];
|
||||
SField field = {0};
|
||||
field.type = s->type;
|
||||
field.bytes = s->bytes;
|
||||
strcpy(field.name, s->name);
|
||||
taosArrayPush(pReq.pColumns, &field);
|
||||
}
|
||||
}
|
||||
|
||||
if(action == SCHEMA_ACTION_NULL || action == SCHEMA_ACTION_TAG){
|
||||
pReq.numOfTags = taosArrayGetSize(sTableData->tags);
|
||||
if (pReq.numOfTags == 0){
|
||||
pReq.numOfTags = 1;
|
||||
pReq.pTags = taosArrayInit(pReq.numOfTags, sizeof(SField));
|
||||
SField field = {0};
|
||||
field.type = TSDB_DATA_TYPE_NCHAR;
|
||||
field.bytes = 1;
|
||||
strcpy(field.name, tsSmlTagName);
|
||||
taosArrayPush(pReq.pTags, &field);
|
||||
}else{
|
||||
pReq.pTags = taosArrayInit(pReq.numOfTags, sizeof(SField));
|
||||
for (int i = 0; i < pReq.numOfTags; i++) {
|
||||
SSmlKv *kv = (SSmlKv *)taosArrayGetP(sTableData->tags, i);
|
||||
SField field = {0};
|
||||
field.type = kv->type;
|
||||
field.bytes = getBytes(kv->type, kv->length);
|
||||
memcpy(field.name, kv->key, kv->keyLen);
|
||||
taosArrayPush(pReq.pTags, &field);
|
||||
}
|
||||
}
|
||||
}else if (action == SCHEMA_ACTION_COLUMN){
|
||||
pReq.numOfTags = pTableMeta->tableInfo.numOfTags;
|
||||
pReq.pTags = taosArrayInit(pReq.numOfTags, sizeof(SField));
|
||||
for (int i = 0; i < pReq.numOfTags; i++) {
|
||||
SSchema *s = &pTableMeta->schema[i + pTableMeta->tableInfo.numOfColumns];
|
||||
SField field = {0};
|
||||
field.type = s->type;
|
||||
field.bytes = s->bytes;
|
||||
strcpy(field.name, s->name);
|
||||
taosArrayPush(pReq.pTags, &field);
|
||||
}
|
||||
}
|
||||
|
||||
pCmdMsg.epSet = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
|
||||
pCmdMsg.msgType = TDMT_MND_CREATE_STB;
|
||||
pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
|
||||
pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
|
||||
if (NULL == pCmdMsg.pMsg) {
|
||||
tFreeSMCreateStbReq(&pReq);
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
|
@ -442,7 +423,10 @@ end:
|
|||
}
|
||||
|
||||
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||
int32_t code = 0;
|
||||
int32_t code = 0;
|
||||
SHashObj *hashTmp = NULL;
|
||||
STableMeta *pTableMeta = NULL;
|
||||
|
||||
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
|
||||
strcpy(pName.dbname, info->pRequest->pDb);
|
||||
|
||||
|
@ -455,7 +439,6 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
SSmlSTableMeta **tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, NULL);
|
||||
while (tableMetaSml) {
|
||||
SSmlSTableMeta *sTableData = *tableMetaSml;
|
||||
STableMeta *pTableMeta = NULL;
|
||||
bool needCheckMeta = false; // for multi thread
|
||||
|
||||
size_t superTableLen = 0;
|
||||
|
@ -466,14 +449,19 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
|
||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||
code = smlSendMetaMsg(info, &pName, sTableData, NULL, SCHEMA_ACTION_NULL);
|
||||
SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
|
||||
SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
|
||||
smlBuildFieldsList(info, NULL, NULL, sTableData->tags, pTags, 0, true);
|
||||
smlBuildFieldsList(info, NULL, NULL, sTableData->cols, pColumns, 0, false);
|
||||
|
||||
code = smlSendMetaMsg(info, &pName, pColumns, pTags, NULL, SCHEMA_ACTION_CREATE_STABLE);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, superTable);
|
||||
goto end;
|
||||
}
|
||||
info->cost.numOfCreateSTables++;
|
||||
} else if (code == TSDB_CODE_SUCCESS) {
|
||||
SHashObj *hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags,
|
||||
hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags,
|
||||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
for (uint16_t i = pTableMeta->tableInfo.numOfColumns;
|
||||
i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
|
||||
|
@ -483,34 +471,70 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
ESchemaAction action = SCHEMA_ACTION_NULL;
|
||||
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->tags, &action, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosHashCleanup(hashTmp);
|
||||
goto end;
|
||||
}
|
||||
if (action == SCHEMA_ACTION_TAG){
|
||||
code = smlSendMetaMsg(info, &pName, sTableData, pTableMeta, action);
|
||||
if (action != SCHEMA_ACTION_NULL){
|
||||
SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
|
||||
SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
|
||||
|
||||
for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
|
||||
SField field = {0};
|
||||
field.type = pTableMeta->schema[i].type;
|
||||
field.bytes = pTableMeta->schema[i].bytes;
|
||||
strcpy(field.name, pTableMeta->schema[i].name);
|
||||
if(i < pTableMeta->tableInfo.numOfColumns){
|
||||
taosArrayPush(pColumns, &field);
|
||||
}else{
|
||||
taosArrayPush(pTags, &field);
|
||||
}
|
||||
}
|
||||
smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->tags, pTags, pTableMeta->tableInfo.numOfColumns, true);
|
||||
|
||||
code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, superTable);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
taosHashClear(hashTmp);
|
||||
for (uint16_t i = 1; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
||||
for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
||||
taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
|
||||
}
|
||||
action = SCHEMA_ACTION_NULL;
|
||||
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &action, false);
|
||||
taosHashCleanup(hashTmp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
if (action == SCHEMA_ACTION_COLUMN){
|
||||
code = smlSendMetaMsg(info, &pName, sTableData, pTableMeta, action);
|
||||
if (action != SCHEMA_ACTION_NULL){
|
||||
SArray* pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + pTableMeta->tableInfo.numOfColumns, sizeof(SField));
|
||||
SArray* pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + pTableMeta->tableInfo.numOfTags, sizeof(SField));
|
||||
|
||||
for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; i++) {
|
||||
SField field = {0};
|
||||
field.type = pTableMeta->schema[i].type;
|
||||
field.bytes = pTableMeta->schema[i].bytes;
|
||||
strcpy(field.name, pTableMeta->schema[i].name);
|
||||
if(i < pTableMeta->tableInfo.numOfColumns){
|
||||
taosArrayPush(pColumns, &field);
|
||||
}else{
|
||||
taosArrayPush(pTags, &field);
|
||||
}
|
||||
}
|
||||
|
||||
smlBuildFieldsList(info, pTableMeta->schema, hashTmp, sTableData->cols, pColumns, pTableMeta->tableInfo.numOfColumns, false);
|
||||
|
||||
code = smlSendMetaMsg(info, &pName, pColumns, pTags, pTableMeta, action);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, superTable);
|
||||
goto end;
|
||||
|
@ -526,7 +550,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
|
||||
goto end;
|
||||
}
|
||||
if (pTableMeta) taosMemoryFree(pTableMeta);
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
|
||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -551,10 +575,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
sTableData->tableMeta = pTableMeta;
|
||||
|
||||
tableMetaSml = (SSmlSTableMeta **)taosHashIterate(info->superTables, tableMetaSml);
|
||||
taosHashCleanup(hashTmp);
|
||||
}
|
||||
return 0;
|
||||
|
||||
end:
|
||||
taosHashCleanup(hashTmp);
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -85,6 +85,9 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("select * from macylr")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
tdSql.query("desc macylr")
|
||||
tdSql.checkRows(25)
|
||||
return
|
||||
|
||||
def run(self):
|
||||
|
|
|
@ -1089,7 +1089,7 @@ int sml_add_tag_col_Test() {
|
|||
if (code) return code;
|
||||
|
||||
const char *sql1[] = {
|
||||
"macylr,id=macylr_17875_1804,t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\",t11=127i8,t10=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64,c11=L\"ncharColValue\",c10=f 1626006833639000000"
|
||||
"macylr,id=macylr_17875_1804,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\",t11=127i8,t10=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c8=L\"ncharColValue\",c9=7u64,c11=L\"ncharColValue\",c10=f 1626006833639000000"
|
||||
};
|
||||
|
||||
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL, 0);
|
||||
|
|
Loading…
Reference in New Issue