Merge pull request #15077 from taosdata/feature/TD-14761
fix:<TD-17324> ts already exist when insert with schemaless & modify the interface of tmq meta
This commit is contained in:
commit
1c03d61edc
|
@ -28,8 +28,9 @@ static void msg_process(TAOS_RES* msg) {
|
|||
printf("db: %s\n", tmq_get_db_name(msg));
|
||||
printf("vg: %d\n", tmq_get_vgroup_id(msg));
|
||||
if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) {
|
||||
tmq_raw_data* raw = tmq_get_raw_meta(msg);
|
||||
if (raw) {
|
||||
tmq_raw_data raw = {0};
|
||||
int32_t code = tmq_get_raw_meta(msg, &raw);
|
||||
if (code == 0) {
|
||||
TAOS* pConn = taos_connect("192.168.1.86", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return;
|
||||
|
@ -53,7 +54,6 @@ static void msg_process(TAOS_RES* msg) {
|
|||
printf("write raw data: %s\n", tmq_err2str(ret));
|
||||
taos_close(pConn);
|
||||
}
|
||||
tmq_free_raw_meta(raw);
|
||||
char* result = tmq_get_json_meta(msg);
|
||||
if (result) {
|
||||
printf("meta result: %s\n", result);
|
||||
|
|
|
@ -259,13 +259,17 @@ enum tmq_res_t {
|
|||
TMQ_RES_TABLE_META = 2,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
void* raw_meta;
|
||||
uint32_t raw_meta_len;
|
||||
uint16_t raw_meta_type;
|
||||
} tmq_raw_data;
|
||||
|
||||
typedef enum tmq_res_t tmq_res_t;
|
||||
typedef struct tmq_raw_data tmq_raw_data;
|
||||
|
||||
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
||||
DLL_EXPORT tmq_raw_data *tmq_get_raw_meta(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data *raw_meta);
|
||||
DLL_EXPORT void tmq_free_raw_meta(tmq_raw_data *rawMeta);
|
||||
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, tmq_raw_data *raw_meta);
|
||||
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta);
|
||||
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta
|
||||
DLL_EXPORT void tmq_free_json_meta(char* jsonMeta);
|
||||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||
|
|
|
@ -268,7 +268,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm
|
|||
*actionNeeded = true;
|
||||
}
|
||||
if (*actionNeeded) {
|
||||
uDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, colField->name,
|
||||
uDebug("SML:0x%" PRIx64 " generate schema action. kv->name: %s, action: %d", info->id, kv->key,
|
||||
action->action);
|
||||
}
|
||||
return 0;
|
||||
|
@ -436,6 +436,7 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
|
|||
SSchemaAction *action, bool isTag) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
|
||||
if(j == 0 && !isTag) continue;
|
||||
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, j);
|
||||
bool actionNeeded = false;
|
||||
code = smlGenerateSchemaAction(schemaField, schemaHash, kv, isTag, action, &actionNeeded, info);
|
||||
|
@ -452,18 +453,25 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols) {
|
||||
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
|
||||
SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
for (uint16_t i = 0; i < length; i++) {
|
||||
int32_t i = 0;
|
||||
for ( ;i < length; i++) {
|
||||
taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(cols); i++) {
|
||||
if (isTag){
|
||||
i = 0;
|
||||
} else {
|
||||
i = 1;
|
||||
}
|
||||
for (; i < taosArrayGetSize(cols); i++) {
|
||||
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
|
||||
if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
taosHashCleanup(hashTmp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -523,7 +531,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
}
|
||||
|
||||
taosHashClear(hashTmp);
|
||||
for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
||||
for (uint16_t i = 1; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
||||
taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES);
|
||||
}
|
||||
code = smlProcessSchemaAction(info, pTableMeta->schema, hashTmp, sTableData->cols, &schemaAction, false);
|
||||
|
@ -551,12 +559,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
|
||||
if (needCheckMeta) {
|
||||
code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
|
||||
sTableData->tags);
|
||||
sTableData->tags, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " check tag failed. super table name %s", info->id, (char *)superTable);
|
||||
goto end;
|
||||
}
|
||||
code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols);
|
||||
code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " check cols failed. super table name %s", info->id, (char *)superTable);
|
||||
goto end;
|
||||
|
@ -832,6 +840,7 @@ static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t
|
|||
static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArray *cols) {
|
||||
int64_t ts = 0;
|
||||
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
||||
// uError("SML:data:%s,len:%d", data, len);
|
||||
ts = smlParseInfluxTime(info, data, len);
|
||||
} else if (info->protocol == TSDB_SML_TELNET_PROTOCOL) {
|
||||
ts = smlParseOpenTsdbTime(info, data, len);
|
||||
|
@ -2031,6 +2040,8 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
|
|||
|
||||
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
|
||||
SSmlLineInfo elements = {0};
|
||||
uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql);
|
||||
|
||||
int ret = smlParseInfluxString(sql, &elements, &info->msgBuf);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " smlParseInfluxLine failed", info->id);
|
||||
|
|
|
@ -113,12 +113,6 @@ struct tmq_t {
|
|||
tsem_t rspSem;
|
||||
};
|
||||
|
||||
struct tmq_raw_data {
|
||||
void* raw_meta;
|
||||
int32_t raw_meta_len;
|
||||
int16_t raw_meta_type;
|
||||
};
|
||||
|
||||
enum {
|
||||
TMQ_VG_STATUS__IDLE = 0,
|
||||
TMQ_VG_STATUS__WAIT,
|
||||
|
@ -1918,16 +1912,15 @@ const char* tmq_get_table_name(TAOS_RES* res) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
tmq_raw_data* tmq_get_raw_meta(TAOS_RES* res) {
|
||||
if (TD_RES_TMQ_META(res)) {
|
||||
tmq_raw_data* raw = taosMemoryCalloc(1, sizeof(tmq_raw_data));
|
||||
int32_t tmq_get_raw_meta(TAOS_RES* res, tmq_raw_data *raw) {
|
||||
if (TD_RES_TMQ_META(res) && raw) {
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
raw->raw_meta = pMetaRspObj->metaRsp.metaRsp;
|
||||
raw->raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
|
||||
raw->raw_meta_type = pMetaRspObj->metaRsp.resMsgType;
|
||||
return raw;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
return NULL;
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
|
||||
|
@ -2935,23 +2928,23 @@ end:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t taos_write_raw_meta(TAOS* taos, tmq_raw_data* raw_meta) {
|
||||
if (!taos || !raw_meta) {
|
||||
int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){
|
||||
if (!taos) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
if (raw_meta->raw_meta_type == TDMT_VND_CREATE_STB) {
|
||||
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
|
||||
} else if (raw_meta->raw_meta_type == TDMT_VND_ALTER_STB) {
|
||||
return taosCreateStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
|
||||
} else if (raw_meta->raw_meta_type == TDMT_VND_DROP_STB) {
|
||||
return taosDropStb(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
|
||||
} else if (raw_meta->raw_meta_type == TDMT_VND_CREATE_TABLE) {
|
||||
return taosCreateTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
|
||||
} else if (raw_meta->raw_meta_type == TDMT_VND_ALTER_TABLE) {
|
||||
return taosAlterTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
|
||||
} else if (raw_meta->raw_meta_type == TDMT_VND_DROP_TABLE) {
|
||||
return taosDropTable(taos, raw_meta->raw_meta, raw_meta->raw_meta_len);
|
||||
if(raw_meta.raw_meta_type == TDMT_VND_CREATE_STB) {
|
||||
return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
|
||||
}else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_STB){
|
||||
return taosCreateStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
|
||||
}else if(raw_meta.raw_meta_type == TDMT_VND_DROP_STB){
|
||||
return taosDropStb(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
|
||||
}else if(raw_meta.raw_meta_type == TDMT_VND_CREATE_TABLE){
|
||||
return taosCreateTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
|
||||
}else if(raw_meta.raw_meta_type == TDMT_VND_ALTER_TABLE){
|
||||
return taosAlterTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
|
||||
}else if(raw_meta.raw_meta_type == TDMT_VND_DROP_TABLE){
|
||||
return taosDropTable(taos, raw_meta.raw_meta, raw_meta.raw_meta_len);
|
||||
}
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
|
|
@ -2161,7 +2161,7 @@ static void smlDestroyTableHandle(void* pHandle) {
|
|||
tdDestroySVCreateTbReq(&handle->createTblReq);
|
||||
}
|
||||
|
||||
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema) {
|
||||
static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SSchema* pSchema, bool isTag) {
|
||||
col_id_t nCols = pColList->numOfCols;
|
||||
|
||||
pColList->numOfBound = 0;
|
||||
|
@ -2177,7 +2177,8 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS
|
|||
SSmlKv* kv = taosArrayGetP(cols, i);
|
||||
SToken sToken = {.n = kv->keyLen, .z = (char*)kv->key};
|
||||
col_id_t t = lastColIdx + 1;
|
||||
col_id_t index = findCol(&sToken, t, nCols, pSchema);
|
||||
col_id_t index = ((t == 0 && !isTag) ? 0 : findCol(&sToken, t, nCols, pSchema));
|
||||
uDebug("SML, index:%d, t:%d, ncols:%d, kv->name:%s", index, t, nCols, kv->key);
|
||||
if (index < 0 && t > 0) {
|
||||
index = findCol(&sToken, 0, t, pSchema);
|
||||
isOrdered = false;
|
||||
|
@ -2312,7 +2313,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
|
|||
smlDestroyTableHandle(&smlHandle->tableExecHandle); // free for each table
|
||||
SSchema* pTagsSchema = getTableTagSchema(pTableMeta);
|
||||
setBoundColumnInfo(&smlHandle->tableExecHandle.tags, pTagsSchema, getNumOfTags(pTableMeta));
|
||||
int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema);
|
||||
int ret = smlBoundColumnData(tags, &smlHandle->tableExecHandle.tags, pTagsSchema, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
buildInvalidOperationMsg(&pBuf, "bound tags error");
|
||||
return ret;
|
||||
|
@ -2343,7 +2344,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
|
|||
|
||||
SSchema* pSchema = getTableColumnSchema(pTableMeta);
|
||||
|
||||
ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema);
|
||||
ret = smlBoundColumnData(colsSchema, &pDataBlock->boundColumnInfo, pSchema, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
buildInvalidOperationMsg(&pBuf, "bound cols error");
|
||||
return ret;
|
||||
|
@ -2401,7 +2402,9 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols
|
|||
} else {
|
||||
int32_t colLen = kv->length;
|
||||
if (pColSchema->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
// uError("SML:data before:%ld, precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
||||
kv->i = convertTimePrecision(kv->i, TSDB_TIME_PRECISION_NANO, pTableMeta->tableInfo.precision);
|
||||
// uError("SML:data after:%ld, precision:%d", kv->i, pTableMeta->tableInfo.precision);
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(kv->type)) {
|
||||
|
|
|
@ -45,10 +45,10 @@ typedef enum {
|
|||
NOTIFY_CMD_ID_BUTT,
|
||||
} NOTIFY_CMD_ID;
|
||||
|
||||
typedef enum enumQUERY_TYPE {
|
||||
NO_INSERT_TYPE,
|
||||
INSERT_TYPE,
|
||||
QUERY_TYPE_BUT
|
||||
typedef enum enumQUERY_TYPE {
|
||||
NO_INSERT_TYPE,
|
||||
INSERT_TYPE,
|
||||
QUERY_TYPE_BUT
|
||||
} QUERY_TYPE;
|
||||
|
||||
typedef struct {
|
||||
|
@ -587,9 +587,10 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
|||
tmq_get_topic_name(msg), vgroupId);
|
||||
|
||||
{
|
||||
tmq_raw_data *raw = tmq_get_raw_meta(msg);
|
||||
tmq_raw_data raw = {0};
|
||||
int32_t code = tmq_get_raw_meta(msg, &raw);
|
||||
|
||||
if(raw){
|
||||
if(code == TSDB_CODE_SUCCESS){
|
||||
TAOS_RES* pRes = taos_query(pInfo->taos, "use metadb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
pError("error when use metadb, reason:%s\n", taos_errstr(pRes));
|
||||
|
@ -599,10 +600,9 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
|||
exit(-1);
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
taosFprintfFile(g_fp, "raw:%p\n", raw);
|
||||
taosFprintfFile(g_fp, "raw:%p\n", &raw);
|
||||
|
||||
int32_t ret = taos_write_raw_meta(pInfo->taos, raw);
|
||||
taosMemoryFree(raw);
|
||||
taos_write_raw_meta(pInfo->taos, raw);
|
||||
}
|
||||
|
||||
char* result = tmq_get_json_meta(msg);
|
||||
|
@ -1159,23 +1159,23 @@ void* ombConsumeThreadFunc(void* param) {
|
|||
|
||||
|
||||
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type) {
|
||||
TAOS_RES *res = taos_query(taos, command);
|
||||
int32_t code = taos_errno(res);
|
||||
|
||||
if (code != 0) {
|
||||
TAOS_RES *res = taos_query(taos, command);
|
||||
int32_t code = taos_errno(res);
|
||||
|
||||
if (code != 0) {
|
||||
pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC);
|
||||
taos_free_result(res);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (INSERT_TYPE == type) {
|
||||
int affectedRows = taos_affected_rows(res);
|
||||
taos_free_result(res);
|
||||
return affectedRows;
|
||||
}
|
||||
|
||||
taos_free_result(res);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (INSERT_TYPE == type) {
|
||||
int affectedRows = taos_affected_rows(res);
|
||||
taos_free_result(res);
|
||||
return affectedRows;
|
||||
}
|
||||
|
||||
taos_free_result(res);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void* ombProduceThreadFunc(void* param) {
|
||||
|
|
Loading…
Reference in New Issue