From 74df5d4711c9300e0a8abeeea5a7105f529651ad Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 11 Oct 2024 14:11:48 +0800 Subject: [PATCH] fix:[TD-32187] test tmq api random --- source/client/test/tmqTest.cpp | 7 +- tests/system-test/7-tmq/td-32187.py | 69 +++--------- utils/test/c/CMakeLists.txt | 8 ++ utils/test/c/tmq_td32187.c | 168 ++++++++++++++++++++++++++++ 4 files changed, 196 insertions(+), 56 deletions(-) create mode 100644 utils/test/c/tmq_td32187.c diff --git a/source/client/test/tmqTest.cpp b/source/client/test/tmqTest.cpp index 7505e81884..c7a75e352a 100644 --- a/source/client/test/tmqTest.cpp +++ b/source/client/test/tmqTest.cpp @@ -33,10 +33,9 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } -TEST(testCase, tmq_api_Test) { - tmq_conf_t *conf = tmq_conf_new(); - char msg[128] = {0}; - tmq_consumer_new(NULL, msg, -1); +TEST(testCase, driverInit_Test) { + // taosInitGlobalCfg(); + // taos_init(); } TEST(testCase, create_topic_ctb_Test) { diff --git a/tests/system-test/7-tmq/td-32187.py b/tests/system-test/7-tmq/td-32187.py index 73c7a88f65..7f971b23da 100644 --- a/tests/system-test/7-tmq/td-32187.py +++ b/tests/system-test/7-tmq/td-32187.py @@ -1,4 +1,3 @@ - import taos import sys import time @@ -16,65 +15,31 @@ sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: - clientCfgDict = {'debugFlag': 135} - updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict} - # updatecfgDict = {'debugFlag': 135, 'clientCfg':clientCfgDict, 'tmqRowSize':1} - + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file - def consume_test(self): - - tdSql.execute(f'create database if not exists d1') - tdSql.execute(f'use d1') - tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)') - tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)') - tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)') - tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)') - - - tdSql.execute(f'create topic topic_all as select * from st') - consumer_dict = { - "group.id": "g1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "auto.offset.reset": "earliest", - } - consumer = Consumer(consumer_dict) - - try: - consumer.unsubscribe() - consumer.unsubscribe() - consumer.subscribe(["topic_all"]) - consumer.subscribe(["topic_all"]) - except TmqError: - tdLog.exit(f"subscribe error") - - cnt = 0 - try: - while True: - res = consumer.poll(2) - if not res: - break - val = res.value() - if val is None: - print(f"null val") - continue - for block in val: - cnt += len(block.fetchall()) - - print(f"block {cnt} rows") - - finally: - consumer.unsubscribe(); - consumer.close() def run(self): - self.consume_test() + tdSql.execute(f'create database if not exists db_32187') + tdSql.execute(f'use db_32187') + tdSql.execute(f'create stable if not exists s5466 (ts timestamp, c1 int, c2 int) tags (t binary(32))') + tdSql.execute(f'insert into t1 using s5466 tags("__devicid__") values(1669092069068, 0, 1)') + tdSql.execute(f'insert into t1(ts, c1, c2) values(1669092069067, 0, 1)') + + tdSql.execute("create topic topic_test with meta as database db_32187") + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/tmq_td32187'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + return def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") tdCases.addLinux(__file__, TDTestCase()) -tdCases.addWindows(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index 991a004a74..1bf09c0ee2 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -4,6 +4,7 @@ add_executable(tmq_sim tmqSim.c) add_executable(create_table createTable.c) add_executable(tmq_taosx_ci tmq_taosx_ci.c) add_executable(tmq_ts5466 tmq_ts5466.c) +add_executable(tmq_td32187 tmq_td32187.c) add_executable(tmq_write_raw_test tmq_write_raw_test.c) add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) @@ -62,6 +63,13 @@ target_link_libraries( PUBLIC common PUBLIC os ) +target_link_libraries( + tmq_td32187 + PUBLIC taos + PUBLIC util + PUBLIC common + PUBLIC os +) target_link_libraries( tmq_taosx_ci PUBLIC taos diff --git a/utils/test/c/tmq_td32187.c b/utils/test/c/tmq_td32187.c new file mode 100644 index 0000000000..05620b482d --- /dev/null +++ b/utils/test/c/tmq_td32187.c @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "cJSON.h" +#include "taos.h" +#include "tmsg.h" +#include "types.h" + + +static TAOS_RES* tmqmessage = NULL; +static char* topic = "topic_test"; +static int32_t vgroupId = 0; +static int64_t offset = 0; + +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + printf("commit %d tmq %p param %p\n", code, tmq, param); +} + +tmq_t* build_consumer() { + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_conf_set(conf, "client.id", "my app 1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.consume.excluded", "1"); +// tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, topic); + return topic_list; +} + +static void callFunc(int i, tmq_t* tmq, tmq_list_t* topics) { + printf("call %d\n", i); + switch (i) { + case 0: + tmq_subscribe(tmq, topics); + break; + case 1: + tmq_unsubscribe(tmq); + break; + case 2:{ + tmq_list_t* t = NULL; + tmq_subscription(tmq, &t); + tmq_list_destroy(t); + break; + } + case 3: + tmqmessage = tmq_consumer_poll(tmq, 5000); + break; + case 4: + tmq_consumer_close(tmq); + break; + case 5: + tmq_commit_sync(tmq, NULL); + break; + case 6: + tmq_commit_async(tmq, NULL, NULL, NULL); + break; + case 7: + tmq_commit_offset_sync(tmq, topic, vgroupId, offset); + break; + case 8: + tmq_commit_offset_async(tmq, topic, vgroupId, offset, NULL, NULL); + break; + case 9: + tmq_get_topic_assignment(tmq, topic, NULL, NULL); + break; + case 10: + tmq_free_assignment(NULL); + break; + case 11: + tmq_offset_seek(tmq, topic, vgroupId, offset); + break; + case 12: + tmq_position(tmq, topic, vgroupId); + break; + case 13: + tmq_committed(tmq, topic, vgroupId); + break; + case 14: + tmq_get_connect(tmq); + break; + case 15: + tmq_get_table_name(tmqmessage); + break; + case 16: + vgroupId = tmq_get_vgroup_id(tmqmessage); + break; + case 17: + offset = tmq_get_vgroup_offset(tmqmessage); + break; + case 18: + tmq_get_res_type(tmqmessage); + break; + case 19: + tmq_get_topic_name(tmqmessage); + break; + case 20: + tmq_get_db_name(tmqmessage); + break; + default: + break; + } +} +void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + int32_t code; + + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); + printf("subscribe err\n"); + return; + } + int32_t cnt = 0; + while (1) { + tmqmessage = tmq_consumer_poll(tmq, 5000); + if (tmqmessage) { + printf("poll message\n"); + while(cnt < 1000){ + callFunc(taosRand()%21, tmq, topics); + cnt++; + } + } else { + break; + } + } + + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +int main(int argc, char* argv[]) { + tmq_t* tmq = build_consumer(); + tmq_list_t* topic_list = build_topic_list(); + basic_consume_loop(tmq, topic_list); + tmq_list_destroy(topic_list); +} \ No newline at end of file