From 6ba13305bdd772154cc59aa09431fd42e73f8faf Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Aug 2024 09:59:10 +0800 Subject: [PATCH 1/7] feat:[TD-31242] add test case --- utils/test/c/CMakeLists.txt | 10 +++ utils/test/c/tmq_multi_thread_test.c | 107 +++++++++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 utils/test/c/tmq_multi_thread_test.c diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index db5eb21ad8..e5902856e6 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) add_executable(get_db_name_test get_db_name_test.c) add_executable(tmq_offset tmqOffset.c) +add_executable(tmq_multi_thread_test tmq_multi_thread_test.c) add_executable(tmq_offset_test tmq_offset_test.c) add_executable(varbinary_test varbinary_test.c) add_executable(replay_test replay_test.c) @@ -22,6 +23,15 @@ target_link_libraries( PUBLIC common PUBLIC os ) + +target_link_libraries( + tmq_multi_thread_test + PUBLIC taos + PUBLIC util + PUBLIC common + PUBLIC os +) + target_link_libraries( create_table PUBLIC taos diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c new file mode 100644 index 0000000000..9be329b995 --- /dev/null +++ b/utils/test/c/tmq_multi_thread_test.c @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "taos.h" +#include "types.h" + +void* consumeThreadFunc(void* param) { + int32_t* index = (int32_t*) param; + tmq_conf_t* conf = tmq_conf_new(); + char groupId[64] = {0}; + sprintf(groupId, "group_%d", *index); + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", groupId); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.with.table.name", "false"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "select_d4"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + int32_t timeout = 200; + + while (1) { + printf("start to poll\n"); + + TAOS_RES *pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + int cols = taos_num_fields(pRes); + for(int32_t i = 0; i < cols; ++i) { + int32_t rows = 0; + void* data = NULL; + int64_t start = taosGetTimestampUs(); + taos_fetch_raw_block(pRes, &rows, &data); + for (int32_t j = 0; j < rows; ++j) { + int64_t t1 = taosGetTimestampUs(); + taos_is_null(pRes, j, i); + int64_t t2 = taosGetTimestampUs(); + printf("taos_is_null cost %"PRId64" us\n", t2 - t1); + } + int64_t end = taosGetTimestampUs(); + printf("taos_fetch_raw_block rows:%d cost %"PRId64" us\n", rows, end - start); + } + + taos_free_result(pRes); + } else { + printf("no data\n"); + break; + } + } + tmq_consumer_close(tmq); + return NULL; +} + +int main(int argc, char* argv[]) { + if (argc != 2) { + printf("Usage: %s \n", argv[0]); + return 0; + } + + int32_t numOfThread = atoi(argv[1]); + TdThread *thread = taosMemoryCalloc(numOfThread, sizeof(TdThread)); + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + + // pthread_create one thread to consume + int32_t* paras = taosMemoryCalloc(numOfThread, sizeof(int32_t)); + for (int32_t i = 0; i < numOfThread; ++i) { + paras[i] = i; + taosThreadCreate(&(thread[i]), &thattr, consumeThreadFunc, (void*)(¶s[i])); + } + + for (int32_t i = 0; i < numOfThread; i++) { + taosThreadJoin(thread[i], NULL); + taosThreadClear(&thread[i]); + } + + taosMemoryFree(paras); + return 0; +} From a94baddfcdadae8054229e268eb2d93d3cb27981 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Aug 2024 10:49:37 +0800 Subject: [PATCH 2/7] feat:[TD-31242] add test case --- utils/test/c/tmq_multi_thread_test.c | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c index 9be329b995..d4470fffa2 100644 --- a/utils/test/c/tmq_multi_thread_test.c +++ b/utils/test/c/tmq_multi_thread_test.c @@ -25,7 +25,8 @@ void* consumeThreadFunc(void* param) { int32_t* index = (int32_t*) param; tmq_conf_t* conf = tmq_conf_new(); char groupId[64] = {0}; - sprintf(groupId, "group_%d", *index); + int64_t t = taosGetTimestampMs(); + sprintf(groupId, "group_%ld_%d", t, *index); tmq_conf_set(conf, "enable.auto.commit", "false"); tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); tmq_conf_set(conf, "group.id", groupId); @@ -46,26 +47,28 @@ void* consumeThreadFunc(void* param) { tmq_list_destroy(topicList); int32_t timeout = 200; - + int32_t totalRows = 0; while (1) { printf("start to poll\n"); TAOS_RES *pRes = tmq_consumer_poll(tmq, timeout); if (pRes) { + int32_t rows = 0; + void* data = NULL; + taos_fetch_raw_block(pRes, &rows, &data); + + totalRows+=rows; int cols = taos_num_fields(pRes); for(int32_t i = 0; i < cols; ++i) { - int32_t rows = 0; - void* data = NULL; int64_t start = taosGetTimestampUs(); - taos_fetch_raw_block(pRes, &rows, &data); for (int32_t j = 0; j < rows; ++j) { - int64_t t1 = taosGetTimestampUs(); + //int64_t t1 = taosGetTimestampUs(); taos_is_null(pRes, j, i); - int64_t t2 = taosGetTimestampUs(); - printf("taos_is_null cost %"PRId64" us\n", t2 - t1); + //int64_t t2 = taosGetTimestampUs(); + //printf("taos_is_null gourp:%s cost %"PRId64" us\n", groupId, t2 - t1); } int64_t end = taosGetTimestampUs(); - printf("taos_fetch_raw_block rows:%d cost %"PRId64" us\n", rows, end - start); + printf("taos_fetch_raw_block gourp:%s total rows:%d cost %"PRId64" us\n", groupId, totalRows, end - start); } taos_free_result(pRes); @@ -90,6 +93,7 @@ int main(int argc, char* argv[]) { taosThreadAttrInit(&thattr); taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + int64_t t1 = taosGetTimestampUs(); // pthread_create one thread to consume int32_t* paras = taosMemoryCalloc(numOfThread, sizeof(int32_t)); for (int32_t i = 0; i < numOfThread; ++i) { @@ -102,6 +106,8 @@ int main(int argc, char* argv[]) { taosThreadClear(&thread[i]); } + int64_t t2 = taosGetTimestampUs(); + printf("total cost %"PRId64" us\n", t2 - t1); taosMemoryFree(paras); return 0; } From aad439c09a193bcebe1470a45890bb41aa895253 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Aug 2024 15:40:38 +0800 Subject: [PATCH 3/7] feat:[TD-31242]add new interface taos_get_column_data_null to get if raw data is null --- include/client/taos.h | 1 + source/client/src/clientMain.c | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/include/client/taos.h b/include/client/taos.h index 73ab52357a..a3d35104e1 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -215,6 +215,7 @@ DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData); DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex); +DLL_EXPORT int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int rows); DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); DLL_EXPORT void taos_reset_current_db(TAOS *taos); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 12702a93f3..ab4d1837ea 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -859,6 +859,40 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { return pResInfo->pCol[columnIndex].offset; } +int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int rows){ + if (res == NULL || result == NULL || rows <= 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + return TSDB_CODE_INVALID_PARA; + } + + int32_t numOfFields = taos_num_fields(res); + if (columnIndex < 0 || columnIndex >= numOfFields || numOfFields == 0) { + return TSDB_CODE_INVALID_PARA; + } + + SReqResultInfo *pResInfo = tscGetCurResInfo(res); + TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; + SResultColumn *pCol = &pResInfo->pCol[columnIndex]; + + if (IS_VAR_DATA_TYPE(pField->type)) { + for(int i = 0; i < rows && i < pResInfo->numOfRows; i++){ + if(pCol->offset[i] == -1){ + result[i] = true; + }else{ + result[i] = false; + } + } + }else{ + for(int i = 0; i < rows && i < pResInfo->numOfRows; i++){ + if (colDataIsNull_f(pCol->nullbitmap, i)){ + result[i] = true; + }else{ + result[i] = false; + } + } + } + return 0; +} + int taos_validate_sql(TAOS *taos, const char *sql) { TAOS_RES *pObj = taosQueryImpl(taos, sql, true, TD_REQ_FROM_APP); From e6787548fb04a107ac9afa2b1a79a8750111f0bf Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Aug 2024 15:46:37 +0800 Subject: [PATCH 4/7] feat:[TD-31242]add new interface taos_get_column_data_null to get if raw data is null --- utils/test/c/tmq_multi_thread_test.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c index d4470fffa2..990bf409ca 100644 --- a/utils/test/c/tmq_multi_thread_test.c +++ b/utils/test/c/tmq_multi_thread_test.c @@ -68,7 +68,9 @@ void* consumeThreadFunc(void* param) { //printf("taos_is_null gourp:%s cost %"PRId64" us\n", groupId, t2 - t1); } int64_t end = taosGetTimestampUs(); - printf("taos_fetch_raw_block gourp:%s total rows:%d cost %"PRId64" us\n", groupId, totalRows, end - start); + bool* isNULL = taosMemoryCalloc(rows, sizeof(bool)); + int code = taos_get_column_data_null(pRes, i, isNULL, rows); + printf("taos_fetch_raw_block gourp:%s total rows:%d cost %"PRId64" us, code:%d\n", groupId, totalRows, end - start, code); } taos_free_result(pRes); From df33b71bef770a27122b704492bc0aa99aff5ab7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 19 Aug 2024 20:12:52 +0800 Subject: [PATCH 5/7] feat:[TD-31242]add new interface taos_get_column_data_null to get if raw data is null --- utils/test/c/tmq_multi_thread_test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c index 990bf409ca..c34cd43716 100644 --- a/utils/test/c/tmq_multi_thread_test.c +++ b/utils/test/c/tmq_multi_thread_test.c @@ -26,7 +26,7 @@ void* consumeThreadFunc(void* param) { tmq_conf_t* conf = tmq_conf_new(); char groupId[64] = {0}; int64_t t = taosGetTimestampMs(); - sprintf(groupId, "group_%ld_%d", t, *index); + sprintf(groupId, "group_%"PRId64"_%d", t, *index); tmq_conf_set(conf, "enable.auto.commit", "false"); tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); tmq_conf_set(conf, "group.id", groupId); From b86f416645733dc8f8c9c8924ddb0395d019a64d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 28 Aug 2024 18:00:40 +0800 Subject: [PATCH 6/7] feat:[TD-31242]add new interface taos_get_column_data_null to get if raw data is null --- include/client/taos.h | 2 +- source/client/src/clientMain.c | 11 +++++++---- utils/test/c/tmq_multi_thread_test.c | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index a3d35104e1..55b24c8721 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -210,12 +210,12 @@ DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); +DLL_EXPORT int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData); DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex); -DLL_EXPORT int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int rows); DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); DLL_EXPORT void taos_reset_current_db(TAOS *taos); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ab4d1837ea..9b336d27f2 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -859,8 +859,8 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { return pResInfo->pCol[columnIndex].offset; } -int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int rows){ - if (res == NULL || result == NULL || rows <= 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { +int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows){ + if (res == NULL || result == NULL || *rows <= 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return TSDB_CODE_INVALID_PARA; } @@ -873,8 +873,11 @@ int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; SResultColumn *pCol = &pResInfo->pCol[columnIndex]; + if (*rows > pResInfo->numOfRows){ + *rows = pResInfo->numOfRows; + } if (IS_VAR_DATA_TYPE(pField->type)) { - for(int i = 0; i < rows && i < pResInfo->numOfRows; i++){ + for(int i = 0; i < *rows; i++){ if(pCol->offset[i] == -1){ result[i] = true; }else{ @@ -882,7 +885,7 @@ int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int } } }else{ - for(int i = 0; i < rows && i < pResInfo->numOfRows; i++){ + for(int i = 0; i < *rows; i++){ if (colDataIsNull_f(pCol->nullbitmap, i)){ result[i] = true; }else{ diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c index c34cd43716..0b6117c89b 100644 --- a/utils/test/c/tmq_multi_thread_test.c +++ b/utils/test/c/tmq_multi_thread_test.c @@ -69,7 +69,7 @@ void* consumeThreadFunc(void* param) { } int64_t end = taosGetTimestampUs(); bool* isNULL = taosMemoryCalloc(rows, sizeof(bool)); - int code = taos_get_column_data_null(pRes, i, isNULL, rows); + int code = taos_is_null_by_column(pRes, i, isNULL, &rows); printf("taos_fetch_raw_block gourp:%s total rows:%d cost %"PRId64" us, code:%d\n", groupId, totalRows, end - start, code); } From 2084b677b129fa0809f66d01be9f1d113bad5d71 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 29 Aug 2024 17:24:34 +0800 Subject: [PATCH 7/7] fix:[TD-31242] add parameters check --- source/client/src/clientMain.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9b336d27f2..57faa4b739 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -860,12 +860,13 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { } int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows){ - if (res == NULL || result == NULL || *rows <= 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || + columnIndex < 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return TSDB_CODE_INVALID_PARA; } int32_t numOfFields = taos_num_fields(res); - if (columnIndex < 0 || columnIndex >= numOfFields || numOfFields == 0) { + if (columnIndex >= numOfFields || numOfFields == 0) { return TSDB_CODE_INVALID_PARA; }