diff --git a/include/client/taos.h b/include/client/taos.h index 73ab52357a..55b24c8721 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -210,6 +210,7 @@ DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); +DLL_EXPORT int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index edaa86befe..4a78ce957d 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -860,6 +860,44 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { return pResInfo->pCol[columnIndex].offset; } +int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows){ + if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || + columnIndex < 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + return TSDB_CODE_INVALID_PARA; + } + + int32_t numOfFields = taos_num_fields(res); + if (columnIndex >= numOfFields || numOfFields == 0) { + return TSDB_CODE_INVALID_PARA; + } + + SReqResultInfo *pResInfo = tscGetCurResInfo(res); + TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; + SResultColumn *pCol = &pResInfo->pCol[columnIndex]; + + if (*rows > pResInfo->numOfRows){ + *rows = pResInfo->numOfRows; + } + if (IS_VAR_DATA_TYPE(pField->type)) { + for(int i = 0; i < *rows; i++){ + if(pCol->offset[i] == -1){ + result[i] = true; + }else{ + result[i] = false; + } + } + }else{ + for(int i = 0; i < *rows; i++){ + if (colDataIsNull_f(pCol->nullbitmap, i)){ + result[i] = true; + }else{ + result[i] = false; + } + } + } + return 0; +} + int taos_validate_sql(TAOS *taos, const char *sql) { TAOS_RES *pObj = taosQueryImpl(taos, sql, true, TD_REQ_FROM_APP); diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index db5eb21ad8..e5902856e6 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) add_executable(get_db_name_test get_db_name_test.c) add_executable(tmq_offset tmqOffset.c) +add_executable(tmq_multi_thread_test tmq_multi_thread_test.c) add_executable(tmq_offset_test tmq_offset_test.c) add_executable(varbinary_test varbinary_test.c) add_executable(replay_test replay_test.c) @@ -22,6 +23,15 @@ target_link_libraries( PUBLIC common PUBLIC os ) + +target_link_libraries( + tmq_multi_thread_test + PUBLIC taos + PUBLIC util + PUBLIC common + PUBLIC os +) + target_link_libraries( create_table PUBLIC taos diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c new file mode 100644 index 0000000000..0b6117c89b --- /dev/null +++ b/utils/test/c/tmq_multi_thread_test.c @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "taos.h" +#include "types.h" + +void* consumeThreadFunc(void* param) { + int32_t* index = (int32_t*) param; + tmq_conf_t* conf = tmq_conf_new(); + char groupId[64] = {0}; + int64_t t = taosGetTimestampMs(); + sprintf(groupId, "group_%"PRId64"_%d", t, *index); + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", groupId); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.with.table.name", "false"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "select_d4"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + int32_t timeout = 200; + int32_t totalRows = 0; + while (1) { + printf("start to poll\n"); + + TAOS_RES *pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + int32_t rows = 0; + void* data = NULL; + taos_fetch_raw_block(pRes, &rows, &data); + + totalRows+=rows; + int cols = taos_num_fields(pRes); + for(int32_t i = 0; i < cols; ++i) { + int64_t start = taosGetTimestampUs(); + for (int32_t j = 0; j < rows; ++j) { + //int64_t t1 = taosGetTimestampUs(); + taos_is_null(pRes, j, i); + //int64_t t2 = taosGetTimestampUs(); + //printf("taos_is_null gourp:%s cost %"PRId64" us\n", groupId, t2 - t1); + } + int64_t end = taosGetTimestampUs(); + bool* isNULL = taosMemoryCalloc(rows, sizeof(bool)); + int code = taos_is_null_by_column(pRes, i, isNULL, &rows); + printf("taos_fetch_raw_block gourp:%s total rows:%d cost %"PRId64" us, code:%d\n", groupId, totalRows, end - start, code); + } + + taos_free_result(pRes); + } else { + printf("no data\n"); + break; + } + } + tmq_consumer_close(tmq); + return NULL; +} + +int main(int argc, char* argv[]) { + if (argc != 2) { + printf("Usage: %s \n", argv[0]); + return 0; + } + + int32_t numOfThread = atoi(argv[1]); + TdThread *thread = taosMemoryCalloc(numOfThread, sizeof(TdThread)); + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + + int64_t t1 = taosGetTimestampUs(); + // pthread_create one thread to consume + int32_t* paras = taosMemoryCalloc(numOfThread, sizeof(int32_t)); + for (int32_t i = 0; i < numOfThread; ++i) { + paras[i] = i; + taosThreadCreate(&(thread[i]), &thattr, consumeThreadFunc, (void*)(¶s[i])); + } + + for (int32_t i = 0; i < numOfThread; i++) { + taosThreadJoin(thread[i], NULL); + taosThreadClear(&thread[i]); + } + + int64_t t2 = taosGetTimestampUs(); + printf("total cost %"PRId64" us\n", t2 - t1); + taosMemoryFree(paras); + return 0; +}