fix:<TD-17324> ts already exist when insert with schemaless & modify the interface of tmq meta

This commit is contained in:
wangmm0220 2022-07-18 15:33:42 +08:00
parent 22903b3dff
commit 34c87dc428
4 changed files with 47 additions and 37 deletions

View File

@ -259,13 +259,17 @@ enum tmq_res_t {
TMQ_RES_TABLE_META = 2, TMQ_RES_TABLE_META = 2,
}; };
typedef struct {
void* raw_meta;
uint32_t raw_meta_len;
uint16_t raw_meta_type;
} tmq_raw_data;
typedef enum tmq_res_t tmq_res_t; typedef enum tmq_res_t tmq_res_t;
typedef struct tmq_raw_data tmq_raw_data;
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT tmq_raw_data *tmq_get_raw_meta(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, tmq_raw_data *raw_meta);
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta); DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta);
DLL_EXPORT void tmq_free_raw_meta(tmq_raw_data *rawMeta);
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT void tmq_free_json_meta(char* jsonMeta); DLL_EXPORT void tmq_free_json_meta(char* jsonMeta);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);

View File

@ -268,7 +268,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm
*actionNeeded = true; *actionNeeded = true;
} }
if (*actionNeeded) { if (*actionNeeded) {
uDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, colField->name, uDebug("SML:0x%" PRIx64 " generate schema action. kv->name: %s, action: %d", info->id, kv->key,
action->action); action->action);
} }
return 0; return 0;
@ -436,6 +436,7 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
SSchemaAction *action, bool isTag) { SSchemaAction *action, bool isTag) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
for (int j = 0; j < taosArrayGetSize(cols); ++j) { for (int j = 0; j < taosArrayGetSize(cols); ++j) {
if(j == 0 && !isTag) continue;
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j); SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
bool actionNeeded = false; bool actionNeeded = false;
code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, &actionNeeded, info); code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, &actionNeeded, info);
@ -452,18 +453,25 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols) { static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
for (uint16_t i = 0; i < length; i++) { int32_t i = 0;
for ( ;i < length; i++) {
taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES); taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
} }
for (int32_t i = 0; i < taosArrayGetSize(cols); i++) { if (isTag){
i = 0;
} else {
i = 1;
}
for (; i < taosArrayGetSize(cols); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i); SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) { if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
return -1; return -1;
} }
} }
taosHashCleanup(hashTmp);
return 0; return 0;
} }
@ -523,7 +531,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
} }
taosHashClear(hashTmp); taosHashClear(hashTmp);
for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) { for (uint16_t i = 1; i < pTableMeta->tableInfo.numOfColumns; i++) {
taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES); taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
} }
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &schemaAction, false); code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &schemaAction, false);
@ -551,12 +559,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
if (needCheckMeta) { if (needCheckMeta) {
code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
sTableData->tags); sTableData->tags, true);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, (char *)superTable); uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, (char *)superTable);
goto end; goto end;
} }
code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols); code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, (char *)superTable); uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, (char *)superTable);
goto end; goto end;
@ -832,6 +840,7 @@ static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t
static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) { static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) {
int64_t ts = 0; int64_t ts = 0;
if (info->protocol == TSDB_SML_LINE_PROTOCOL) { if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
// uError("SML:data:%s,len:%d", data, len);
ts = smlParseInfluxTime(info, data, len); ts = smlParseInfluxTime(info, data, len);
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) { } else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
ts = smlParseOpenTsdbTime(info, data, len); ts = smlParseOpenTsdbTime(info, data, len);
@ -2031,6 +2040,8 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) { static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
SSmlLineInfo elements = {0}; SSmlLineInfo elements = {0};
uError("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql);
int ret = smlParseInfluxString(sql, &elements, &info->msgBuf); int ret = smlParseInfluxString(sql, &elements, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id); uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id);

View File

@ -106,12 +106,6 @@ struct tmq_t {
tsem_t rspSem; tsem_t rspSem;
}; };
struct tmq_raw_data {
void* raw_meta;
int32_t raw_meta_len;
int16_t raw_meta_type;
};
enum { enum {
TMQ_VG_STATUS__IDLE = 0, TMQ_VG_STATUS__IDLE = 0,
TMQ_VG_STATUS__WAIT, TMQ_VG_STATUS__WAIT,
@ -1858,16 +1852,15 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL; return NULL;
} }
tmq_raw_data *tmq_get_raw_meta(TAOS_RES* res) { int32_t tmq_get_raw_meta(TAOS_RES* res, tmq_raw_data *raw) {
if (TD_RES_TMQ_META(res)) { if (TD_RES_TMQ_META(res) && raw) {
tmq_raw_data *raw = taosMemoryCalloc(1, sizeof(tmq_raw_data));
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
raw->raw_meta = pMetaRspObj->metaRsp.metaRsp; raw->raw_meta = pMetaRspObj->metaRsp.metaRsp;
raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen; raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType; raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType;
return raw; return TSDB_CODE_SUCCESS;
} }
return NULL; return TSDB_CODE_INVALID_PARA;
} }
static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){ static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){
@ -2875,23 +2868,23 @@ static int32_t taosAlterTable(TAOS *taos, void *meta, int32_t metaLen){
return code; return code;
} }
int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta){ int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){
if (!taos || !raw_meta) { if (!taos) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
if(raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) { if(raw_meta.raw_meta_type == TDMT_VND_CREATE_STB) {
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_STB){ }else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_STB){
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_DROP_STB){ }else if(raw_meta.raw_meta_type == TDMT_VND_DROP_STB){
return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosDropStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE){ }else if(raw_meta.raw_meta_type == TDMT_VND_CREATE_TABLE){
return taosCreateTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosCreateTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_ALTER_TABLE){ }else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_TABLE){
return taosAlterTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosAlterTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
}else if(raw_meta->raw_meta_type == TDMT_VND_DROP_TABLE){ }else if(raw_meta.raw_meta_type == TDMT_VND_DROP_TABLE){
return taosDropTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len); return taosDropTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
} }
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }

View File

@ -2177,7 +2177,7 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
SSmlKv* kv = taosArrayGetP(cols, i); SSmlKv* kv = taosArrayGetP(cols, i);
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key}; SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
col_id_t t = lastColIdx + 1; col_id_t t = lastColIdx + 1;
col_id_t index = findCol(&sToken, t, nCols, pSchema); col_id_t index = (t == 0 ? 0 : findCol(&sToken, t, nCols, pSchema));
if (index < 0 && t > 0) { if (index < 0 && t > 0) {
index = findCol(&sToken, 0, t, pSchema); index = findCol(&sToken, 0, t, pSchema);
isOrdered = false; isOrdered = false;
@ -2401,7 +2401,9 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
} else { } else {
int32_t colLen = kv->length; int32_t colLen = kv->length;
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) { if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
// uError("SML:data before:%ld, precision:%d", kv->i, pTableMeta->tableInfo.precision);
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision); kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
// uError("SML:data after:%ld, precision:%d", kv->i, pTableMeta->tableInfo.precision);
} }
if (IS_VAR_DATA_TYPE(kv->type)) { if (IS_VAR_DATA_TYPE(kv->type)) {