From 7c0b0cf9bab680ab9a9a09dd30d9d1ffe3b8847e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 27 Sep 2024 01:43:36 +0800 Subject: [PATCH 1/2] fix:[TS-5466] tag rewrite error --- include/common/tdataformat.h | 1 - source/client/src/clientRawBlockWrite.c | 34 ++++++- source/common/src/tdataformat.c | 20 ---- tests/parallel_test/cases.task | 1 + tests/system-test/7-tmq/tmq_ts5466.py | 51 ++++++++++ utils/test/c/CMakeLists.txt | 8 ++ utils/test/c/tmq_ts5466.c | 124 ++++++++++++++++++++++++ 7 files changed, 216 insertions(+), 23 deletions(-) create mode 100644 tests/system-test/7-tmq/tmq_ts5466.py create mode 100644 utils/test/c/tmq_ts5466.c diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 1179e710cd..62901151e9 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -153,7 +153,6 @@ char *tTagValToData(const STagVal *pTagVal, bool isJson); int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag); int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag); int32_t tTagToValArray(const STag *pTag, SArray **ppArray); -void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid); void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove int32_t parseJsontoTagData(const char *json, SArray *pTagVals, STag **ppTag, void *pMsgBuf); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 8a888a2a47..eeb77676d6 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1120,7 +1120,8 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { SRequestObj* pRequest = NULL; SQuery* pQuery = NULL; SHashObj* pVgroupHashmap = NULL; - + SArray* pTagList = taosArrayInit(0, POINTER_BYTES); + RAW_NULL_CHECK(pTagList); RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0)); uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen); @@ -1186,6 +1187,14 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { } pCreateReq->ctb.suid = pTableMeta->uid; + SArray* pTagVals = NULL; + code = tTagToValArray((STag *)pCreateReq->ctb.pTag, &pTagVals); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pTableMeta); + goto end; + } + + bool rebuildTag = false; for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) { char* tName = taosArrayGet(pCreateReq->ctb.tagName, i); if (tName == NULL) { @@ -1195,11 +1204,31 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { j < pTableMeta->tableInfo.numOfColumns + pTableMeta->tableInfo.numOfTags; j++) { SSchema* tag = &pTableMeta->schema[j]; if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) { - tTagSetCid((STag*)pCreateReq->ctb.pTag, i, tag->colId); + STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i); + if (pTagVal) { + if (pTagVal->cid != tag->colId){ + pTagVal->cid = tag->colId; + rebuildTag = true; + } + } else { + uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name, (int)taosArrayGetSize(pTagVals), i, tag->colId); + } } } } taosMemoryFreeClear(pTableMeta); + if (rebuildTag){ + STag* ppTag = NULL; + code = tTagNew(pTagVals, 1, false, &ppTag); + taosArrayDestroy(pTagVals); + pTagVals = NULL; + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + pCreateReq->ctb.pTag = (uint8_t*)ppTag; + taosArrayPush(pTagList, &ppTag); + } + taosArrayDestroy(pTagVals); } RAW_NULL_CHECK(taosArrayPush(pRequest->tableList, &pName)); @@ -1251,6 +1280,7 @@ end: destroyRequest(pRequest); tDecoderClear(&coder); qDestroyQuery(pQuery); + taosArrayDestroyP(pTagList, taosMemoryFree); return code; } diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 4b44e4af43..2038851c09 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -1767,26 +1767,6 @@ _err: return code; } -void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid) { - uint8_t *p = NULL; - int8_t isLarge = pTag->flags & TD_TAG_LARGE; - int16_t offset = 0; - - if (isLarge) { - p = (uint8_t *)&((int16_t *)pTag->idx)[pTag->nTag]; - } else { - p = (uint8_t *)&pTag->idx[pTag->nTag]; - } - - if (isLarge) { - offset = ((int16_t *)pTag->idx)[iTag]; - } else { - offset = pTag->idx[iTag]; - } - - (void)tPutI16v(p + offset, cid); -} - // STSchema ======================================== STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) { STSchema *pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 1a00787a6b..8898a3b0c3 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -280,6 +280,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts5466.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py diff --git a/tests/system-test/7-tmq/tmq_ts5466.py b/tests/system-test/7-tmq/tmq_ts5466.py new file mode 100644 index 0000000000..1afe74c3b4 --- /dev/null +++ b/tests/system-test/7-tmq/tmq_ts5466.py @@ -0,0 +1,51 @@ +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def run(self): + tdSql.execute(f'create database if not exists db_taosx') + tdSql.execute(f'create database if not exists db_5466') + tdSql.execute(f'use db_5466') + tdSql.execute(f'create stable if not exists s5466 (ts timestamp, c1 int, c2 int) tags (t binary(32))') + tdSql.execute(f'insert into t1 using s5466 tags("__devicid__") values(1669092069068, 0, 1)') + for i in range(80): + if i < 3: + continue + tdSql.execute(f'alter stable s5466 add column c{i} int') + tdSql.execute(f'insert into t1(ts, c1, c2) values(1669092069067, 0, 1)') + tdSql.execute(f'flush database db_5466') + + tdSql.execute("create topic db_5466_topic with meta as database db_5466") + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/tmq_ts5466'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + return + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index e5902856e6..2e4feb2539 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -3,6 +3,7 @@ add_dependencies(tmq_demo taos) add_executable(tmq_sim tmqSim.c) add_executable(create_table createTable.c) add_executable(tmq_taosx_ci tmq_taosx_ci.c) +add_executable(tmq_ts5466 tmq_ts5466.c) add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) add_executable(get_db_name_test get_db_name_test.c) @@ -53,6 +54,13 @@ target_link_libraries( PUBLIC common PUBLIC os ) +target_link_libraries( + tmq_ts5466 + PUBLIC taos + PUBLIC util + PUBLIC common + PUBLIC os +) target_link_libraries( tmq_taosx_ci PUBLIC taos diff --git a/utils/test/c/tmq_ts5466.c b/utils/test/c/tmq_ts5466.c new file mode 100644 index 0000000000..86a247a0ed --- /dev/null +++ b/utils/test/c/tmq_ts5466.c @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "cJSON.h" +#include "taos.h" +#include "tmsg.h" +#include "types.h" + +static TAOS* use_db() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return NULL; + } + + TAOS_RES* pRes = taos_query(pConn, "use db_taosx"); + if (taos_errno(pRes) != 0) { + printf("error in use db_taosx, reason:%s\n", taos_errstr(pRes)); + return NULL; + } + taos_free_result(pRes); + return pConn; +} + +static void msg_process(TAOS_RES* msg) { + printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg)); + printf("db: %s\n", tmq_get_db_name(msg)); + printf("vg: %d\n", tmq_get_vgroup_id(msg)); + TAOS* pConn = use_db(); + if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META || tmq_get_res_type(msg) == TMQ_RES_METADATA) { + char* result = tmq_get_json_meta(msg); + printf("meta result: %s\n", result); + tmq_free_json_meta(result); + } + + tmq_raw_data raw = {0}; + tmq_get_raw(msg, &raw); + printf("write raw data type: %d\n", raw.raw_type); + int32_t ret = tmq_write_raw(pConn, raw); + printf("write raw data: %s\n", tmq_err2str(ret)); + ASSERT(ret == 0); + + tmq_free_raw(raw); + taos_close(pConn); +} + +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + printf("commit %d tmq %p param %p\n", code, tmq, param); +} + +tmq_t* build_consumer() { + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_conf_set(conf, "client.id", "my app 1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.consume.excluded", "1"); + tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "db_5466_topic"); + return topic_list; +} + +void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + int32_t code; + + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); + printf("subscribe err\n"); + return; + } + int32_t cnt = 0; + while (1) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000); + if (tmqmessage) { + cnt++; + msg_process(tmqmessage); + taos_free_result(tmqmessage); + } else { + break; + } + } + + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +int main(int argc, char* argv[]) { + tmq_t* tmq = build_consumer(); + tmq_list_t* topic_list = build_topic_list(); + basic_consume_loop(tmq, topic_list); + tmq_list_destroy(topic_list); +} \ No newline at end of file From cdeb534125ee822f3d26176c4dbe342bd9eacba7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 27 Sep 2024 09:04:55 +0800 Subject: [PATCH 2/2] fix: function return code issue --- source/client/src/clientRawBlockWrite.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 4ac9188e4a..b90ee3073e 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1225,8 +1225,11 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { if (code != TSDB_CODE_SUCCESS) { goto end; } + if (NULL == taosArrayPush(pTagList, &ppTag)) { + tTagFree(ppTag); + goto end; + } pCreateReq->ctb.pTag = (uint8_t*)ppTag; - taosArrayPush(pTagList, &ppTag); } taosArrayDestroy(pTagVals); }