From e4a3d1dfcf188ed8ceecfa4745df4c5bb2ade3af Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 24 May 2023 18:26:02 +0800 Subject: [PATCH 01/22] fix: link rocksdb shared library --- cmake/cmake.options | 6 ++++++ cmake/rocksdb_CMakeLists.txt.in | 35 ++++++++++++++++++++++++--------- contrib/CMakeLists.txt | 2 ++ 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/cmake/cmake.options b/cmake/cmake.options index 3bc5e202a5..555b72cbdf 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -77,6 +77,12 @@ ELSEIF (TD_DARWIN_64) ENDIF () ENDIF () +option( + BUILD_GEOS + "If build geos on Windows" + ON + ) + option( BUILD_SHARED_LIBS "" diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in index ba4a404af6..e100b3a932 100644 --- a/cmake/rocksdb_CMakeLists.txt.in +++ b/cmake/rocksdb_CMakeLists.txt.in @@ -1,11 +1,28 @@ # rocksdb -ExternalProject_Add(rocksdb - GIT_REPOSITORY https://github.com/facebook/rocksdb.git - GIT_TAG v8.1.1 - SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" - ) + +find_library(ROCKSDB_LIB NAMES rocksdb) +find_path(ROCKSDB_INCLUDE_DIR NAMES rocksdb/db.h) + +if (ROCKSDB_LIB AND ROCKSDB_INCLUDE_DIR) + set(ROCKSDB_FOUND TRUE) +endif() + +if(ROCKSDB_FOUND) + message(STATUS "Found rocksdb") +else() + message(STATUS "Building rocksdb from source") + + ExternalProject_Add(rocksdb + GIT_REPOSITORY https://github.com/facebook/rocksdb.git + GIT_TAG v8.1.1 + SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" + BINARY_DIR "${TD_CONTRIB_DIR}/rocksdb" + CONFIGURE_COMMAND "" + BUILD_COMMAND COMMAND $(MAKE) install-shared -j8 + INSTALL_COMMAND "" + TEST_COMMAND "" + ) +endif() + + diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index b3333dee99..e3a0f72959 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -226,6 +226,7 @@ if(${BUILD_WITH_LEVELDB}) endif(${BUILD_WITH_LEVELDB}) # rocksdb +if (FALSE) # To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev if(${BUILD_WITH_ROCKSDB}) if(${TD_LINUX}) @@ -276,6 +277,7 @@ if(${BUILD_WITH_ROCKSDB}) PUBLIC $ ) endif(${BUILD_WITH_ROCKSDB}) +endif() # lucene # To support build on ubuntu: sudo apt-get install libboost-all-dev From 4d526ed0cdf40b7a22f15508768f3eb33c5863fa Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 24 May 2023 18:59:09 +0800 Subject: [PATCH 02/22] change more --- cmake/rocksdb_CMakeLists.txt.in | 36 +++++++++---------------------- contrib/CMakeLists.txt | 4 +--- source/dnode/vnode/CMakeLists.txt | 2 +- source/libs/stream/CMakeLists.txt | 2 +- 4 files changed, 13 insertions(+), 31 deletions(-) diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in index e100b3a932..bb86cafb63 100644 --- a/cmake/rocksdb_CMakeLists.txt.in +++ b/cmake/rocksdb_CMakeLists.txt.in @@ -1,28 +1,12 @@ # rocksdb - -find_library(ROCKSDB_LIB NAMES rocksdb) -find_path(ROCKSDB_INCLUDE_DIR NAMES rocksdb/db.h) - -if (ROCKSDB_LIB AND ROCKSDB_INCLUDE_DIR) - set(ROCKSDB_FOUND TRUE) -endif() - -if(ROCKSDB_FOUND) - message(STATUS "Found rocksdb") -else() - message(STATUS "Building rocksdb from source") - - ExternalProject_Add(rocksdb - GIT_REPOSITORY https://github.com/facebook/rocksdb.git - GIT_TAG v8.1.1 - SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" - BINARY_DIR "${TD_CONTRIB_DIR}/rocksdb" - CONFIGURE_COMMAND "" - BUILD_COMMAND COMMAND $(MAKE) install-shared -j8 - INSTALL_COMMAND "" - TEST_COMMAND "" - ) -endif() - - +ExternalProject_Add(rocksdb + GIT_REPOSITORY https://github.com/facebook/rocksdb.git + GIT_TAG v8.1.1 + SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" + BINARY_DIR "${TD_CONTRIB_DIR}/rocksdb" + CONFIGURE_COMMAND "" + BUILD_COMMAND COMMAND $(MAKE) install-shared -j8 + INSTALL_COMMAND "" + TEST_COMMAND "" + ) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index e3a0f72959..ec48a85c33 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -226,7 +226,6 @@ if(${BUILD_WITH_LEVELDB}) endif(${BUILD_WITH_LEVELDB}) # rocksdb -if (FALSE) # To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev if(${BUILD_WITH_ROCKSDB}) if(${TD_LINUX}) @@ -270,14 +269,13 @@ if(${BUILD_WITH_ROCKSDB}) option(WITH_TOOLS "" OFF) option(WITH_LIBURING "" OFF) - option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) + option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON) add_subdirectory(rocksdb EXCLUDE_FROM_ALL) target_include_directories( rocksdb PUBLIC $ ) endif(${BUILD_WITH_ROCKSDB}) -endif() # lucene # To support build on ubuntu: sudo apt-get install libboost-all-dev diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index e8660cd6ad..9b39516ab0 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -101,7 +101,7 @@ target_link_libraries( # PUBLIC bdb # PUBLIC scalar - PUBLIC rocksdb + PUBLIC rocksdb-shared PUBLIC transport PUBLIC stream PUBLIC index diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index 2edbc44aae..acf563e822 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -9,7 +9,7 @@ target_include_directories( if(${BUILD_WITH_ROCKSDB}) target_link_libraries( stream - PUBLIC rocksdb tdb + PUBLIC rocksdb-shared tdb PRIVATE os util transport qcom executor wal ) From b708a3c9dd35cabaf3d831dbb007f6d13d76c029 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 24 May 2023 18:59:54 +0800 Subject: [PATCH 03/22] more --- cmake/rocksdb_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in index bb86cafb63..86d3ae946a 100644 --- a/cmake/rocksdb_CMakeLists.txt.in +++ b/cmake/rocksdb_CMakeLists.txt.in @@ -6,7 +6,7 @@ ExternalProject_Add(rocksdb SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" BINARY_DIR "${TD_CONTRIB_DIR}/rocksdb" CONFIGURE_COMMAND "" - BUILD_COMMAND COMMAND $(MAKE) install-shared -j8 + BUILD_COMMAND "" INSTALL_COMMAND "" TEST_COMMAND "" ) From e60ca238342fb1fbb68f3c169817330e980001f8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 24 May 2023 19:01:04 +0800 Subject: [PATCH 04/22] more --- cmake/rocksdb_CMakeLists.txt.in | 1 - 1 file changed, 1 deletion(-) diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in index 86d3ae946a..ba4a404af6 100644 --- a/cmake/rocksdb_CMakeLists.txt.in +++ b/cmake/rocksdb_CMakeLists.txt.in @@ -4,7 +4,6 @@ ExternalProject_Add(rocksdb GIT_REPOSITORY https://github.com/facebook/rocksdb.git GIT_TAG v8.1.1 SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" - BINARY_DIR "${TD_CONTRIB_DIR}/rocksdb" CONFIGURE_COMMAND "" BUILD_COMMAND "" INSTALL_COMMAND "" From 2f90fda13d4b2f029699d69d482ffe2565927ade Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 25 May 2023 16:15:10 +0800 Subject: [PATCH 05/22] change parameter --- source/libs/stream/src/streamBackendRocksdb.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 16ba81c74a..f9a9bef960 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -86,10 +86,10 @@ void* streamBackendInit(const char* path) { rocksdb_options_set_env(opts, env); rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); - rocksdb_options_set_write_buffer_size(opts, 128 << 20); - rocksdb_options_set_max_total_wal_size(opts, 128 << 20); + rocksdb_options_set_write_buffer_size(opts, 64 << 20); + rocksdb_options_set_max_total_wal_size(opts, 64 << 20); rocksdb_options_set_recycle_log_file_num(opts, 6); - rocksdb_options_set_max_write_buffer_number(opts, 3); + rocksdb_options_set_max_write_buffer_number(opts, 2); rocksdb_options_set_info_log_level(opts, 0); pHandle->env = env; @@ -740,7 +740,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { qError("failed to open rocksdb cf, reason:%s", err); taosMemoryFree(err); } else { - qDebug("succ to open rocksdb cf, reason:%s", err); + qDebug("succ to open rocksdb cf, reason"); } pIter = taosHashIterate(ids, NULL); From 9cb4a114e2e0118c266eb068bfbd94f9c8cb59ab Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 25 May 2023 16:20:00 +0800 Subject: [PATCH 06/22] change parameter --- source/libs/stream/src/streamBackendRocksdb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f9a9bef960..f444dd01e3 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -80,14 +80,14 @@ void* streamBackendInit(const char* path) { rocksdb_env_set_low_priority_background_threads(env, 4); rocksdb_env_set_high_priority_background_threads(env, 2); - rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20); + rocksdb_cache_t* cache = rocksdb_cache_create_lru(64 << 20); rocksdb_options_t* opts = rocksdb_options_create(); rocksdb_options_set_env(opts, env); rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); rocksdb_options_set_write_buffer_size(opts, 64 << 20); - rocksdb_options_set_max_total_wal_size(opts, 64 << 20); + rocksdb_options_set_max_total_wal_size(opts, 128 << 20); rocksdb_options_set_recycle_log_file_num(opts, 6); rocksdb_options_set_max_write_buffer_number(opts, 2); rocksdb_options_set_info_log_level(opts, 0); From 5933f85b56461685cecdaade4e0b1d804f96829e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 25 May 2023 17:51:33 +0800 Subject: [PATCH 07/22] change parameter --- cmake/cmake.define | 6 +++--- contrib/CMakeLists.txt | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 0d5c21604a..a233834fbe 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.0) -set(CMAKE_VERBOSE_MAKEFILE OFF) +set(CMAKE_VERBOSE_MAKEFILE ON) set(TD_BUILD_TAOSA_INTERNAL FALSE) #set output directory @@ -123,8 +123,8 @@ ELSE () SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ELSE () - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -O3 -s -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ENDIF () # disable all assert diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index b3333dee99..842b74098a 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -231,6 +231,7 @@ if(${BUILD_WITH_ROCKSDB}) if(${TD_LINUX}) SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result") endif(${TD_LINUX}) + MESSAGE(STATUS "CXXXX STATUS CONFIG: " ${CMAKE_CXX_FLAGS}) if(${TD_DARWIN}) SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized") @@ -248,7 +249,7 @@ if(${BUILD_WITH_ROCKSDB}) endif(${TD_DARWIN}) if(${TD_WINDOWS}) - option(WITH_JNI "" ON) + option(WITH_JNI "" OFF) endif(${TD_WINDOWS}) if(${TD_WINDOWS}) @@ -260,7 +261,7 @@ if(${BUILD_WITH_ROCKSDB}) option(WITH_FALLOCATE "" OFF) option(WITH_JEMALLOC "" OFF) option(WITH_GFLAGS "" OFF) - option(PORTABLE "" ON) + option(PORTABLE "" OFF) option(WITH_LIBURING "" OFF) option(FAIL_ON_WARNINGS OFF) From cd44bd9f3f3740c8f37f968ee9955a2383c72f51 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 25 May 2023 18:16:33 +0800 Subject: [PATCH 08/22] change parameter --- source/libs/stream/src/streamBackendRocksdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f444dd01e3..11e33da5a4 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -86,7 +86,7 @@ void* streamBackendInit(const char* path) { rocksdb_options_set_env(opts, env); rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); - rocksdb_options_set_write_buffer_size(opts, 64 << 20); + rocksdb_options_set_write_buffer_size(opts, 48 << 20); rocksdb_options_set_max_total_wal_size(opts, 128 << 20); rocksdb_options_set_recycle_log_file_num(opts, 6); rocksdb_options_set_max_write_buffer_number(opts, 2); From 52c384c7c9db31776dc5c449d2e3c6313e1a1e57 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 26 May 2023 09:06:19 +0800 Subject: [PATCH 09/22] change parameter --- source/libs/stream/src/streamBackendRocksdb.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 11e33da5a4..c530171eb2 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -675,7 +675,6 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) { rocksdb_readoptions_destroy(inst->rOpt); taosMemoryFree(inst->cfOpt); - taosMemoryFree(inst->param); taosMemoryFreeClear(inst->param); taosMemoryFree(inst); } From 34d21bc03fc14d959cc869690f1441ad0b14d4f7 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 26 May 2023 20:11:33 +0800 Subject: [PATCH 10/22] change parameter --- cmake/cmake.define | 2 +- include/libs/stream/streamState.h | 3 +- source/libs/stream/src/streamBackendRocksdb.c | 260 +++++++++--------- 3 files changed, 129 insertions(+), 136 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index a233834fbe..35abe512f7 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.0) -set(CMAKE_VERBOSE_MAKEFILE ON) +set(CMAKE_VERBOSE_MAKEFILE OFF) set(TD_BUILD_TAOSA_INTERNAL FALSE) #set output directory diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 1cc61ec072..66e8b7b2a3 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -44,9 +44,10 @@ typedef struct STdbState { void* param; void* env; SListNode* pComparNode; - void* pBackendHandle; + void* pBackend; char idstr[64]; void* compactFactory; + TdThreadRwlock rwLock; TDB* db; TTB* pStateDb; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c530171eb2..37dd7f6297 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -22,6 +22,9 @@ typedef struct SCompactFilteFactory { void* status; } SCompactFilteFactory; +typedef struct { + void* tableOpt; +} RocksdbCfParam; typedef struct { rocksdb_t* db; rocksdb_column_family_handle_t** pHandle; @@ -29,12 +32,13 @@ typedef struct { rocksdb_readoptions_t* rOpt; rocksdb_options_t** cfOpt; rocksdb_options_t* dbOpt; - void* param; - void* pBackendHandle; + RocksdbCfParam* param; + void* pBackend; SListNode* pCompareNode; + rocksdb_comparator_t** pCompares; } RocksdbCfInst; -int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids); +int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); void destroyRocksdbCfInst(RocksdbCfInst* inst); @@ -46,9 +50,6 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c char** newval, size_t* newvlen, unsigned char* value_changed); rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx); -typedef struct { - void* tableOpt; -} RocksdbCfParam; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; typedef int (*EncodeFunc)(void* key, char* buf); @@ -114,25 +115,7 @@ void* streamBackendInit(const char* path) { /* list all cf and get prefix */ - int64_t streamId; - int32_t taskId, dummpy = 0; - SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - for (size_t i = 0; i < nCf; i++) { - char* cf = cfs[i]; - char suffix[64] = {0}; - if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, suffix)) { - char idstr[128] = {0}; - sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId); - // qError("make cf name %s", idstr); - if (taosHashGet(tbl, idstr, strlen(idstr) + 1) == NULL) { - taosHashPut(tbl, idstr, strlen(idstr) + 1, &dummpy, sizeof(dummpy)); - } - } else { - continue; - } - } - streamStateOpenBackendCf(pHandle, (char*)path, tbl); - taosHashCleanup(tbl); + streamStateOpenBackendCf(pHandle, (char*)path, cfs, nCf); } rocksdb_list_column_families_destroy(cfs, nCf); @@ -209,7 +192,7 @@ void streamBackendDelCompare(void* backend, void* arg) { } void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); } static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); -int streamGetInit(const char* funcName); +int streamGetInit(SStreamState* pState, const char* funcName); // |key|-----value------| // |key|ttl|len|userData| @@ -679,61 +662,44 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) { taosMemoryFree(inst); } -int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { +int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) { SBackendHandle* handle = backend; char* err = NULL; - size_t nSize = taosHashGetSize(ids); - int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + int64_t streamId; + int32_t taskId, dummy = 0; + char suffix[64] = {0}; + SHashObj* instTbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - char** cfNames = taosMemoryCalloc(nSize * cfLen + 1, sizeof(char*)); - void* pIter = taosHashIterate(ids, NULL); - size_t keyLen = 0; - char* idstr = taosHashGetKey(pIter, &keyLen); - for (int i = 0; i < nSize * cfLen + 1; i++) { - cfNames[i] = (char*)taosMemoryCalloc(1, 128); - if (i == 0) { - memcpy(cfNames[0], "default", strlen("default")); - continue; - } + rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); + RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*)); + rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t**)); + rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); - GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key); - if (i % cfLen == 0) { - pIter = taosHashIterate(ids, pIter); - if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen); - } - } - rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*)); - RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*)); - for (int i = 0; i < nSize * cfLen + 1; i++) { + for (int i = 0; i < nCf; i++) { + char* cf = cfs[i]; + char funcname[64] = {0}; cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt); - if (i == 0) { - continue; + if (i == 0) continue; + if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) { + rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); + rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); + + rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); + rocksdb_block_based_options_set_filter_policy(tableOpt, filter); + + rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt); + params[i].tableOpt = tableOpt; + + int idx = streamGetInit(NULL, funcname); + SCfInit* cfPara = &ginitDict[idx]; + + rocksdb_comparator_t* compare = + rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); + rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare); + pCompare[i] = compare; } - // refactor later - rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); - rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); - - rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); - rocksdb_block_based_options_set_filter_policy(tableOpt, filter); - - rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt); - params[i].tableOpt = tableOpt; - }; - - rocksdb_comparator_t** pCompare = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_comparator_t**)); - for (int i = 0; i < nSize * cfLen + 1; i++) { - if (i == 0) { - continue; - } - SCfInit* cf = &ginitDict[(i - 1) % cfLen]; - - rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->detroyFunc, cf->cmpFunc, cf->cmpName); - rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare); - pCompare[i] = compare; } - rocksdb_column_family_handle_t** cfHandle = - taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_column_family_handle_t*)); - rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nSize * cfLen + 1, (const char* const*)cfNames, + rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nCf, (const char* const*)cfs, (const rocksdb_options_t* const*)cfOpts, cfHandle, &err); if (err != NULL) { qError("failed to open rocksdb cf, reason:%s", err); @@ -742,50 +708,53 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) { qDebug("succ to open rocksdb cf, reason"); } - pIter = taosHashIterate(ids, NULL); - idstr = taosHashGetKey(pIter, &keyLen); - for (int i = 0; i < nSize; i++) { - RocksdbCfInst* inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst)); - rocksdb_column_family_handle_t** subCf = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); - rocksdb_comparator_t** subCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); - RocksdbCfParam* subParam = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam)); - rocksdb_options_t** subOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); - for (int j = 0; j < cfLen; j++) { - subCf[j] = cfHandle[i * cfLen + j + 1]; - subCompare[j] = pCompare[i * cfLen + j + 1]; - subParam[j] = params[i * cfLen + j + 1]; - subOpt[j] = cfOpts[i * cfLen + j + 1]; + static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + for (int i = 0; i < nCf; i++) { + char* cf = cfs[i]; + if (i == 0) continue; + char funcname[64] = {0}; + if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) { + char idstr[128] = {0}; + sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId); + + int idx = streamGetInit(NULL, funcname); + + RocksdbCfInst* inst = NULL; + RocksdbCfInst** pInst = taosHashGet(instTbl, idstr, strlen(idstr) + 1); + if (pInst == NULL || *pInst == NULL) { + inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst)); + inst->pHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); + inst->cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); + inst->wOpt = rocksdb_writeoptions_create(); + inst->rOpt = rocksdb_readoptions_create(); + inst->param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam)); + inst->pBackend = handle; + inst->db = db; + inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); + + inst->dbOpt = handle->dbOpt; + rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); + taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); + } else { + inst = *pInst; + } + inst->cfOpt[idx] = cfOpts[i]; + inst->pCompares[idx] = pCompare[i]; + memcpy(&(inst->param[idx]), &(params[i]), sizeof(RocksdbCfParam)); + inst->pHandle[idx] = cfHandle[i]; } - inst->db = db; - inst->pHandle = subCf; - inst->wOpt = rocksdb_writeoptions_create(); - inst->rOpt = rocksdb_readoptions_create(); - inst->cfOpt = (rocksdb_options_t**)subOpt; - inst->dbOpt = handle->dbOpt; - inst->param = subParam; - inst->pBackendHandle = handle; - handle->db = db; - SCfComparator compare = {.comp = subCompare, .numOfComp = cfLen}; + } + void** pIter = taosHashIterate(handle->cfInst, NULL); + while (*pIter) { + RocksdbCfInst* inst = *pIter; + SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen}; inst->pCompareNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); - - taosHashPut(handle->cfInst, idstr, keyLen, &inst, sizeof(void*)); - - pIter = taosHashIterate(ids, pIter); - if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen); } - rocksdb_column_family_handle_destroy(cfHandle[0]); - rocksdb_options_destroy(cfOpts[0]); - for (int i = 0; i < nSize * cfLen + 1; i++) { - taosMemoryFree(cfNames[i]); - } - taosMemoryFree(cfNames); taosMemoryFree(cfHandle); taosMemoryFree(pCompare); taosMemoryFree(params); taosMemoryFree(cfOpts); - return 0; } int streamStateOpenBackend(void* backend, SStreamState* pState) { @@ -804,12 +773,13 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { pState->pTdbState->cfOpts = inst->cfOpt; pState->pTdbState->dbOpt = handle->dbOpt; pState->pTdbState->param = inst->param; - pState->pTdbState->pBackendHandle = handle; + pState->pTdbState->pBackend = handle; pState->pTdbState->pComparNode = inst->pCompareNode; taosThreadMutexUnlock(&handle->cfMutex); return 0; } taosThreadMutexUnlock(&handle->cfMutex); + return 0; char* err = NULL; int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -839,15 +809,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { pCompare[i] = compare; } rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); - for (int i = 0; i < cfLen; i++) { - char buf[128] = {0}; - GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[i].key); - cfHandle[i] = rocksdb_create_column_family(handle->db, cfOpt[i], buf, &err); - if (err != NULL) { - qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); - taosMemoryFreeClear(err); - } - } pState->pTdbState->rocksdb = handle->db; pState->pTdbState->pHandle = cfHandle; pState->pTdbState->writeOpts = rocksdb_writeoptions_create(); @@ -855,8 +816,9 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt; pState->pTdbState->dbOpt = handle->dbOpt; pState->pTdbState->param = param; - pState->pTdbState->pBackendHandle = handle; + pState->pTdbState->pBackend = handle; + taosThreadRwlockInit(&pState->pTdbState->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); // rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); @@ -865,7 +827,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { } void streamStateCloseBackend(SStreamState* pState, bool remove) { - SBackendHandle* pHandle = pState->pTdbState->pBackendHandle; + SBackendHandle* pHandle = pState->pTdbState->pBackend; taosThreadMutexLock(&pHandle->cfMutex); RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { @@ -887,7 +849,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { char* err = NULL; if (remove) { for (int i = 0; i < cfLen; i++) { - rocksdb_drop_column_family(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[i], &err); + if (pState->pTdbState->pHandle[i] != NULL) + rocksdb_drop_column_family(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[i], &err); if (err != NULL) { qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); taosMemoryFreeClear(err); @@ -896,7 +859,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { } else { rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); for (int i = 0; i < cfLen; i++) { - rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err); + if (pState->pTdbState->pHandle[i] != NULL) + rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err); if (err != NULL) { qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); taosMemoryFreeClear(err); @@ -906,7 +870,9 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { } for (int i = 0; i < cfLen; i++) { - rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); + if (pState->pTdbState->pHandle[i] != NULL) { + rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); + } } taosMemoryFreeClear(pState->pTdbState->pHandle); for (int i = 0; i < cfLen; i++) { @@ -915,7 +881,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { } if (remove) { - streamBackendDelCompare(pState->pTdbState->pBackendHandle, pState->pTdbState->pComparNode); + streamBackendDelCompare(pState->pTdbState->pBackend, pState->pTdbState->pComparNode); } rocksdb_writeoptions_destroy(pState->pTdbState->writeOpts); pState->pTdbState->writeOpts = NULL; @@ -924,6 +890,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { pState->pTdbState->readOpts = NULL; taosMemoryFreeClear(pState->pTdbState->cfOpts); taosMemoryFreeClear(pState->pTdbState->param); + + taosThreadRwlockDestroy(&pState->pTdbState->rwLock); pState->pTdbState->rocksdb = NULL; } void streamStateDestroyCompar(void* arg) { @@ -934,14 +902,38 @@ void streamStateDestroyCompar(void* arg) { taosMemoryFree(comp->comp); } -int streamGetInit(const char* funcName) { +int streamGetInit(SStreamState* pState, const char* funcName) { + int idx = -1; size_t len = strlen(funcName); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { if (len == ginitDict[i].len && strncmp(funcName, ginitDict[i].key, strlen(funcName)) == 0) { - return i; + idx = i; + break; } } - return -1; + if (idx != -1) { + rocksdb_column_family_handle_t* cf = NULL; + taosThreadRwlockRdlock(&pState->pTdbState->rwLock); + cf = pState->pTdbState->pHandle[idx]; + taosThreadRwlockUnlock(&pState->pTdbState->rwLock); + if (cf == NULL) { + char buf[128] = {0}; + GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[idx].key); + char* err = NULL; + cf = rocksdb_create_column_family(pState->pTdbState->rocksdb, pState->pTdbState->cfOpts[idx], buf, &err); + if (err != NULL) { + idx = -1; + qError("failed to to open cf, %p 0x%" PRIx64 "-%d_%s, reason:%s", pState, pState->streamId, pState->taskId, + funcName, err); + taosMemoryFree(err); + } + taosThreadRwlockWrlock(&pState->pTdbState->rwLock); + pState->pTdbState->pHandle[idx] = cf; + taosThreadRwlockUnlock(&pState->pTdbState->rwLock); + } + } + + return idx; } bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) { rocksdb_iter_seek(iter, buf, len); @@ -955,7 +947,7 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len } rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt) { - int idx = streamGetInit(cfName); + int idx = streamGetInit(pState, cfName); if (snapshot != NULL) { *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); @@ -974,7 +966,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ - int i = streamGetInit(funcname); \ + int i = streamGetInit(pState, funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ code = -1; \ @@ -1004,7 +996,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ - int i = streamGetInit(funcname); \ + int i = streamGetInit(pState, funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ code = -1; \ @@ -1051,7 +1043,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = 0; \ char buf[128] = {0}; \ char* err = NULL; \ - int i = streamGetInit(funcname); \ + int i = streamGetInit(pState, funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ code = -1; \ @@ -1979,7 +1971,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { - int i = streamGetInit(cfName); + int i = streamGetInit(pState, cfName); if (i < 0) { qError("streamState failed to put to cf name:%s", cfName); From 57a4e56a616f8714460620c02fb55fd9fb07344e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 26 May 2023 20:30:03 +0800 Subject: [PATCH 11/22] change parameter --- source/libs/stream/src/streamBackendRocksdb.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 37dd7f6297..c9538f0b2a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -779,7 +779,6 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { return 0; } taosThreadMutexUnlock(&handle->cfMutex); - return 0; char* err = NULL; int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); From 479a1172663105eceff695010910555f79da2e89 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 26 May 2023 13:03:47 +0000 Subject: [PATCH 12/22] Merge branch 'enh/dev3.0' into enh/addCompileError --- include/libs/function/function.h | 65 ++++++++++++++++---------------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index de2494ab3c..e015f4182e 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -21,8 +21,8 @@ extern "C" { #endif #include "tcommon.h" -#include "tvariant.h" #include "tsimplehash.h" +#include "tvariant.h" struct SqlFunctionCtx; struct SResultRowEntryInfo; @@ -77,7 +77,7 @@ enum { enum { MAIN_SCAN = 0x0u, REVERSE_SCAN = 0x1u, // todo remove it - PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan + PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan }; typedef struct SPoint1 { @@ -130,43 +130,44 @@ typedef struct SSerializeDataHandle { // incremental state storage typedef struct STdbState { - void* rocksdb; - void** pHandle; - void* writeOpts; - void* readOpts; - void** cfOpts; - void* dbOpt; - struct SStreamTask* pOwner; - void* param; - void* env; - SListNode* pComparNode; - void* pBackendHandle; + void *rocksdb; + void **pHandle; + void *writeOpts; + void *readOpts; + void **cfOpts; + void *dbOpt; + struct SStreamTask *pOwner; + void *param; + void *env; + SListNode *pComparNode; + void *pBackend; char idstr[64]; - void* compactFactory; + void *compactFactory; + TdThreadRwlock rwLock; - void* db; - void* pStateDb; - void* pFuncStateDb; - void* pFillStateDb; // todo refactor - void* pSessionStateDb; - void* pParNameDb; - void* pParTagDb; - void* txn; + void *db; + void *pStateDb; + void *pFuncStateDb; + void *pFillStateDb; // todo refactor + void *pSessionStateDb; + void *pParNameDb; + void *pParTagDb; + void *txn; } STdbState; typedef struct { - STdbState* pTdbState; - struct SStreamFileState* pFileState; - int32_t number; - SSHashObj* parNameMap; - int64_t checkPointId; - int32_t taskId; - int64_t streamId; + STdbState *pTdbState; + struct SStreamFileState *pFileState; + int32_t number; + SSHashObj *parNameMap; + int64_t checkPointId; + int32_t taskId; + int64_t streamId; } SStreamState; typedef struct SFunctionStateStore { - int32_t (*streamStateFuncPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); - int32_t (*streamStateFuncGet)(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen); + int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen); + int32_t (*streamStateFuncGet)(SStreamState *pState, const SWinKey *key, void **ppVal, int32_t *pVLen); } SFunctionStateStore; // sql function runtime context @@ -180,7 +181,7 @@ typedef struct SqlFunctionCtx { int16_t functionId; // function id char *pOutput; // final result output buffer, point to sdata->data // input parameter, e.g., top(k, 20), the number of results of top query is kept in param - SFunctParam *param; + SFunctParam *param; // corresponding output buffer for timestamp of each result, e.g., diff/csum SColumnInfoData *pTsOutput; int32_t numOfParams; From 4b931d3007275100e30d2425bb8692a3b5017898 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 26 May 2023 13:59:16 +0000 Subject: [PATCH 13/22] Merge branch 'enh/dev3.0' into enh/addCompileError --- cmake/cmake.define | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 35abe512f7..f3caf49da3 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -124,7 +124,7 @@ ELSE () SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ELSE () SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -O3 -s -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3 -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ENDIF () # disable all assert diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 326c081a5b..437aa14324 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -807,7 +807,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare); pCompare[i] = compare; } - rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); + rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(1, cfLen * sizeof(rocksdb_column_family_handle_t*)); pState->pTdbState->rocksdb = handle->db; pState->pTdbState->pHandle = (void**)cfHandle; pState->pTdbState->writeOpts = rocksdb_writeoptions_create(); From 2f5fe2dec6a8056d8b41bc89fbc1ed5ed9d8682a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 26 May 2023 14:01:05 +0000 Subject: [PATCH 14/22] Merge branch 'enh/dev3.0' into enh/addCompileError --- source/libs/stream/src/streamBackendRocksdb.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 437aa14324..92ef66cd30 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1104,8 +1104,10 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { } char* err = NULL; - rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1], - sKeyStr, sLen, eKeyStr, eLen, &err); + if (pState->pTdbState->pHandle[1] != NULL) { + rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1], + sKeyStr, sLen, eKeyStr, eLen, &err); + } // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, // eLen); if (err != NULL) { From de739ce086d1388a7adfc18034760584673ed425 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 May 2023 01:11:13 +0000 Subject: [PATCH 15/22] Merge branch 'enh/dev3.0' into enh/addCompileError --- source/libs/stream/src/streamBackendRocksdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 92ef66cd30..b1a6eba42f 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -910,7 +910,7 @@ int streamGetInit(SStreamState* pState, const char* funcName) { break; } } - if (idx != -1) { + if (pState != NULL && idx != -1) { rocksdb_column_family_handle_t* cf = NULL; taosThreadRwlockRdlock(&pState->pTdbState->rwLock); cf = pState->pTdbState->pHandle[idx]; From 0be1858b4be796550bbd05ef7054a90b4edcacc1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 May 2023 01:50:43 +0000 Subject: [PATCH 16/22] Merge branch 'enh/dev3.0' into enh/addCompileError --- source/libs/stream/src/streamBackendRocksdb.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index b1a6eba42f..793575f59a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -749,6 +749,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t RocksdbCfInst* inst = *pIter; SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen}; inst->pCompareNode = streamBackendAddCompare(handle, &compare); + pIter = taosHashIterate(handle->cfInst, pIter); } taosMemoryFree(cfHandle); From 342ce14c7daf079ed1a4a2bf46e074ba3452f84d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 May 2023 02:37:47 +0000 Subject: [PATCH 17/22] Merge branch 'enh/3.0' into fix/shared_link_rocksdb --- cmake/cmake.define | 4 ++-- contrib/CMakeLists.txt | 5 ++++- source/dnode/vnode/CMakeLists.txt | 23 +++++++++++++++++++++++ source/libs/stream/CMakeLists.txt | 10 ++++++++++ 4 files changed, 39 insertions(+), 3 deletions(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 0d5c21604a..2e55b954b7 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -123,8 +123,8 @@ ELSE () SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ELSE () - SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3-Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ENDIF () # disable all assert diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index ec48a85c33..3c0c50b73b 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -268,8 +268,11 @@ if(${BUILD_WITH_ROCKSDB}) option(WITH_BENCHMARK_TOOLS "" OFF) option(WITH_TOOLS "" OFF) option(WITH_LIBURING "" OFF) - + IF (TD_LINUX) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON) + ELSE() + option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) + ENDIF() add_subdirectory(rocksdb EXCLUDE_FROM_ALL) target_include_directories( rocksdb diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 5c752ba265..b18cb8e282 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -87,6 +87,7 @@ target_include_directories( PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include" ) +IF (TD_LINUX) target_link_libraries( vnode PUBLIC os @@ -107,6 +108,28 @@ target_link_libraries( PUBLIC stream PUBLIC index ) +ELSE() +target_link_libraries( + vnode + PUBLIC os + PUBLIC util + PUBLIC common + PUBLIC tfs + PUBLIC wal + PUBLIC qworker + PUBLIC sync + PUBLIC executor + PUBLIC scheduler + PUBLIC tdb + + # PUBLIC bdb + # PUBLIC scalar + PUBLIC rocksdb + PUBLIC transport + PUBLIC stream + PUBLIC index +) +ENDIF() IF (TD_GRANT) TARGET_LINK_LIBRARIES(vnode PUBLIC grant) diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index 72517a88da..fa6c709c8f 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -6,12 +6,22 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) + if(${BUILD_WITH_ROCKSDB}) + IF (TD_LINUX) target_link_libraries( stream PUBLIC rocksdb-shared tdb PRIVATE os util transport qcom executor wal index ) + ELSE() + target_link_libraries( + stream + PUBLIC rocksdb tdb + PRIVATE os util transport qcom executor wal index + ) + + ENDIF() target_include_directories( stream From d0b9abcc7bf59f282e7b35b5b27306c8fa0cf5d5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 May 2023 10:46:53 +0800 Subject: [PATCH 18/22] merge 3.0 --- cmake/cmake.define | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/cmake.define b/cmake/cmake.define index 2e55b954b7..f3caf49da3 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -124,7 +124,7 @@ ELSE () SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ELSE () SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") - SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3-Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3 -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k") ENDIF () # disable all assert From f9d867f50d46cb52921e9bbc79960596af439197 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Mon, 29 May 2023 13:20:53 +0800 Subject: [PATCH 19/22] fix: TDengine build on macOS arm64 --- cmake/cmake.platform | 12 ++++++++++-- contrib/CMakeLists.txt | 4 ++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/cmake/cmake.platform b/cmake/cmake.platform index cb09bf2085..f9faf7316c 100644 --- a/cmake/cmake.platform +++ b/cmake/cmake.platform @@ -56,9 +56,17 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin SET(TD_DARWIN TRUE) SET(OSTYPE "macOS") + execute_process(COMMAND geos-config --cflags OUTPUT_VARIABLE GEOS_CFLAGS) + execute_process(COMMAND geos-config --ldflags OUTPUT_VARIABLE GEOS_LDFLAGS) + string(SUBSTRING ${GEOS_CFLAGS} 2 -1 GEOS_CFLAGS) + string(REGEX REPLACE "\n" "" GEOS_CFLAGS ${GEOS_CFLAGS}) + string(SUBSTRING ${GEOS_LDFLAGS} 2 -1 GEOS_LDFLAGS) + string(REGEX REPLACE "\n" "" GEOS_LDFLAGS ${GEOS_LDFLAGS}) + MESSAGE("GEOS_CFLAGS "${GEOS_CFLAGS}) + MESSAGE("GEOS_LDFLAGS "${GEOS_LDFLAGS}) ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare") - INCLUDE_DIRECTORIES(/usr/local/include) - LINK_DIRECTORIES(/usr/local/lib) + INCLUDE_DIRECTORIES(${GEOS_CFLAGS}) + LINK_DIRECTORIES(${GEOS_LDFLAGS}) IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64") MESSAGE("Current system arch is arm64") diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index b3333dee99..eeb20ee884 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -236,6 +236,10 @@ if(${BUILD_WITH_ROCKSDB}) SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized") endif(${TD_DARWIN}) + if (${TD_DARWIN_ARM64}) + set(HAS_ARMV8_CRC true) + endif(${TD_DARWIN_ARM64}) + if (${TD_WINDOWS}) SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4244 /wd4819") endif(${TD_WINDOWS}) From 26cbcf996cd3032cdd4d579f1e664643c100bb95 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 May 2023 07:31:14 +0000 Subject: [PATCH 20/22] Merge branch 'enh/3.0' into fix/shared_link_rocksdb --- source/libs/stream/src/streamBackendRocksdb.c | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 793575f59a..e707803d9d 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -747,7 +747,30 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t void** pIter = taosHashIterate(handle->cfInst, NULL); while (*pIter) { RocksdbCfInst* inst = *pIter; - SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen}; + + for (int i = 0; i < cfLen; i++) { + if (inst->cfOpt[i] == NULL) { + rocksdb_options_t* opt = rocksdb_options_create_copy(handle->dbOpt); + rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); + rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); + + rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15); + rocksdb_block_based_options_set_filter_policy(tableOpt, filter); + + rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt); + + SCfInit* cfPara = &ginitDict[i]; + + rocksdb_comparator_t* compare = + rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); + rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); + + inst->pCompares[i] = compare; + inst->cfOpt[i] = opt; + inst->param[i].tableOpt = tableOpt; + } + } + SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen}; inst->pCompareNode = streamBackendAddCompare(handle, &compare); pIter = taosHashIterate(handle->cfInst, pIter); } @@ -897,7 +920,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { void streamStateDestroyCompar(void* arg) { SCfComparator* comp = (SCfComparator*)arg; for (int i = 0; i < comp->numOfComp; i++) { - rocksdb_comparator_destroy(comp->comp[i]); + if (comp->comp[i]) rocksdb_comparator_destroy(comp->comp[i]); } taosMemoryFree(comp->comp); } @@ -920,6 +943,8 @@ int streamGetInit(SStreamState* pState, const char* funcName) { char buf[128] = {0}; GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[idx].key); char* err = NULL; + + taosThreadRwlockWrlock(&pState->pTdbState->rwLock); cf = rocksdb_create_column_family(pState->pTdbState->rocksdb, pState->pTdbState->cfOpts[idx], buf, &err); if (err != NULL) { idx = -1; @@ -927,7 +952,6 @@ int streamGetInit(SStreamState* pState, const char* funcName) { funcName, err); taosMemoryFree(err); } - taosThreadRwlockWrlock(&pState->pTdbState->rwLock); pState->pTdbState->pHandle[idx] = cf; taosThreadRwlockUnlock(&pState->pTdbState->rwLock); } From 7977490b81d25dcfec8fc1e5a71aad37ecc78096 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 May 2023 08:42:52 +0000 Subject: [PATCH 21/22] delete geos opt --- cmake/cmake.options | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/cmake.options b/cmake/cmake.options index 555b72cbdf..b00ae14715 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -80,7 +80,7 @@ ENDIF () option( BUILD_GEOS "If build geos on Windows" - ON + OFF ) option( From e00f79367dfe02ad70b2ec4705c7d40b6e0ab18e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 29 May 2023 18:57:21 +0800 Subject: [PATCH 22/22] merge 3.0 --- source/libs/stream/src/streamBackendRocksdb.c | 72 +++++++++++-------- 1 file changed, 41 insertions(+), 31 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index e707803d9d..f7638a42ae 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -142,16 +142,17 @@ void streamBackendCleanup(void* arg) { } taosHashCleanup(pHandle->cfInst); - rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); - char* err = NULL; - rocksdb_flush(pHandle->db, flushOpt, &err); - if (err != NULL) { - qError("failed to flush db before streamBackend clean up, reason:%s", err); - taosMemoryFree(err); + if (pHandle->db) { + char* err = NULL; + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + rocksdb_flush(pHandle->db, flushOpt, &err); + if (err != NULL) { + qError("failed to flush db before streamBackend clean up, reason:%s", err); + taosMemoryFree(err); + } + rocksdb_flushoptions_destroy(flushOpt); + rocksdb_close(pHandle->db); } - rocksdb_flushoptions_destroy(flushOpt); - - rocksdb_close(pHandle->db); rocksdb_options_destroy(pHandle->dbOpt); rocksdb_env_destroy(pHandle->env); rocksdb_cache_destroy(pHandle->cache); @@ -650,7 +651,7 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c void destroyRocksdbCfInst(RocksdbCfInst* inst) { int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); for (int i = 0; i < cfLen; i++) { - rocksdb_column_family_handle_destroy(inst->pHandle[i]); + if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]); } rocksdb_writeoptions_destroy(inst->wOpt); @@ -668,7 +669,6 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t int64_t streamId; int32_t taskId, dummy = 0; char suffix[64] = {0}; - SHashObj* instTbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*)); @@ -705,8 +705,12 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t qError("failed to open rocksdb cf, reason:%s", err); taosMemoryFree(err); } else { - qDebug("succ to open rocksdb cf, reason"); + qDebug("succ to open rocksdb cf"); } + // close default cf + rocksdb_column_family_handle_destroy(cfHandle[0]); + rocksdb_options_destroy(cfOpts[0]); + handle->db = db; static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); for (int i = 0; i < nCf; i++) { @@ -720,7 +724,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t int idx = streamGetInit(NULL, funcname); RocksdbCfInst* inst = NULL; - RocksdbCfInst** pInst = taosHashGet(instTbl, idstr, strlen(idstr) + 1); + RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1); if (pInst == NULL || *pInst == NULL) { inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst)); inst->pHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); @@ -745,7 +749,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t } } void** pIter = taosHashIterate(handle->cfInst, NULL); - while (*pIter) { + while (pIter) { RocksdbCfInst* inst = *pIter; for (int i = 0; i < cfLen; i++) { @@ -831,7 +835,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare); pCompare[i] = compare; } - rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(1, cfLen * sizeof(rocksdb_column_family_handle_t*)); + rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); pState->pTdbState->rocksdb = handle->db; pState->pTdbState->pHandle = (void**)cfHandle; pState->pTdbState->writeOpts = rocksdb_writeoptions_create(); @@ -873,7 +877,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { if (remove) { for (int i = 0; i < cfLen; i++) { if (pState->pTdbState->pHandle[i] != NULL) - rocksdb_drop_column_family(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[i], &err); + rocksdb_drop_column_family(pState->pTdbState->rocksdb, + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[i], &err); if (err != NULL) { qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); taosMemoryFreeClear(err); @@ -951,8 +956,9 @@ int streamGetInit(SStreamState* pState, const char* funcName) { qError("failed to to open cf, %p 0x%" PRIx64 "-%d_%s, reason:%s", pState, pState->streamId, pState->taskId, funcName, err); taosMemoryFree(err); + } else { + pState->pTdbState->pHandle[idx] = cf; } - pState->pTdbState->pHandle[idx] = cf; taosThreadRwlockUnlock(&pState->pTdbState->rwLock); } } @@ -982,7 +988,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_readoptions_set_snapshot(rOpt, *snapshot); rocksdb_readoptions_set_fill_cache(rOpt, 0); - return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]); + return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]); } #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ @@ -999,11 +1006,12 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ - char* ttlV = NULL; \ - int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_column_family_handle_t* pHandle = \ + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ if (err != NULL) { \ taosMemoryFree(err); \ @@ -1029,11 +1037,12 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + rocksdb_column_family_handle_t* pHandle = \ + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ if (val == NULL) { \ if (err == NULL) { \ qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ @@ -1076,9 +1085,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ + rocksdb_column_family_handle_t* pHandle = \ + ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \ if (err != NULL) { \ qError("streamState str: %s failed to del from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \