From 34c87dc428f9d1d3551cab8fb243cb0c30f0c2d5 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 18 Jul 2022 15:33:42 +0800 Subject: [PATCH] fix: ts already exist when insert with schemaless & modify the interface of tmq meta --- include/client/taos.h | 12 ++++++--- source/client/src/clientSml.c | 25 ++++++++++++----- source/client/src/tmq.c | 43 +++++++++++++----------------- source/libs/parser/src/parInsert.c | 4 ++- 4 files changed, 47 insertions(+), 37 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 690c473986..8afefe4e6b 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -259,13 +259,17 @@ enum tmq_res_t { TMQ_RES_TABLE_META = 2, }; +typedef struct { + void* raw_meta; + uint32_t raw_meta_len; + uint16_t raw_meta_type; +} tmq_raw_data; + typedef enum tmq_res_t tmq_res_t; -typedef struct tmq_raw_data tmq_raw_data; DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); -DLL_EXPORT tmq_raw_data *tmq_get_raw_meta(TAOS_RES *res); -DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta); -DLL_EXPORT void tmq_free_raw_meta(tmq_raw_data *rawMeta); +DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, tmq_raw_data *raw_meta); +DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta); DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta DLL_EXPORT void tmq_free_json_meta(char* jsonMeta); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 83f2cf9307..ab3cacfb49 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -268,7 +268,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm *actionNeeded = true; } if (*actionNeeded) { - uDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, colField->name, + uDebug("SML:0x%" PRIx64 " generate schema action. kv->name: %s, action: %d", info->id, kv->key, action->action); } return 0; @@ -436,6 +436,7 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH SSchemaAction *action, bool isTag) { int32_t code = TSDB_CODE_SUCCESS; for (int j = 0; j < taosArrayGetSize(cols); ++j) { + if(j == 0 && !isTag) continue; SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j); bool actionNeeded = false; code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, &actionNeeded, info); @@ -452,18 +453,25 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH return TSDB_CODE_SUCCESS; } -static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols) { +static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) { SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - for (uint16_t i = 0; i < length; i++) { + int32_t i = 0; + for ( ;i < length; i++) { taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES); } - for (int32_t i = 0; i < taosArrayGetSize(cols); i++) { + if (isTag){ + i = 0; + } else { + i = 1; + } + for (; i < taosArrayGetSize(cols); i++) { SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) { return -1; } } + taosHashCleanup(hashTmp); return 0; } @@ -523,7 +531,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { } taosHashClear(hashTmp); - for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { + for (uint16_t i = 1; i < pTableMeta->tableInfo.numOfColumns; i++) { taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES); } code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &schemaAction, false); @@ -551,12 +559,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { if (needCheckMeta) { code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, - sTableData->tags); + sTableData->tags, true); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, (char *)superTable); goto end; } - code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols); + code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, (char *)superTable); goto end; @@ -832,6 +840,7 @@ static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) { int64_t ts = 0; if (info->protocol == TSDB_SML_LINE_PROTOCOL) { +// uError("SML:data:%s,len:%d", data, len); ts = smlParseInfluxTime(info, data, len); } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { ts = smlParseOpenTsdbTime(info, data, len); @@ -2031,6 +2040,8 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) { SSmlLineInfo elements = {0}; + uError("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql); + int ret = smlParseInfluxString(sql, &elements, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 6ad6413159..46b21086ce 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -106,12 +106,6 @@ struct tmq_t { tsem_t rspSem; }; -struct tmq_raw_data { - void* raw_meta; - int32_t raw_meta_len; - int16_t raw_meta_type; -}; - enum { TMQ_VG_STATUS__IDLE = 0, TMQ_VG_STATUS__WAIT, @@ -1858,16 +1852,15 @@ const char* tmq_get_table_name(TAOS_RES* res) { return NULL; } -tmq_raw_data *tmq_get_raw_meta(TAOS_RES* res) { - if (TD_RES_TMQ_META(res)) { - tmq_raw_data *raw = taosMemoryCalloc(1, sizeof(tmq_raw_data)); +int32_t tmq_get_raw_meta(TAOS_RES* res, tmq_raw_data *raw) { + if (TD_RES_TMQ_META(res) && raw) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; raw->raw_meta = pMetaRspObj->metaRsp.metaRsp; raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen; raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType; - return raw; + return TSDB_CODE_SUCCESS; } - return NULL; + return TSDB_CODE_INVALID_PARA; } static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){ @@ -2875,23 +2868,23 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){ return code; } -int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta){ - if (!taos || !raw_meta) { +int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){ + if (!taos) { return TSDB_CODE_INVALID_PARA; } - if(raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) { - return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); - }else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_STB){ - return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); - }else if(raw_meta->raw_meta_type == TDMT_VND_DROP_STB){ - return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); - }else if(raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE){ - return taosCreateTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); - }else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_TABLE){ - return taosAlterTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); - }else if(raw_meta->raw_meta_type == TDMT_VND_DROP_TABLE){ - return taosDropTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); + if(raw_meta.raw_meta_type == TDMT_VND_CREATE_STB) { + return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); + }else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_STB){ + return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); + }else if(raw_meta.raw_meta_type == TDMT_VND_DROP_STB){ + return taosDropStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); + }else if(raw_meta.raw_meta_type == TDMT_VND_CREATE_TABLE){ + return taosCreateTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); + }else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_TABLE){ + return taosAlterTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); + }else if(raw_meta.raw_meta_type == TDMT_VND_DROP_TABLE){ + return taosDropTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len); } return TSDB_CODE_INVALID_PARA; } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 5ac4476d14..623b26204d 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -2177,7 +2177,7 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS SSmlKv* kv = taosArrayGetP(cols, i); SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key}; col_id_t t = lastColIdx + 1; - col_id_t index = findCol(&sToken, t, nCols, pSchema); + col_id_t index = (t == 0 ? 0 : findCol(&sToken, t, nCols, pSchema)); if (index < 0 && t > 0) { index = findCol(&sToken, 0, t, pSchema); isOrdered = false; @@ -2401,7 +2401,9 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols } else { int32_t colLen = kv->length; if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { +// uError("SML:data before:%ld, precision:%d", kv->i, pTableMeta->tableInfo.precision); kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); +// uError("SML:data after:%ld, precision:%d", kv->i, pTableMeta->tableInfo.precision); } if (IS_VAR_DATA_TYPE(kv->type)) {