diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index db5eb21ad8..e5902856e6 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) add_executable(get_db_name_test get_db_name_test.c) add_executable(tmq_offset tmqOffset.c) +add_executable(tmq_multi_thread_test tmq_multi_thread_test.c) add_executable(tmq_offset_test tmq_offset_test.c) add_executable(varbinary_test varbinary_test.c) add_executable(replay_test replay_test.c) @@ -22,6 +23,15 @@ target_link_libraries( PUBLIC common PUBLIC os ) + +target_link_libraries( + tmq_multi_thread_test + PUBLIC taos + PUBLIC util + PUBLIC common + PUBLIC os +) + target_link_libraries( create_table PUBLIC taos diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c new file mode 100644 index 0000000000..9be329b995 --- /dev/null +++ b/utils/test/c/tmq_multi_thread_test.c @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "taos.h" +#include "types.h" + +void* consumeThreadFunc(void* param) { + int32_t* index = (int32_t*) param; + tmq_conf_t* conf = tmq_conf_new(); + char groupId[64] = {0}; + sprintf(groupId, "group_%d", *index); + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", groupId); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.with.table.name", "false"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "select_d4"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + int32_t timeout = 200; + + while (1) { + printf("start to poll\n"); + + TAOS_RES *pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + int cols = taos_num_fields(pRes); + for(int32_t i = 0; i < cols; ++i) { + int32_t rows = 0; + void* data = NULL; + int64_t start = taosGetTimestampUs(); + taos_fetch_raw_block(pRes, &rows, &data); + for (int32_t j = 0; j < rows; ++j) { + int64_t t1 = taosGetTimestampUs(); + taos_is_null(pRes, j, i); + int64_t t2 = taosGetTimestampUs(); + printf("taos_is_null cost %"PRId64" us\n", t2 - t1); + } + int64_t end = taosGetTimestampUs(); + printf("taos_fetch_raw_block rows:%d cost %"PRId64" us\n", rows, end - start); + } + + taos_free_result(pRes); + } else { + printf("no data\n"); + break; + } + } + tmq_consumer_close(tmq); + return NULL; +} + +int main(int argc, char* argv[]) { + if (argc != 2) { + printf("Usage: %s \n", argv[0]); + return 0; + } + + int32_t numOfThread = atoi(argv[1]); + TdThread *thread = taosMemoryCalloc(numOfThread, sizeof(TdThread)); + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + + // pthread_create one thread to consume + int32_t* paras = taosMemoryCalloc(numOfThread, sizeof(int32_t)); + for (int32_t i = 0; i < numOfThread; ++i) { + paras[i] = i; + taosThreadCreate(&(thread[i]), &thattr, consumeThreadFunc, (void*)(¶s[i])); + } + + for (int32_t i = 0; i < numOfThread; i++) { + taosThreadJoin(thread[i], NULL); + taosThreadClear(&thread[i]); + } + + taosMemoryFree(paras); + return 0; +}