From 6ba13305bdd772154cc59aa09431fd42e73f8faf Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Aug 2024 09:59:10 +0800 Subject: [PATCH 01/16] feat:[TD-31242] add test case --- utils/test/c/CMakeLists.txt | 10 +++ utils/test/c/tmq_multi_thread_test.c | 107 +++++++++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 utils/test/c/tmq_multi_thread_test.c diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index db5eb21ad8..e5902856e6 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) add_executable(get_db_name_test get_db_name_test.c) add_executable(tmq_offset tmqOffset.c) +add_executable(tmq_multi_thread_test tmq_multi_thread_test.c) add_executable(tmq_offset_test tmq_offset_test.c) add_executable(varbinary_test varbinary_test.c) add_executable(replay_test replay_test.c) @@ -22,6 +23,15 @@ target_link_libraries( PUBLIC common PUBLIC os ) + +target_link_libraries( + tmq_multi_thread_test + PUBLIC taos + PUBLIC util + PUBLIC common + PUBLIC os +) + target_link_libraries( create_table PUBLIC taos diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c new file mode 100644 index 0000000000..9be329b995 --- /dev/null +++ b/utils/test/c/tmq_multi_thread_test.c @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "taos.h" +#include "types.h" + +void* consumeThreadFunc(void* param) { + int32_t* index = (int32_t*) param; + tmq_conf_t* conf = tmq_conf_new(); + char groupId[64] = {0}; + sprintf(groupId, "group_%d", *index); + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", groupId); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.with.table.name", "false"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "select_d4"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + int32_t timeout = 200; + + while (1) { + printf("start to poll\n"); + + TAOS_RES *pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + int cols = taos_num_fields(pRes); + for(int32_t i = 0; i < cols; ++i) { + int32_t rows = 0; + void* data = NULL; + int64_t start = taosGetTimestampUs(); + taos_fetch_raw_block(pRes, &rows, &data); + for (int32_t j = 0; j < rows; ++j) { + int64_t t1 = taosGetTimestampUs(); + taos_is_null(pRes, j, i); + int64_t t2 = taosGetTimestampUs(); + printf("taos_is_null cost %"PRId64" us\n", t2 - t1); + } + int64_t end = taosGetTimestampUs(); + printf("taos_fetch_raw_block rows:%d cost %"PRId64" us\n", rows, end - start); + } + + taos_free_result(pRes); + } else { + printf("no data\n"); + break; + } + } + tmq_consumer_close(tmq); + return NULL; +} + +int main(int argc, char* argv[]) { + if (argc != 2) { + printf("Usage: %s \n", argv[0]); + return 0; + } + + int32_t numOfThread = atoi(argv[1]); + TdThread *thread = taosMemoryCalloc(numOfThread, sizeof(TdThread)); + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + + // pthread_create one thread to consume + int32_t* paras = taosMemoryCalloc(numOfThread, sizeof(int32_t)); + for (int32_t i = 0; i < numOfThread; ++i) { + paras[i] = i; + taosThreadCreate(&(thread[i]), &thattr, consumeThreadFunc, (void*)(¶s[i])); + } + + for (int32_t i = 0; i < numOfThread; i++) { + taosThreadJoin(thread[i], NULL); + taosThreadClear(&thread[i]); + } + + taosMemoryFree(paras); + return 0; +} From a94baddfcdadae8054229e268eb2d93d3cb27981 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Aug 2024 10:49:37 +0800 Subject: [PATCH 02/16] feat:[TD-31242] add test case --- utils/test/c/tmq_multi_thread_test.c | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c index 9be329b995..d4470fffa2 100644 --- a/utils/test/c/tmq_multi_thread_test.c +++ b/utils/test/c/tmq_multi_thread_test.c @@ -25,7 +25,8 @@ void* consumeThreadFunc(void* param) { int32_t* index = (int32_t*) param; tmq_conf_t* conf = tmq_conf_new(); char groupId[64] = {0}; - sprintf(groupId, "group_%d", *index); + int64_t t = taosGetTimestampMs(); + sprintf(groupId, "group_%ld_%d", t, *index); tmq_conf_set(conf, "enable.auto.commit", "false"); tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); tmq_conf_set(conf, "group.id", groupId); @@ -46,26 +47,28 @@ void* consumeThreadFunc(void* param) { tmq_list_destroy(topicList); int32_t timeout = 200; - + int32_t totalRows = 0; while (1) { printf("start to poll\n"); TAOS_RES *pRes = tmq_consumer_poll(tmq, timeout); if (pRes) { + int32_t rows = 0; + void* data = NULL; + taos_fetch_raw_block(pRes, &rows, &data); + + totalRows+=rows; int cols = taos_num_fields(pRes); for(int32_t i = 0; i < cols; ++i) { - int32_t rows = 0; - void* data = NULL; int64_t start = taosGetTimestampUs(); - taos_fetch_raw_block(pRes, &rows, &data); for (int32_t j = 0; j < rows; ++j) { - int64_t t1 = taosGetTimestampUs(); + //int64_t t1 = taosGetTimestampUs(); taos_is_null(pRes, j, i); - int64_t t2 = taosGetTimestampUs(); - printf("taos_is_null cost %"PRId64" us\n", t2 - t1); + //int64_t t2 = taosGetTimestampUs(); + //printf("taos_is_null gourp:%s cost %"PRId64" us\n", groupId, t2 - t1); } int64_t end = taosGetTimestampUs(); - printf("taos_fetch_raw_block rows:%d cost %"PRId64" us\n", rows, end - start); + printf("taos_fetch_raw_block gourp:%s total rows:%d cost %"PRId64" us\n", groupId, totalRows, end - start); } taos_free_result(pRes); @@ -90,6 +93,7 @@ int main(int argc, char* argv[]) { taosThreadAttrInit(&thattr); taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + int64_t t1 = taosGetTimestampUs(); // pthread_create one thread to consume int32_t* paras = taosMemoryCalloc(numOfThread, sizeof(int32_t)); for (int32_t i = 0; i < numOfThread; ++i) { @@ -102,6 +106,8 @@ int main(int argc, char* argv[]) { taosThreadClear(&thread[i]); } + int64_t t2 = taosGetTimestampUs(); + printf("total cost %"PRId64" us\n", t2 - t1); taosMemoryFree(paras); return 0; } From aad439c09a193bcebe1470a45890bb41aa895253 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Aug 2024 15:40:38 +0800 Subject: [PATCH 03/16] feat:[TD-31242]add new interface taos_get_column_data_null to get if raw data is null --- include/client/taos.h | 1 + source/client/src/clientMain.c | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/include/client/taos.h b/include/client/taos.h index 73ab52357a..a3d35104e1 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -215,6 +215,7 @@ DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData); DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex); +DLL_EXPORT int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int rows); DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); DLL_EXPORT void taos_reset_current_db(TAOS *taos); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 12702a93f3..ab4d1837ea 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -859,6 +859,40 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { return pResInfo->pCol[columnIndex].offset; } +int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int rows){ + if (res == NULL || result == NULL || rows <= 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + return TSDB_CODE_INVALID_PARA; + } + + int32_t numOfFields = taos_num_fields(res); + if (columnIndex < 0 || columnIndex >= numOfFields || numOfFields == 0) { + return TSDB_CODE_INVALID_PARA; + } + + SReqResultInfo *pResInfo = tscGetCurResInfo(res); + TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; + SResultColumn *pCol = &pResInfo->pCol[columnIndex]; + + if (IS_VAR_DATA_TYPE(pField->type)) { + for(int i = 0; i < rows && i < pResInfo->numOfRows; i++){ + if(pCol->offset[i] == -1){ + result[i] = true; + }else{ + result[i] = false; + } + } + }else{ + for(int i = 0; i < rows && i < pResInfo->numOfRows; i++){ + if (colDataIsNull_f(pCol->nullbitmap, i)){ + result[i] = true; + }else{ + result[i] = false; + } + } + } + return 0; +} + int taos_validate_sql(TAOS *taos, const char *sql) { TAOS_RES *pObj = taosQueryImpl(taos, sql, true, TD_REQ_FROM_APP); From e6787548fb04a107ac9afa2b1a79a8750111f0bf Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Aug 2024 15:46:37 +0800 Subject: [PATCH 04/16] feat:[TD-31242]add new interface taos_get_column_data_null to get if raw data is null --- utils/test/c/tmq_multi_thread_test.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c index d4470fffa2..990bf409ca 100644 --- a/utils/test/c/tmq_multi_thread_test.c +++ b/utils/test/c/tmq_multi_thread_test.c @@ -68,7 +68,9 @@ void* consumeThreadFunc(void* param) { //printf("taos_is_null gourp:%s cost %"PRId64" us\n", groupId, t2 - t1); } int64_t end = taosGetTimestampUs(); - printf("taos_fetch_raw_block gourp:%s total rows:%d cost %"PRId64" us\n", groupId, totalRows, end - start); + bool* isNULL = taosMemoryCalloc(rows, sizeof(bool)); + int code = taos_get_column_data_null(pRes, i, isNULL, rows); + printf("taos_fetch_raw_block gourp:%s total rows:%d cost %"PRId64" us, code:%d\n", groupId, totalRows, end - start, code); } taos_free_result(pRes); From df33b71bef770a27122b704492bc0aa99aff5ab7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 19 Aug 2024 20:12:52 +0800 Subject: [PATCH 05/16] feat:[TD-31242]add new interface taos_get_column_data_null to get if raw data is null --- utils/test/c/tmq_multi_thread_test.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c index 990bf409ca..c34cd43716 100644 --- a/utils/test/c/tmq_multi_thread_test.c +++ b/utils/test/c/tmq_multi_thread_test.c @@ -26,7 +26,7 @@ void* consumeThreadFunc(void* param) { tmq_conf_t* conf = tmq_conf_new(); char groupId[64] = {0}; int64_t t = taosGetTimestampMs(); - sprintf(groupId, "group_%ld_%d", t, *index); + sprintf(groupId, "group_%"PRId64"_%d", t, *index); tmq_conf_set(conf, "enable.auto.commit", "false"); tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); tmq_conf_set(conf, "group.id", groupId); From b86f416645733dc8f8c9c8924ddb0395d019a64d Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 28 Aug 2024 18:00:40 +0800 Subject: [PATCH 06/16] feat:[TD-31242]add new interface taos_get_column_data_null to get if raw data is null --- include/client/taos.h | 2 +- source/client/src/clientMain.c | 11 +++++++---- utils/test/c/tmq_multi_thread_test.c | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index a3d35104e1..55b24c8721 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -210,12 +210,12 @@ DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); +DLL_EXPORT int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData); DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex); -DLL_EXPORT int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int rows); DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); DLL_EXPORT void taos_reset_current_db(TAOS *taos); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ab4d1837ea..9b336d27f2 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -859,8 +859,8 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { return pResInfo->pCol[columnIndex].offset; } -int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int rows){ - if (res == NULL || result == NULL || rows <= 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { +int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows){ + if (res == NULL || result == NULL || *rows <= 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return TSDB_CODE_INVALID_PARA; } @@ -873,8 +873,11 @@ int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; SResultColumn *pCol = &pResInfo->pCol[columnIndex]; + if (*rows > pResInfo->numOfRows){ + *rows = pResInfo->numOfRows; + } if (IS_VAR_DATA_TYPE(pField->type)) { - for(int i = 0; i < rows && i < pResInfo->numOfRows; i++){ + for(int i = 0; i < *rows; i++){ if(pCol->offset[i] == -1){ result[i] = true; }else{ @@ -882,7 +885,7 @@ int taos_get_column_data_null(TAOS_RES *res, int columnIndex, bool result[], int } } }else{ - for(int i = 0; i < rows && i < pResInfo->numOfRows; i++){ + for(int i = 0; i < *rows; i++){ if (colDataIsNull_f(pCol->nullbitmap, i)){ result[i] = true; }else{ diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c index c34cd43716..0b6117c89b 100644 --- a/utils/test/c/tmq_multi_thread_test.c +++ b/utils/test/c/tmq_multi_thread_test.c @@ -69,7 +69,7 @@ void* consumeThreadFunc(void* param) { } int64_t end = taosGetTimestampUs(); bool* isNULL = taosMemoryCalloc(rows, sizeof(bool)); - int code = taos_get_column_data_null(pRes, i, isNULL, rows); + int code = taos_is_null_by_column(pRes, i, isNULL, &rows); printf("taos_fetch_raw_block gourp:%s total rows:%d cost %"PRId64" us, code:%d\n", groupId, totalRows, end - start, code); } From feb1c8518e28aa04336f45087300049bef08c420 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 29 Aug 2024 15:08:38 +0800 Subject: [PATCH 07/16] fix: remove asserts --- include/common/tdatablock.h | 10 ---- source/common/src/tdatablock.c | 70 ++++++++++++++++++------ source/libs/executor/inc/executil.h | 1 - source/libs/executor/inc/mergejoin.h | 2 - source/libs/executor/src/groupoperator.c | 5 ++ source/libs/executor/src/tlinearhash.c | 2 +- source/libs/executor/src/tsort.c | 7 +++ source/libs/nodes/src/nodesUtilFuncs.c | 2 - source/libs/planner/src/planOptimizer.c | 2 - 9 files changed, 66 insertions(+), 35 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 080416ca2e..b4dd6d61e4 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -113,10 +113,8 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u if (pColAgg != NULL && pColAgg->colId != -1) { if (pColAgg->numOfNull == totalRows) { - ASSERT(pColumnInfoData->nullbitmap == NULL); return true; } else if (pColAgg->numOfNull == 0) { - ASSERT(pColumnInfoData->nullbitmap == NULL); return false; } } @@ -159,40 +157,32 @@ static FORCE_INLINE void colDataSetNNULL(SColumnInfoData* pColumnInfoData, uint3 } static FORCE_INLINE void colDataSetInt8(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int8_t* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_TINYINT || - pColumnInfoData->info.type == TSDB_DATA_TYPE_UTINYINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_BOOL); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(int8_t*)p = *(int8_t*)v; } static FORCE_INLINE void colDataSetInt16(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int16_t* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || - pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(int16_t*)p = *(int16_t*)v; } static FORCE_INLINE void colDataSetInt32(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int32_t* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_INT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UINT); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(int32_t*)p = *(int32_t*)v; } static FORCE_INLINE void colDataSetInt64(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int64_t* v) { int32_t type = pColumnInfoData->info.type; - ASSERT(type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(int64_t*)p = *(int64_t*)v; } static FORCE_INLINE void colDataSetFloat(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, float* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_FLOAT); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(float*)p = *(float*)v; } static FORCE_INLINE void colDataSetDouble(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, double* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_DOUBLE); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(double*)p = *(double*)v; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 16710d9555..264cde00ee 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -81,7 +81,7 @@ int32_t getJsonValueLen(const char* data) { } else if (tTagIsJson(data)) { // json string dataLen = ((STag*)(data))->len; } else { - ASSERT(0); + uError("Invalid data type:%d in Json", *data); } return dataLen; } @@ -801,7 +801,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd size_t rowSize = blockDataGetRowSize(pBlock); int32_t capacity = blockDataGetCapacityInRow(pBlock, pageSize, headerSize + colHeaderSize); if (capacity <= 0) { - return TSDB_CODE_FAILED; + return terrno; } *stopIndex = startIndex + capacity - 1; @@ -835,7 +835,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd if (size > pageSize) { // pageSize must be able to hold one row *stopIndex = j - 1; if (*stopIndex < startIndex) { - return TSDB_CODE_FAILED; + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } return TSDB_CODE_SUCCESS; @@ -2060,7 +2060,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat } // todo disable it temporarily - // ASSERT(pColInfoData->info.type != 0); + // A S S E R T(pColInfoData->info.type != 0); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { pBlock->info.hasVarCol = true; } @@ -2100,14 +2100,18 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int int32_t payloadSize = pageSize - extraSize; int32_t rowSize = pBlock->info.rowSize; int32_t nRows = payloadSize / rowSize; - ASSERT(nRows >= 1); + if (nRows < 1) { + qError("rows %d in page is too small, payloadSize:%d, rowSize:%d", nRows, payloadSize, rowSize); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return -1; + } int32_t numVarCols = 0; int32_t numFixCols = 0; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); if (pCol == NULL) { - return terrno; + return -1; } if (IS_VAR_DATA_TYPE(pCol->info.type)) { @@ -2135,7 +2139,11 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int int32_t newRows = (result != -1) ? result - 1 : nRows; // the true value must be less than the value of nRows - ASSERT(newRows <= nRows && newRows >= 1); + if (newRows > nRows || newRows < 1) { + uError("invalid newRows:%d, nRows:%d", newRows, nRows); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return -1; + } return newRows; } @@ -2616,7 +2624,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat } // the rsma result should has the same column number with schema. - ASSERT(colNum == pTSchema->numOfCols); + if (colNum != pTSchema->numOfCols) { + uError("colNum %d is not equal to numOfCols %d", colNum, pTSchema->numOfCols); + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + goto _end; + } SSubmitTbData tbData = {0}; @@ -2652,10 +2664,18 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: - ASSERT(pColInfoData->info.type == pCol->type); + if (pColInfoData->info.type != pCol->type) { + uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } if (!isStartKey) { isStartKey = true; - ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId); + if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) { + uError("the first timestamp colId %d is not primary colId", pCol->colId); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(TSKEY*)var})); void* px = taosArrayPush(pVals, &cv); if (px == NULL) { @@ -2679,7 +2699,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY - ASSERT(pColInfoData->info.type == pCol->type); + if (pColInfoData->info.type != pCol->type) { + uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } if (colDataIsNull_s(pColInfoData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); void* px = taosArrayPush(pVals, &cv); @@ -2704,7 +2728,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_MEDIUMBLOB: uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type); - ASSERT(0); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; break; default: if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { @@ -2752,7 +2777,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat } } else { uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); - ASSERT(0); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; } break; } @@ -2763,7 +2789,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat goto _end; } - ASSERT(pRow); void* px = taosArrayPush(tbData.aRowP, &pRow); if (px == NULL) { code = terrno; @@ -2902,7 +2927,10 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { int32_t* rows = (int32_t*)data; *rows = pBlock->info.rows; data += sizeof(int32_t); - ASSERT(*rows > 0); + if (*rows <= 0) { + uError("Invalid rows %d in block", *rows); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } int32_t* cols = (int32_t*)data; *cols = numOfCols; @@ -3055,7 +3083,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos for (int32_t i = 0; i < numOfCols; ++i) { colLen[i] = htonl(colLen[i]); - ASSERT(colLen[i] >= 0); + if (colLen[i] < 0) { + uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); if (pColInfoData == NULL) { @@ -3099,7 +3131,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos pBlock->info.dataLoad = 1; pBlock->info.rows = numOfRows; pBlock->info.blankFill = blankFill; - ASSERT(pStart - pData == dataLen); + if (pStart - pData != dataLen) { + uError("block decode msg len error, pStart:%p, pData:%p, dataLen:%d", pStart, pData, dataLen); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } *pEndPos = pStart; return code; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index e35b26627b..95035dd96f 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -32,7 +32,6 @@ #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ do { \ - assert(sizeof(_uid) == sizeof(uint64_t)); \ *(uint64_t*)(_k) = (_uid); \ (void)memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ } while (0) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 64db1a57a0..ceb0037b8d 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -357,7 +357,6 @@ typedef struct SMJoinOperatorInfo { #define MJOIN_PUSH_BLK_TO_CACHE(_cache, _blk) \ do { \ - ASSERT(taosArrayGetSize((_cache)->grps) <= 1); \ SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayReserve((_cache)->grps, 1); \ (_cache)->rowNum += (_blk)->info.rows; \ pGrp->blk = (_blk); \ @@ -381,7 +380,6 @@ typedef struct SMJoinOperatorInfo { do { \ SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \ if (NULL != pGrp) { \ - ASSERT(pGrp->blk == (_tb)->blk); \ pGrp->beginIdx = (_tb)->blkRowIdx; \ pGrp->readIdx = pGrp->beginIdx; \ } \ diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3f203e7a95..6a7bb7c6be 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1221,6 +1221,11 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo pInfo->rowCapacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf), blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock))); + if (pInfo->rowCapacity < 0) { + code = terrno; + goto _error; + } + pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity); QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno); diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index 55628c18f9..83716d72ad 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -183,7 +183,7 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) { setBufPageDirty(pFirst, true); setBufPageDirty(pLast, true); - // ASSERT(pLast->num >= nodeSize + sizeof(SFilePage)); + // A S S E R T(pLast->num >= nodeSize + sizeof(SFilePage)); pFirst->num += nodeSize; pLast->num -= nodeSize; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 621d643361..fa1ccc3100 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -970,6 +970,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock))); + if (numOfRows < 0) { + return terrno; + } + int32_t code = blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); if (code) { return code; @@ -1999,6 +2003,9 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* int32_t code = TSDB_CODE_SUCCESS; int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock); int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize); + if (rowCap < 0) { + return terrno; + } code = blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); if (code) { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index c1afc4afb3..7bddc068e3 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1089,8 +1089,6 @@ void nodesDestroyNode(SNode* pNode) { if (pStmt->destroyParseFileCxt) { pStmt->destroyParseFileCxt(&pStmt->pParFileCxt); } - - assert(TSDB_CODE_SUCCESS == taosCloseFile(&pStmt->fp)); break; } case QUERY_NODE_CREATE_DATABASE_STMT: diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index d48871fd70..3aab190eda 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -6578,7 +6578,6 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode, void* pCtx) { return false; } - assert(pFuncs); FOREACH(pTmpNode, pFuncs) { SFunctionNode* pFunc = (SFunctionNode*)pTmpNode; if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId) && @@ -7271,7 +7270,6 @@ static int32_t tsmaOptRewriteParent(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParen if (code == TSDB_CODE_SUCCESS && pWindow) { SColumnNode* pCol = (SColumnNode*)pScan->pScanCols->pTail->pNode; - assert(pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID); nodesDestroyNode(pWindow->pTspk); pWindow->pTspk = NULL; code = nodesCloneNode((SNode*)pCol, &pWindow->pTspk); From 99102d47ac65dfa957da6adaba9489de1e6bdaed Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 29 Aug 2024 15:54:40 +0800 Subject: [PATCH 08/16] fix: compile issue --- source/common/src/tdatablock.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 264cde00ee..b21dd8c803 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2101,7 +2101,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int int32_t rowSize = pBlock->info.rowSize; int32_t nRows = payloadSize / rowSize; if (nRows < 1) { - qError("rows %d in page is too small, payloadSize:%d, rowSize:%d", nRows, payloadSize, rowSize); + uError("rows %d in page is too small, payloadSize:%d, rowSize:%d", nRows, payloadSize, rowSize); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; return -1; } From 2084b677b129fa0809f66d01be9f1d113bad5d71 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 29 Aug 2024 17:24:34 +0800 Subject: [PATCH 09/16] fix:[TD-31242] add parameters check --- source/client/src/clientMain.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9b336d27f2..57faa4b739 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -860,12 +860,13 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { } int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows){ - if (res == NULL || result == NULL || *rows <= 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || + columnIndex < 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { return TSDB_CODE_INVALID_PARA; } int32_t numOfFields = taos_num_fields(res); - if (columnIndex < 0 || columnIndex >= numOfFields || numOfFields == 0) { + if (columnIndex >= numOfFields || numOfFields == 0) { return TSDB_CODE_INVALID_PARA; } From 28aeb5824ab2beb14aebc37e4c676aad3003cb97 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 29 Aug 2024 17:49:09 +0800 Subject: [PATCH 10/16] fix(lru/insert): free & unlock if alloc failed --- source/util/src/tlrucache.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 33e0076c6e..24b60e8d13 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -374,6 +374,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * LRUStatus status = TAOS_LRU_STATUS_OK; SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); if (!lastReferenceList) { + taosLRUEntryFree(e); return TAOS_LRU_STATUS_FAIL; } @@ -385,13 +386,12 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); if (handle == NULL) { if (!taosArrayPush(lastReferenceList, &e)) { - (void)taosThreadMutexUnlock(&shard->mutex); taosLRUEntryFree(e); - return status; + goto _exit; } } else { if (freeOnFail) { - taosMemoryFree(e); + taosLRUEntryFree(e); *handle = NULL; } @@ -410,9 +410,9 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * shard->usage -= old->totalCharge; if (!taosArrayPush(lastReferenceList, &old)) { - (void)taosThreadMutexUnlock(&shard->mutex); + taosLRUEntryFree(e); taosLRUEntryFree(old); - return status; + goto _exit; } } } @@ -427,6 +427,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * } } +_exit: (void)taosThreadMutexUnlock(&shard->mutex); for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { From 79688e32a3213294f3b9f2cded435bde9d56a446 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 29 Aug 2024 09:54:28 +0000 Subject: [PATCH 11/16] add debug info --- source/libs/stream/src/streamTask.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index afe315bf58..76db09731f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -310,6 +310,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { + stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId); if (pTask->pState != NULL) { stDebug("s-task:0x%x start to free task state", pTask->id.taskId); streamStateClose(pTask->pState, remove); @@ -319,8 +320,11 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { pTask->pBackend = NULL; pTask->pState = NULL; } else { + stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId, + pTask->backendPath ? pTask->backendPath : "NULL"); if (remove) { if (pTask->backendPath != NULL) { + stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath); taosRemoveDir(pTask->backendPath); } } @@ -393,6 +397,7 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) { } (void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id); + stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath); return 0; } From c633468ec7e7a343eadde1b99900c1445e634c08 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 29 Aug 2024 18:38:51 +0800 Subject: [PATCH 12/16] fix: encode error return code issue --- source/common/src/tdatablock.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b21dd8c803..8e50c943b9 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2930,6 +2930,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { if (*rows <= 0) { uError("Invalid rows %d in block", *rows); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return -1; } int32_t* cols = (int32_t*)data; From b4821224c6e1c224a33c9d5da83326b9d1a6e480 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 29 Aug 2024 11:28:59 +0000 Subject: [PATCH 13/16] force remve stream path --- source/libs/stream/src/streamTask.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 76db09731f..3b7b999dcb 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -377,11 +377,11 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) { int32_t taskId = 0; if (pTask->info.fillHistory) { - streamId = pTask->hTaskInfo.id.taskId; - taskId = pTask->hTaskInfo.id.taskId; - } else { - streamId = pTask->streamTaskId.taskId; + streamId = pTask->streamTaskId.streamId; taskId = pTask->streamTaskId.taskId; + } else { + streamId = pTask->id.streamId; + taskId = pTask->id.taskId; } char id[128] = {0}; From d43df785372f34beb49cf65943b5e9e3308684f9 Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Thu, 29 Aug 2024 19:43:59 +0800 Subject: [PATCH 14/16] fix: free operator --- source/libs/executor/inc/operator.h | 1 + source/libs/executor/src/aggregateoperator.c | 14 ++--- .../libs/executor/src/countwindowoperator.c | 10 +--- .../libs/executor/src/dynqueryctrloperator.c | 4 +- .../libs/executor/src/eventwindowoperator.c | 10 +--- source/libs/executor/src/exchangeoperator.c | 2 +- source/libs/executor/src/filloperator.c | 19 +++---- source/libs/executor/src/groupcacheoperator.c | 10 +--- source/libs/executor/src/groupoperator.c | 36 +++---------- source/libs/executor/src/hashjoinoperator.c | 5 +- source/libs/executor/src/mergejoinoperator.c | 4 +- source/libs/executor/src/mergeoperator.c | 5 +- source/libs/executor/src/operator.c | 21 ++++++++ source/libs/executor/src/projectoperator.c | 20 ++------ source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/sortoperator.c | 21 ++------ .../executor/src/streamcountwindowoperator.c | 10 +--- .../executor/src/streameventwindowoperator.c | 10 +--- source/libs/executor/src/streamfilloperator.c | 15 ++---- .../executor/src/streamtimewindowoperator.c | 45 ++++------------ source/libs/executor/src/timesliceoperator.c | 11 ++-- source/libs/executor/src/timewindowoperator.c | 51 ++++--------------- source/libs/scalar/src/filter.c | 2 +- 23 files changed, 94 insertions(+), 234 deletions(-) diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 7d09be3300..0df676c6e2 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -182,6 +182,7 @@ int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo); void destroyOperator(SOperatorInfo* pOperator); +void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** stream, int32_t num); int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index b5a3f2f484..5a32e4bac2 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -137,22 +137,14 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); if (pInfo != NULL) { destroyAggOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 1d72b0bb58..28b2c22053 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -378,20 +378,14 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyCountWindowOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index a75bfb8f4b..8058fa9afe 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -1006,14 +1006,14 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyDynQueryCtrlOperator(pInfo); } - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 0b5fd074b0..591590a261 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -140,20 +140,14 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyEWindowOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 4315624d97..120dcbc205 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -444,7 +444,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 4ae3226f48..246a5e2a6d 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -511,26 +511,21 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + if (pInfo != NULL) { destroyFillOperatorInfo(pInfo); } - + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } return code; } diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 9b213487ed..f0e0894bd2 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -1510,20 +1510,14 @@ int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfD qTrace("new group cache operator, maxCacheSize:%" PRId64 ", globalGrp:%d, batchFetch:%d", pInfo->maxCacheSize, pInfo->globalGrp, pInfo->batchFetch); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyGroupCacheOperator(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && pDownstream != NULL && (*pDownstream) != NULL) { - destroyOperator(*pDownstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index f0a4914c24..38c2180b89 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -600,22 +600,12 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: + if (pInfo != NULL) destroyGroupOperatorInfo(pInfo); + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; - if (pInfo != NULL) { - destroyGroupOperatorInfo(pInfo); - } - - if (pOperator) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } - return code; } @@ -1229,20 +1219,14 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyPartitionOperatorInfo(pInfo); } pTaskInfo->code = code; - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); TAOS_RETURN(code); } @@ -1778,18 +1762,12 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: pTaskInfo->code = code; if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; } diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 55620defba..f253aefe95 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -1228,15 +1228,14 @@ int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDow qDebug("create hash Join operator done"); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _return: if (pInfo != NULL) { destroyHashJoinOperator(pInfo); } - - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 808aac66c2..14f3a08e17 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1869,6 +1869,7 @@ int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDo SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); + int32_t oldNum = numOfDownstream; bool newDownstreams = false; int32_t code = TSDB_CODE_SUCCESS; SOperatorInfo* pOperator = NULL; @@ -1921,8 +1922,7 @@ _return: if (newDownstreams) { taosMemoryFree(pDownstream); } - - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum); pTaskInfo->code = code; return code; diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 9e0ad5f497..c12bfd8798 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -649,14 +649,13 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyMultiwayMergeOperatorInfo(pInfo); } - pTaskInfo->code = code; - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, downStreams, numStreams); return code; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index fc52b97388..3f48d0f0a8 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -633,7 +633,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else { code = TSDB_CODE_INVALID_PARA; pTaskInfo->code = code; + for (int32_t i = 0; i < size; ++i) { + destroyOperator(ops[i]); + } taosMemoryFree(ops); + qError("invalid operator type %d", type); return code; } @@ -672,6 +676,23 @@ void destroyOperator(SOperatorInfo* pOperator) { taosMemoryFreeClear(pOperator); } +void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** downstreams, int32_t num) { + if (downstreams != NULL) { + for (int i = 0; i < num; i++) { + destroyOperator(downstreams[i]); + } + } + + if (pOperator != NULL) { + pOperator->info = NULL; + if (pOperator->pDownstream != NULL) { + taosMemoryFreeClear(pOperator->pDownstream); + pOperator->pDownstream = NULL; + } + destroyOperator(pOperator); + } +} + int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) { SExplainExecInfo execInfo = {0}; SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 7e06c083ed..a9ba57e1d4 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -177,17 +177,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyProjectOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -525,17 +519,11 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 557794a062..b63fe1198d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1528,7 +1528,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa // for non-blocking operator, the open cost is always 0 pOperator->cost.openCost = 0; *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index fb4b61c7a8..6083cbdcf8 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -155,20 +155,13 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroySortOperatorInfo(pInfo); } - - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -841,19 +834,13 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: pTaskInfo->code = code; if (pInfo != NULL) { destroyGroupSortOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); return code; } diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 54ad12cff0..4f11afd35a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -920,20 +920,14 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyStreamCountAggOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index a8e14bce68..d7519d90e9 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -982,17 +982,11 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 3a6d0c709c..9a66f6d688 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1451,20 +1451,13 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); - } + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 0651e2dbf6..389266efb5 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2024,17 +2024,11 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -3858,20 +3852,13 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyStreamSessionAggOperatorInfo(pInfo); } - - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; @@ -4074,6 +4061,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys SOperatorInfo* pOperator = NULL; code = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, &pOperator); if (pOperator == NULL || code != 0) { + downstream = NULL; QUERY_CHECK_CODE(code, lino, _error); } @@ -4135,9 +4123,6 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -5035,17 +5020,11 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; @@ -5376,17 +5355,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 1b8e6709d1..d57a8c7c5b 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1192,22 +1192,17 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN // int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp); code = appendDownstream(pOperator, &downstream, 1); + QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 024e0393f0..5499fa3026 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1428,19 +1428,13 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyIntervalOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -1703,20 +1697,14 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyStateWindowOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -1805,17 +1793,11 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroySWindowOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -2122,17 +2104,11 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -2462,19 +2438,12 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pMergeIntervalInfo != NULL) { destroyMergeIntervalOperatorInfo(pMergeIntervalInfo); } - - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index cbc7ca77bb..f5712d135b 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -5221,7 +5221,7 @@ int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options) FLT_ERR_JRET(fltOptimizeNodes(info, &info->sclCtx.node, &stat)); } - return code; + return TSDB_CODE_SUCCESS; _return: From dd52295314804fb8101974eb92f9cf02a9befff6 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 29 Aug 2024 12:33:14 +0000 Subject: [PATCH 15/16] fix mem leak possible --- source/libs/transport/src/transCli.c | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b532372148..7b42952e5f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -253,20 +253,20 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) -#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ - do { \ - int i = 0, sz = transQueueSize(&conn->cliMsgs); \ - for (; i < sz; i++) { \ - pMsg = transQueueGet(&conn->cliMsgs, i); \ - if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ - break; \ - } \ - } \ - if (i == sz) { \ - pMsg = NULL; \ - } else { \ - pMsg = transQueueRm(&conn->cliMsgs, i); \ - } \ +#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ + do { \ + int i = 0, sz = transQueueSize(&conn->cliMsgs); \ + for (; i < sz; i++) { \ + pMsg = transQueueGet(&conn->cliMsgs, i); \ + if (pMsg->ctx != NULL && pMsg->msg.msgType != TDMT_SCH_DROP_TASK && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ + break; \ + } \ + } \ + if (i == sz) { \ + pMsg = NULL; \ + } else { \ + pMsg = transQueueRm(&conn->cliMsgs, i); \ + } \ } while (0) #define CONN_GET_NEXT_SENDMSG(conn) \ From 065c1435e8098392c960ea226abbdf9e87c87d15 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 29 Aug 2024 12:34:45 +0000 Subject: [PATCH 16/16] fix mem leak possible --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7b42952e5f..073d6c0f17 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -258,7 +258,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); int i = 0, sz = transQueueSize(&conn->cliMsgs); \ for (; i < sz; i++) { \ pMsg = transQueueGet(&conn->cliMsgs, i); \ - if (pMsg->ctx != NULL && pMsg->msg.msgType != TDMT_SCH_DROP_TASK && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ + if (pMsg->msg.msgType != TDMT_SCH_DROP_TASK && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ break; \ } \ } \