Merge pull request #21472 from taosdata/enh/addCompileError
change parameter
This commit is contained in:
commit
20a0054c4a
|
@ -123,8 +123,8 @@ ELSE ()
|
||||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
||||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
||||||
ELSE ()
|
ELSE ()
|
||||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
||||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3 -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
# disable all assert
|
# disable all assert
|
||||||
|
|
|
@ -77,6 +77,12 @@ ELSEIF (TD_DARWIN_64)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
|
option(
|
||||||
|
BUILD_GEOS
|
||||||
|
"If build geos on Windows"
|
||||||
|
OFF
|
||||||
|
)
|
||||||
|
|
||||||
option(
|
option(
|
||||||
BUILD_SHARED_LIBS
|
BUILD_SHARED_LIBS
|
||||||
""
|
""
|
||||||
|
|
|
@ -231,6 +231,7 @@ if(${BUILD_WITH_ROCKSDB})
|
||||||
if(${TD_LINUX})
|
if(${TD_LINUX})
|
||||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result")
|
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result")
|
||||||
endif(${TD_LINUX})
|
endif(${TD_LINUX})
|
||||||
|
MESSAGE(STATUS "CXXXX STATUS CONFIG: " ${CMAKE_CXX_FLAGS})
|
||||||
|
|
||||||
if(${TD_DARWIN})
|
if(${TD_DARWIN})
|
||||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
|
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
|
||||||
|
@ -252,7 +253,7 @@ if(${BUILD_WITH_ROCKSDB})
|
||||||
endif(${TD_DARWIN})
|
endif(${TD_DARWIN})
|
||||||
|
|
||||||
if(${TD_WINDOWS})
|
if(${TD_WINDOWS})
|
||||||
option(WITH_JNI "" ON)
|
option(WITH_JNI "" OFF)
|
||||||
endif(${TD_WINDOWS})
|
endif(${TD_WINDOWS})
|
||||||
|
|
||||||
if(${TD_WINDOWS})
|
if(${TD_WINDOWS})
|
||||||
|
@ -264,7 +265,7 @@ if(${BUILD_WITH_ROCKSDB})
|
||||||
option(WITH_FALLOCATE "" OFF)
|
option(WITH_FALLOCATE "" OFF)
|
||||||
option(WITH_JEMALLOC "" OFF)
|
option(WITH_JEMALLOC "" OFF)
|
||||||
option(WITH_GFLAGS "" OFF)
|
option(WITH_GFLAGS "" OFF)
|
||||||
option(PORTABLE "" ON)
|
option(PORTABLE "" OFF)
|
||||||
option(WITH_LIBURING "" OFF)
|
option(WITH_LIBURING "" OFF)
|
||||||
option(FAIL_ON_WARNINGS OFF)
|
option(FAIL_ON_WARNINGS OFF)
|
||||||
|
|
||||||
|
@ -272,8 +273,11 @@ if(${BUILD_WITH_ROCKSDB})
|
||||||
option(WITH_BENCHMARK_TOOLS "" OFF)
|
option(WITH_BENCHMARK_TOOLS "" OFF)
|
||||||
option(WITH_TOOLS "" OFF)
|
option(WITH_TOOLS "" OFF)
|
||||||
option(WITH_LIBURING "" OFF)
|
option(WITH_LIBURING "" OFF)
|
||||||
|
IF (TD_LINUX)
|
||||||
|
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON)
|
||||||
|
ELSE()
|
||||||
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
|
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
|
||||||
|
ENDIF()
|
||||||
add_subdirectory(rocksdb EXCLUDE_FROM_ALL)
|
add_subdirectory(rocksdb EXCLUDE_FROM_ALL)
|
||||||
target_include_directories(
|
target_include_directories(
|
||||||
rocksdb
|
rocksdb
|
||||||
|
|
|
@ -21,8 +21,8 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tvariant.h"
|
|
||||||
#include "tsimplehash.h"
|
#include "tsimplehash.h"
|
||||||
|
#include "tvariant.h"
|
||||||
|
|
||||||
struct SqlFunctionCtx;
|
struct SqlFunctionCtx;
|
||||||
struct SResultRowEntryInfo;
|
struct SResultRowEntryInfo;
|
||||||
|
@ -77,7 +77,7 @@ enum {
|
||||||
enum {
|
enum {
|
||||||
MAIN_SCAN = 0x0u,
|
MAIN_SCAN = 0x0u,
|
||||||
REVERSE_SCAN = 0x1u, // todo remove it
|
REVERSE_SCAN = 0x1u, // todo remove it
|
||||||
PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan
|
PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SPoint1 {
|
typedef struct SPoint1 {
|
||||||
|
@ -130,43 +130,44 @@ typedef struct SSerializeDataHandle {
|
||||||
|
|
||||||
// incremental state storage
|
// incremental state storage
|
||||||
typedef struct STdbState {
|
typedef struct STdbState {
|
||||||
void* rocksdb;
|
void *rocksdb;
|
||||||
void** pHandle;
|
void **pHandle;
|
||||||
void* writeOpts;
|
void *writeOpts;
|
||||||
void* readOpts;
|
void *readOpts;
|
||||||
void** cfOpts;
|
void **cfOpts;
|
||||||
void* dbOpt;
|
void *dbOpt;
|
||||||
struct SStreamTask* pOwner;
|
struct SStreamTask *pOwner;
|
||||||
void* param;
|
void *param;
|
||||||
void* env;
|
void *env;
|
||||||
SListNode* pComparNode;
|
SListNode *pComparNode;
|
||||||
void* pBackendHandle;
|
void *pBackend;
|
||||||
char idstr[64];
|
char idstr[64];
|
||||||
void* compactFactory;
|
void *compactFactory;
|
||||||
|
TdThreadRwlock rwLock;
|
||||||
|
|
||||||
void* db;
|
void *db;
|
||||||
void* pStateDb;
|
void *pStateDb;
|
||||||
void* pFuncStateDb;
|
void *pFuncStateDb;
|
||||||
void* pFillStateDb; // todo refactor
|
void *pFillStateDb; // todo refactor
|
||||||
void* pSessionStateDb;
|
void *pSessionStateDb;
|
||||||
void* pParNameDb;
|
void *pParNameDb;
|
||||||
void* pParTagDb;
|
void *pParTagDb;
|
||||||
void* txn;
|
void *txn;
|
||||||
} STdbState;
|
} STdbState;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STdbState* pTdbState;
|
STdbState *pTdbState;
|
||||||
struct SStreamFileState* pFileState;
|
struct SStreamFileState *pFileState;
|
||||||
int32_t number;
|
int32_t number;
|
||||||
SSHashObj* parNameMap;
|
SSHashObj *parNameMap;
|
||||||
int64_t checkPointId;
|
int64_t checkPointId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
} SStreamState;
|
} SStreamState;
|
||||||
|
|
||||||
typedef struct SFunctionStateStore {
|
typedef struct SFunctionStateStore {
|
||||||
int32_t (*streamStateFuncPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
|
int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);
|
||||||
int32_t (*streamStateFuncGet)(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen);
|
int32_t (*streamStateFuncGet)(SStreamState *pState, const SWinKey *key, void **ppVal, int32_t *pVLen);
|
||||||
} SFunctionStateStore;
|
} SFunctionStateStore;
|
||||||
|
|
||||||
// sql function runtime context
|
// sql function runtime context
|
||||||
|
@ -180,7 +181,7 @@ typedef struct SqlFunctionCtx {
|
||||||
int16_t functionId; // function id
|
int16_t functionId; // function id
|
||||||
char *pOutput; // final result output buffer, point to sdata->data
|
char *pOutput; // final result output buffer, point to sdata->data
|
||||||
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param
|
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param
|
||||||
SFunctParam *param;
|
SFunctParam *param;
|
||||||
// corresponding output buffer for timestamp of each result, e.g., diff/csum
|
// corresponding output buffer for timestamp of each result, e.g., diff/csum
|
||||||
SColumnInfoData *pTsOutput;
|
SColumnInfoData *pTsOutput;
|
||||||
int32_t numOfParams;
|
int32_t numOfParams;
|
||||||
|
|
|
@ -34,7 +34,24 @@ extern "C" {
|
||||||
// SListNode* streamBackendAddCompare(void* backend, void* arg);
|
// SListNode* streamBackendAddCompare(void* backend, void* arg);
|
||||||
// void streamBackendDelCompare(void* backend, void* arg);
|
// void streamBackendDelCompare(void* backend, void* arg);
|
||||||
|
|
||||||
//typedef struct STdbState {
|
// <<<<<<< HEAD
|
||||||
|
// typedef struct STdbState {
|
||||||
|
// rocksdb_t* rocksdb;
|
||||||
|
// rocksdb_column_family_handle_t** pHandle;
|
||||||
|
// rocksdb_writeoptions_t* writeOpts;
|
||||||
|
// rocksdb_readoptions_t* readOpts;
|
||||||
|
// rocksdb_options_t** cfOpts;
|
||||||
|
// rocksdb_options_t* dbOpt;
|
||||||
|
// struct SStreamTask* pOwner;
|
||||||
|
// void* param;
|
||||||
|
// void* env;
|
||||||
|
// SListNode* pComparNode;
|
||||||
|
// void* pBackend;
|
||||||
|
// char idstr[64];
|
||||||
|
// void* compactFactory;
|
||||||
|
// TdThreadRwlock rwLock;
|
||||||
|
// =======
|
||||||
|
// typedef struct STdbState {
|
||||||
// rocksdb_t* rocksdb;
|
// rocksdb_t* rocksdb;
|
||||||
// rocksdb_column_family_handle_t** pHandle;
|
// rocksdb_column_family_handle_t** pHandle;
|
||||||
// rocksdb_writeoptions_t* writeOpts;
|
// rocksdb_writeoptions_t* writeOpts;
|
||||||
|
@ -58,6 +75,7 @@ extern "C" {
|
||||||
// TTB* pParTagDb;
|
// TTB* pParTagDb;
|
||||||
// TXN* txn;
|
// TXN* txn;
|
||||||
//} STdbState;
|
//} STdbState;
|
||||||
|
//>>>>>>> enh/dev3.0
|
||||||
|
|
||||||
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
|
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
|
||||||
void streamStateClose(SStreamState* pState, bool remove);
|
void streamStateClose(SStreamState* pState, bool remove);
|
||||||
|
|
|
@ -87,6 +87,28 @@ target_include_directories(
|
||||||
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
|
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
|
||||||
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
|
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
|
||||||
)
|
)
|
||||||
|
IF (TD_LINUX)
|
||||||
|
target_link_libraries(
|
||||||
|
vnode
|
||||||
|
PUBLIC os
|
||||||
|
PUBLIC util
|
||||||
|
PUBLIC common
|
||||||
|
PUBLIC tfs
|
||||||
|
PUBLIC wal
|
||||||
|
PUBLIC qworker
|
||||||
|
PUBLIC sync
|
||||||
|
PUBLIC executor
|
||||||
|
PUBLIC scheduler
|
||||||
|
PUBLIC tdb
|
||||||
|
|
||||||
|
# PUBLIC bdb
|
||||||
|
# PUBLIC scalar
|
||||||
|
PUBLIC rocksdb-shared
|
||||||
|
PUBLIC transport
|
||||||
|
PUBLIC stream
|
||||||
|
PUBLIC index
|
||||||
|
)
|
||||||
|
ELSE()
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
vnode
|
vnode
|
||||||
PUBLIC os
|
PUBLIC os
|
||||||
|
@ -107,6 +129,7 @@ target_link_libraries(
|
||||||
PUBLIC stream
|
PUBLIC stream
|
||||||
PUBLIC index
|
PUBLIC index
|
||||||
)
|
)
|
||||||
|
ENDIF()
|
||||||
|
|
||||||
IF (TD_GRANT)
|
IF (TD_GRANT)
|
||||||
TARGET_LINK_LIBRARIES(vnode PUBLIC grant)
|
TARGET_LINK_LIBRARIES(vnode PUBLIC grant)
|
||||||
|
|
|
@ -6,13 +6,23 @@ target_include_directories(
|
||||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
if(${BUILD_WITH_ROCKSDB})
|
if(${BUILD_WITH_ROCKSDB})
|
||||||
|
IF (TD_LINUX)
|
||||||
|
target_link_libraries(
|
||||||
|
stream
|
||||||
|
PUBLIC rocksdb-shared tdb
|
||||||
|
PRIVATE os util transport qcom executor wal index
|
||||||
|
)
|
||||||
|
ELSE()
|
||||||
target_link_libraries(
|
target_link_libraries(
|
||||||
stream
|
stream
|
||||||
PUBLIC rocksdb tdb
|
PUBLIC rocksdb tdb
|
||||||
PRIVATE os util transport qcom executor wal index
|
PRIVATE os util transport qcom executor wal index
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ENDIF()
|
||||||
|
|
||||||
target_include_directories(
|
target_include_directories(
|
||||||
stream
|
stream
|
||||||
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
|
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
|
||||||
|
|
|
@ -22,6 +22,9 @@ typedef struct SCompactFilteFactory {
|
||||||
void* status;
|
void* status;
|
||||||
} SCompactFilteFactory;
|
} SCompactFilteFactory;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
void* tableOpt;
|
||||||
|
} RocksdbCfParam;
|
||||||
typedef struct {
|
typedef struct {
|
||||||
rocksdb_t* db;
|
rocksdb_t* db;
|
||||||
rocksdb_column_family_handle_t** pHandle;
|
rocksdb_column_family_handle_t** pHandle;
|
||||||
|
@ -29,12 +32,13 @@ typedef struct {
|
||||||
rocksdb_readoptions_t* rOpt;
|
rocksdb_readoptions_t* rOpt;
|
||||||
rocksdb_options_t** cfOpt;
|
rocksdb_options_t** cfOpt;
|
||||||
rocksdb_options_t* dbOpt;
|
rocksdb_options_t* dbOpt;
|
||||||
void* param;
|
RocksdbCfParam* param;
|
||||||
void* pBackendHandle;
|
void* pBackend;
|
||||||
SListNode* pCompareNode;
|
SListNode* pCompareNode;
|
||||||
|
rocksdb_comparator_t** pCompares;
|
||||||
} RocksdbCfInst;
|
} RocksdbCfInst;
|
||||||
|
|
||||||
int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids);
|
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
|
||||||
|
|
||||||
void destroyRocksdbCfInst(RocksdbCfInst* inst);
|
void destroyRocksdbCfInst(RocksdbCfInst* inst);
|
||||||
|
|
||||||
|
@ -46,9 +50,6 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c
|
||||||
char** newval, size_t* newvlen, unsigned char* value_changed);
|
char** newval, size_t* newvlen, unsigned char* value_changed);
|
||||||
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);
|
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
void* tableOpt;
|
|
||||||
} RocksdbCfParam;
|
|
||||||
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
|
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
|
||||||
|
|
||||||
typedef int (*EncodeFunc)(void* key, char* buf);
|
typedef int (*EncodeFunc)(void* key, char* buf);
|
||||||
|
@ -80,16 +81,16 @@ void* streamBackendInit(const char* path) {
|
||||||
rocksdb_env_set_low_priority_background_threads(env, 4);
|
rocksdb_env_set_low_priority_background_threads(env, 4);
|
||||||
rocksdb_env_set_high_priority_background_threads(env, 2);
|
rocksdb_env_set_high_priority_background_threads(env, 2);
|
||||||
|
|
||||||
rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20);
|
rocksdb_cache_t* cache = rocksdb_cache_create_lru(64 << 20);
|
||||||
|
|
||||||
rocksdb_options_t* opts = rocksdb_options_create();
|
rocksdb_options_t* opts = rocksdb_options_create();
|
||||||
rocksdb_options_set_env(opts, env);
|
rocksdb_options_set_env(opts, env);
|
||||||
rocksdb_options_set_create_if_missing(opts, 1);
|
rocksdb_options_set_create_if_missing(opts, 1);
|
||||||
rocksdb_options_set_create_missing_column_families(opts, 1);
|
rocksdb_options_set_create_missing_column_families(opts, 1);
|
||||||
rocksdb_options_set_write_buffer_size(opts, 128 << 20);
|
rocksdb_options_set_write_buffer_size(opts, 48 << 20);
|
||||||
rocksdb_options_set_max_total_wal_size(opts, 128 << 20);
|
rocksdb_options_set_max_total_wal_size(opts, 128 << 20);
|
||||||
rocksdb_options_set_recycle_log_file_num(opts, 6);
|
rocksdb_options_set_recycle_log_file_num(opts, 6);
|
||||||
rocksdb_options_set_max_write_buffer_number(opts, 3);
|
rocksdb_options_set_max_write_buffer_number(opts, 2);
|
||||||
rocksdb_options_set_info_log_level(opts, 0);
|
rocksdb_options_set_info_log_level(opts, 0);
|
||||||
|
|
||||||
pHandle->env = env;
|
pHandle->env = env;
|
||||||
|
@ -114,25 +115,7 @@ void* streamBackendInit(const char* path) {
|
||||||
/*
|
/*
|
||||||
list all cf and get prefix
|
list all cf and get prefix
|
||||||
*/
|
*/
|
||||||
int64_t streamId;
|
streamStateOpenBackendCf(pHandle, (char*)path, cfs, nCf);
|
||||||
int32_t taskId, dummpy = 0;
|
|
||||||
SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
||||||
for (size_t i = 0; i < nCf; i++) {
|
|
||||||
char* cf = cfs[i];
|
|
||||||
char suffix[64] = {0};
|
|
||||||
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, suffix)) {
|
|
||||||
char idstr[128] = {0};
|
|
||||||
sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId);
|
|
||||||
// qError("make cf name %s", idstr);
|
|
||||||
if (taosHashGet(tbl, idstr, strlen(idstr) + 1) == NULL) {
|
|
||||||
taosHashPut(tbl, idstr, strlen(idstr) + 1, &dummpy, sizeof(dummpy));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
streamStateOpenBackendCf(pHandle, (char*)path, tbl);
|
|
||||||
taosHashCleanup(tbl);
|
|
||||||
}
|
}
|
||||||
rocksdb_list_column_families_destroy(cfs, nCf);
|
rocksdb_list_column_families_destroy(cfs, nCf);
|
||||||
|
|
||||||
|
@ -159,16 +142,17 @@ void streamBackendCleanup(void* arg) {
|
||||||
}
|
}
|
||||||
taosHashCleanup(pHandle->cfInst);
|
taosHashCleanup(pHandle->cfInst);
|
||||||
|
|
||||||
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
|
if (pHandle->db) {
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
rocksdb_flush(pHandle->db, flushOpt, &err);
|
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
|
||||||
if (err != NULL) {
|
rocksdb_flush(pHandle->db, flushOpt, &err);
|
||||||
qError("failed to flush db before streamBackend clean up, reason:%s", err);
|
if (err != NULL) {
|
||||||
taosMemoryFree(err);
|
qError("failed to flush db before streamBackend clean up, reason:%s", err);
|
||||||
|
taosMemoryFree(err);
|
||||||
|
}
|
||||||
|
rocksdb_flushoptions_destroy(flushOpt);
|
||||||
|
rocksdb_close(pHandle->db);
|
||||||
}
|
}
|
||||||
rocksdb_flushoptions_destroy(flushOpt);
|
|
||||||
|
|
||||||
rocksdb_close(pHandle->db);
|
|
||||||
rocksdb_options_destroy(pHandle->dbOpt);
|
rocksdb_options_destroy(pHandle->dbOpt);
|
||||||
rocksdb_env_destroy(pHandle->env);
|
rocksdb_env_destroy(pHandle->env);
|
||||||
rocksdb_cache_destroy(pHandle->cache);
|
rocksdb_cache_destroy(pHandle->cache);
|
||||||
|
@ -209,12 +193,13 @@ void streamBackendDelCompare(void* backend, void* arg) {
|
||||||
}
|
}
|
||||||
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); }
|
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); }
|
||||||
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
|
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
|
||||||
int streamGetInit(const char* funcName);
|
int streamGetInit(SStreamState* pState, const char* funcName);
|
||||||
|
|
||||||
// |key|-----value------|
|
// |key|-----value------|
|
||||||
// |key|ttl|len|userData|
|
// |key|ttl|len|userData|
|
||||||
|
|
||||||
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, void** snapshot, void** readOpt);
|
static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
|
||||||
|
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
|
||||||
|
|
||||||
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
|
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
|
||||||
int ret = memcmp(aBuf, bBuf, aLen);
|
int ret = memcmp(aBuf, bBuf, aLen);
|
||||||
|
@ -666,7 +651,7 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c
|
||||||
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
|
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
|
||||||
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||||
for (int i = 0; i < cfLen; i++) {
|
for (int i = 0; i < cfLen; i++) {
|
||||||
rocksdb_column_family_handle_destroy(inst->pHandle[i]);
|
if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
rocksdb_writeoptions_destroy(inst->wOpt);
|
rocksdb_writeoptions_destroy(inst->wOpt);
|
||||||
|
@ -674,118 +659,130 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) {
|
||||||
|
|
||||||
rocksdb_readoptions_destroy(inst->rOpt);
|
rocksdb_readoptions_destroy(inst->rOpt);
|
||||||
taosMemoryFree(inst->cfOpt);
|
taosMemoryFree(inst->cfOpt);
|
||||||
taosMemoryFree(inst->param);
|
|
||||||
taosMemoryFreeClear(inst->param);
|
taosMemoryFreeClear(inst->param);
|
||||||
taosMemoryFree(inst);
|
taosMemoryFree(inst);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
|
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) {
|
||||||
SBackendHandle* handle = backend;
|
SBackendHandle* handle = backend;
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
size_t nSize = taosHashGetSize(ids);
|
int64_t streamId;
|
||||||
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
int32_t taskId, dummy = 0;
|
||||||
|
char suffix[64] = {0};
|
||||||
|
|
||||||
char** cfNames = taosMemoryCalloc(nSize * cfLen + 1, sizeof(char*));
|
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
|
||||||
void* pIter = taosHashIterate(ids, NULL);
|
RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*));
|
||||||
size_t keyLen = 0;
|
rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t**));
|
||||||
char* idstr = taosHashGetKey(pIter, &keyLen);
|
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
|
||||||
for (int i = 0; i < nSize * cfLen + 1; i++) {
|
|
||||||
cfNames[i] = (char*)taosMemoryCalloc(1, 128);
|
|
||||||
if (i == 0) {
|
|
||||||
memcpy(cfNames[0], "default", strlen("default"));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key);
|
for (int i = 0; i < nCf; i++) {
|
||||||
if (i % cfLen == 0) {
|
char* cf = cfs[i];
|
||||||
pIter = taosHashIterate(ids, pIter);
|
char funcname[64] = {0};
|
||||||
if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*));
|
|
||||||
RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*));
|
|
||||||
for (int i = 0; i < nSize * cfLen + 1; i++) {
|
|
||||||
cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt);
|
cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt);
|
||||||
if (i == 0) {
|
if (i == 0) continue;
|
||||||
continue;
|
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
|
||||||
|
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
|
||||||
|
rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
|
||||||
|
|
||||||
|
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
|
||||||
|
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
|
||||||
|
|
||||||
|
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt);
|
||||||
|
params[i].tableOpt = tableOpt;
|
||||||
|
|
||||||
|
int idx = streamGetInit(NULL, funcname);
|
||||||
|
SCfInit* cfPara = &ginitDict[idx];
|
||||||
|
|
||||||
|
rocksdb_comparator_t* compare =
|
||||||
|
rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName);
|
||||||
|
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare);
|
||||||
|
pCompare[i] = compare;
|
||||||
}
|
}
|
||||||
// refactor later
|
|
||||||
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
|
|
||||||
rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
|
|
||||||
|
|
||||||
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
|
|
||||||
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
|
|
||||||
|
|
||||||
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt);
|
|
||||||
params[i].tableOpt = tableOpt;
|
|
||||||
};
|
|
||||||
|
|
||||||
rocksdb_comparator_t** pCompare = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_comparator_t**));
|
|
||||||
for (int i = 0; i < nSize * cfLen + 1; i++) {
|
|
||||||
if (i == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
SCfInit* cf = &ginitDict[(i - 1) % cfLen];
|
|
||||||
|
|
||||||
rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->detroyFunc, cf->cmpFunc, cf->cmpName);
|
|
||||||
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare);
|
|
||||||
pCompare[i] = compare;
|
|
||||||
}
|
}
|
||||||
rocksdb_column_family_handle_t** cfHandle =
|
rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nCf, (const char* const*)cfs,
|
||||||
taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_column_family_handle_t*));
|
|
||||||
rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nSize * cfLen + 1, (const char* const*)cfNames,
|
|
||||||
(const rocksdb_options_t* const*)cfOpts, cfHandle, &err);
|
(const rocksdb_options_t* const*)cfOpts, cfHandle, &err);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
qError("failed to open rocksdb cf, reason:%s", err);
|
qError("failed to open rocksdb cf, reason:%s", err);
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
} else {
|
} else {
|
||||||
qDebug("succ to open rocksdb cf, reason:%s", err);
|
qDebug("succ to open rocksdb cf");
|
||||||
}
|
|
||||||
|
|
||||||
pIter = taosHashIterate(ids, NULL);
|
|
||||||
idstr = taosHashGetKey(pIter, &keyLen);
|
|
||||||
for (int i = 0; i < nSize; i++) {
|
|
||||||
RocksdbCfInst* inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst));
|
|
||||||
rocksdb_column_family_handle_t** subCf = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
|
|
||||||
rocksdb_comparator_t** subCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
|
|
||||||
RocksdbCfParam* subParam = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
|
|
||||||
rocksdb_options_t** subOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
|
|
||||||
for (int j = 0; j < cfLen; j++) {
|
|
||||||
subCf[j] = cfHandle[i * cfLen + j + 1];
|
|
||||||
subCompare[j] = pCompare[i * cfLen + j + 1];
|
|
||||||
subParam[j] = params[i * cfLen + j + 1];
|
|
||||||
subOpt[j] = cfOpts[i * cfLen + j + 1];
|
|
||||||
}
|
|
||||||
inst->db = db;
|
|
||||||
inst->pHandle = subCf;
|
|
||||||
inst->wOpt = rocksdb_writeoptions_create();
|
|
||||||
inst->rOpt = rocksdb_readoptions_create();
|
|
||||||
inst->cfOpt = (rocksdb_options_t**)subOpt;
|
|
||||||
inst->dbOpt = handle->dbOpt;
|
|
||||||
inst->param = subParam;
|
|
||||||
inst->pBackendHandle = handle;
|
|
||||||
handle->db = db;
|
|
||||||
SCfComparator compare = {.comp = subCompare, .numOfComp = cfLen};
|
|
||||||
inst->pCompareNode = streamBackendAddCompare(handle, &compare);
|
|
||||||
rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
|
|
||||||
|
|
||||||
taosHashPut(handle->cfInst, idstr, keyLen, &inst, sizeof(void*));
|
|
||||||
|
|
||||||
pIter = taosHashIterate(ids, pIter);
|
|
||||||
if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen);
|
|
||||||
}
|
}
|
||||||
|
// close default cf
|
||||||
rocksdb_column_family_handle_destroy(cfHandle[0]);
|
rocksdb_column_family_handle_destroy(cfHandle[0]);
|
||||||
rocksdb_options_destroy(cfOpts[0]);
|
rocksdb_options_destroy(cfOpts[0]);
|
||||||
|
handle->db = db;
|
||||||
|
|
||||||
for (int i = 0; i < nSize * cfLen + 1; i++) {
|
static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||||
taosMemoryFree(cfNames[i]);
|
for (int i = 0; i < nCf; i++) {
|
||||||
|
char* cf = cfs[i];
|
||||||
|
if (i == 0) continue;
|
||||||
|
char funcname[64] = {0};
|
||||||
|
if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
|
||||||
|
char idstr[128] = {0};
|
||||||
|
sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId);
|
||||||
|
|
||||||
|
int idx = streamGetInit(NULL, funcname);
|
||||||
|
|
||||||
|
RocksdbCfInst* inst = NULL;
|
||||||
|
RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1);
|
||||||
|
if (pInst == NULL || *pInst == NULL) {
|
||||||
|
inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst));
|
||||||
|
inst->pHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
|
||||||
|
inst->cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
|
||||||
|
inst->wOpt = rocksdb_writeoptions_create();
|
||||||
|
inst->rOpt = rocksdb_readoptions_create();
|
||||||
|
inst->param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
|
||||||
|
inst->pBackend = handle;
|
||||||
|
inst->db = db;
|
||||||
|
inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
|
||||||
|
|
||||||
|
inst->dbOpt = handle->dbOpt;
|
||||||
|
rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
|
||||||
|
taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*));
|
||||||
|
} else {
|
||||||
|
inst = *pInst;
|
||||||
|
}
|
||||||
|
inst->cfOpt[idx] = cfOpts[i];
|
||||||
|
inst->pCompares[idx] = pCompare[i];
|
||||||
|
memcpy(&(inst->param[idx]), &(params[i]), sizeof(RocksdbCfParam));
|
||||||
|
inst->pHandle[idx] = cfHandle[i];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFree(cfNames);
|
void** pIter = taosHashIterate(handle->cfInst, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
RocksdbCfInst* inst = *pIter;
|
||||||
|
|
||||||
|
for (int i = 0; i < cfLen; i++) {
|
||||||
|
if (inst->cfOpt[i] == NULL) {
|
||||||
|
rocksdb_options_t* opt = rocksdb_options_create_copy(handle->dbOpt);
|
||||||
|
rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
|
||||||
|
rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
|
||||||
|
|
||||||
|
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
|
||||||
|
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
|
||||||
|
|
||||||
|
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt);
|
||||||
|
|
||||||
|
SCfInit* cfPara = &ginitDict[i];
|
||||||
|
|
||||||
|
rocksdb_comparator_t* compare =
|
||||||
|
rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName);
|
||||||
|
rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
|
||||||
|
|
||||||
|
inst->pCompares[i] = compare;
|
||||||
|
inst->cfOpt[i] = opt;
|
||||||
|
inst->param[i].tableOpt = tableOpt;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen};
|
||||||
|
inst->pCompareNode = streamBackendAddCompare(handle, &compare);
|
||||||
|
pIter = taosHashIterate(handle->cfInst, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFree(cfHandle);
|
taosMemoryFree(cfHandle);
|
||||||
taosMemoryFree(pCompare);
|
taosMemoryFree(pCompare);
|
||||||
taosMemoryFree(params);
|
taosMemoryFree(params);
|
||||||
taosMemoryFree(cfOpts);
|
taosMemoryFree(cfOpts);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||||
|
@ -801,15 +798,14 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||||
pState->pTdbState->pHandle = (void**)inst->pHandle;
|
pState->pTdbState->pHandle = (void**)inst->pHandle;
|
||||||
pState->pTdbState->writeOpts = inst->wOpt;
|
pState->pTdbState->writeOpts = inst->wOpt;
|
||||||
pState->pTdbState->readOpts = inst->rOpt;
|
pState->pTdbState->readOpts = inst->rOpt;
|
||||||
pState->pTdbState->cfOpts = (void**)inst->cfOpt;
|
pState->pTdbState->cfOpts = (void**)(inst->cfOpt);
|
||||||
pState->pTdbState->dbOpt = handle->dbOpt;
|
pState->pTdbState->dbOpt = handle->dbOpt;
|
||||||
pState->pTdbState->param = inst->param;
|
pState->pTdbState->param = inst->param;
|
||||||
pState->pTdbState->pBackendHandle = handle;
|
pState->pTdbState->pBackend = handle;
|
||||||
pState->pTdbState->pComparNode = inst->pCompareNode;
|
pState->pTdbState->pComparNode = inst->pCompareNode;
|
||||||
taosThreadMutexUnlock(&handle->cfMutex);
|
taosThreadMutexUnlock(&handle->cfMutex);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&handle->cfMutex);
|
taosThreadMutexUnlock(&handle->cfMutex);
|
||||||
|
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
|
@ -839,25 +835,17 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||||
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
|
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
|
||||||
pCompare[i] = compare;
|
pCompare[i] = compare;
|
||||||
}
|
}
|
||||||
rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*));
|
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
|
||||||
for (int i = 0; i < cfLen; i++) {
|
|
||||||
char buf[128] = {0};
|
|
||||||
GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[i].key);
|
|
||||||
cfHandle[i] = rocksdb_create_column_family(handle->db, cfOpt[i], buf, &err);
|
|
||||||
if (err != NULL) {
|
|
||||||
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
|
|
||||||
taosMemoryFreeClear(err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pState->pTdbState->rocksdb = handle->db;
|
pState->pTdbState->rocksdb = handle->db;
|
||||||
pState->pTdbState->pHandle = (void**)cfHandle;
|
pState->pTdbState->pHandle = (void**)cfHandle;
|
||||||
pState->pTdbState->writeOpts = rocksdb_writeoptions_create();
|
pState->pTdbState->writeOpts = rocksdb_writeoptions_create();
|
||||||
pState->pTdbState->readOpts = rocksdb_readoptions_create();
|
pState->pTdbState->readOpts = rocksdb_readoptions_create();
|
||||||
pState->pTdbState->cfOpts = (void**)(rocksdb_options_t**)cfOpt;
|
pState->pTdbState->cfOpts = (void**)cfOpt;
|
||||||
pState->pTdbState->dbOpt = handle->dbOpt;
|
pState->pTdbState->dbOpt = handle->dbOpt;
|
||||||
pState->pTdbState->param = param;
|
pState->pTdbState->param = param;
|
||||||
pState->pTdbState->pBackendHandle = handle;
|
pState->pTdbState->pBackend = handle;
|
||||||
|
|
||||||
|
taosThreadRwlockInit(&pState->pTdbState->rwLock, NULL);
|
||||||
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
|
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
|
||||||
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
|
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
|
||||||
// rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
|
// rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
|
||||||
|
@ -866,7 +854,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||||
SBackendHandle* pHandle = pState->pTdbState->pBackendHandle;
|
SBackendHandle* pHandle = pState->pTdbState->pBackend;
|
||||||
taosThreadMutexLock(&pHandle->cfMutex);
|
taosThreadMutexLock(&pHandle->cfMutex);
|
||||||
RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
|
RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
|
||||||
if (ppInst != NULL && *ppInst != NULL) {
|
if (ppInst != NULL && *ppInst != NULL) {
|
||||||
|
@ -888,7 +876,9 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
if (remove) {
|
if (remove) {
|
||||||
for (int i = 0; i < cfLen; i++) {
|
for (int i = 0; i < cfLen; i++) {
|
||||||
rocksdb_drop_column_family(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[i], &err);
|
if (pState->pTdbState->pHandle[i] != NULL)
|
||||||
|
rocksdb_drop_column_family(pState->pTdbState->rocksdb,
|
||||||
|
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[i], &err);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
|
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
|
||||||
taosMemoryFreeClear(err);
|
taosMemoryFreeClear(err);
|
||||||
|
@ -897,7 +887,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||||
} else {
|
} else {
|
||||||
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
|
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
|
||||||
for (int i = 0; i < cfLen; i++) {
|
for (int i = 0; i < cfLen; i++) {
|
||||||
rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err);
|
if (pState->pTdbState->pHandle[i] != NULL)
|
||||||
|
rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
|
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
|
||||||
taosMemoryFreeClear(err);
|
taosMemoryFreeClear(err);
|
||||||
|
@ -907,7 +898,9 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < cfLen; i++) {
|
for (int i = 0; i < cfLen; i++) {
|
||||||
rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
|
if (pState->pTdbState->pHandle[i] != NULL) {
|
||||||
|
rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFreeClear(pState->pTdbState->pHandle);
|
taosMemoryFreeClear(pState->pTdbState->pHandle);
|
||||||
for (int i = 0; i < cfLen; i++) {
|
for (int i = 0; i < cfLen; i++) {
|
||||||
|
@ -916,7 +909,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (remove) {
|
if (remove) {
|
||||||
streamBackendDelCompare(pState->pTdbState->pBackendHandle, pState->pTdbState->pComparNode);
|
streamBackendDelCompare(pState->pTdbState->pBackend, pState->pTdbState->pComparNode);
|
||||||
}
|
}
|
||||||
rocksdb_writeoptions_destroy(pState->pTdbState->writeOpts);
|
rocksdb_writeoptions_destroy(pState->pTdbState->writeOpts);
|
||||||
pState->pTdbState->writeOpts = NULL;
|
pState->pTdbState->writeOpts = NULL;
|
||||||
|
@ -925,24 +918,52 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||||
pState->pTdbState->readOpts = NULL;
|
pState->pTdbState->readOpts = NULL;
|
||||||
taosMemoryFreeClear(pState->pTdbState->cfOpts);
|
taosMemoryFreeClear(pState->pTdbState->cfOpts);
|
||||||
taosMemoryFreeClear(pState->pTdbState->param);
|
taosMemoryFreeClear(pState->pTdbState->param);
|
||||||
|
|
||||||
|
taosThreadRwlockDestroy(&pState->pTdbState->rwLock);
|
||||||
pState->pTdbState->rocksdb = NULL;
|
pState->pTdbState->rocksdb = NULL;
|
||||||
}
|
}
|
||||||
void streamStateDestroyCompar(void* arg) {
|
void streamStateDestroyCompar(void* arg) {
|
||||||
SCfComparator* comp = (SCfComparator*)arg;
|
SCfComparator* comp = (SCfComparator*)arg;
|
||||||
for (int i = 0; i < comp->numOfComp; i++) {
|
for (int i = 0; i < comp->numOfComp; i++) {
|
||||||
rocksdb_comparator_destroy(comp->comp[i]);
|
if (comp->comp[i]) rocksdb_comparator_destroy(comp->comp[i]);
|
||||||
}
|
}
|
||||||
taosMemoryFree(comp->comp);
|
taosMemoryFree(comp->comp);
|
||||||
}
|
}
|
||||||
|
|
||||||
int streamGetInit(const char* funcName) {
|
int streamGetInit(SStreamState* pState, const char* funcName) {
|
||||||
|
int idx = -1;
|
||||||
size_t len = strlen(funcName);
|
size_t len = strlen(funcName);
|
||||||
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
|
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
|
||||||
if (len == ginitDict[i].len && strncmp(funcName, ginitDict[i].key, strlen(funcName)) == 0) {
|
if (len == ginitDict[i].len && strncmp(funcName, ginitDict[i].key, strlen(funcName)) == 0) {
|
||||||
return i;
|
idx = i;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return -1;
|
if (pState != NULL && idx != -1) {
|
||||||
|
rocksdb_column_family_handle_t* cf = NULL;
|
||||||
|
taosThreadRwlockRdlock(&pState->pTdbState->rwLock);
|
||||||
|
cf = pState->pTdbState->pHandle[idx];
|
||||||
|
taosThreadRwlockUnlock(&pState->pTdbState->rwLock);
|
||||||
|
if (cf == NULL) {
|
||||||
|
char buf[128] = {0};
|
||||||
|
GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[idx].key);
|
||||||
|
char* err = NULL;
|
||||||
|
|
||||||
|
taosThreadRwlockWrlock(&pState->pTdbState->rwLock);
|
||||||
|
cf = rocksdb_create_column_family(pState->pTdbState->rocksdb, pState->pTdbState->cfOpts[idx], buf, &err);
|
||||||
|
if (err != NULL) {
|
||||||
|
idx = -1;
|
||||||
|
qError("failed to to open cf, %p 0x%" PRIx64 "-%d_%s, reason:%s", pState, pState->streamId, pState->taskId,
|
||||||
|
funcName, err);
|
||||||
|
taosMemoryFree(err);
|
||||||
|
} else {
|
||||||
|
pState->pTdbState->pHandle[idx] = cf;
|
||||||
|
}
|
||||||
|
taosThreadRwlockUnlock(&pState->pTdbState->rwLock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return idx;
|
||||||
}
|
}
|
||||||
bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) {
|
bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) {
|
||||||
rocksdb_iter_seek(iter, buf, len);
|
rocksdb_iter_seek(iter, buf, len);
|
||||||
|
@ -954,8 +975,9 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, void** snapshot, void** readOpt) {
|
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot,
|
||||||
int idx = streamGetInit(cfName);
|
rocksdb_readoptions_t** readOpt) {
|
||||||
|
int idx = streamGetInit(pState, cfName);
|
||||||
|
|
||||||
if (snapshot != NULL) {
|
if (snapshot != NULL) {
|
||||||
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
|
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
|
||||||
|
@ -966,7 +988,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
|
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
|
||||||
rocksdb_readoptions_set_fill_cache(rOpt, 0);
|
rocksdb_readoptions_set_fill_cache(rOpt, 0);
|
||||||
|
|
||||||
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]);
|
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt,
|
||||||
|
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
|
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
|
||||||
|
@ -974,7 +997,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
code = 0; \
|
code = 0; \
|
||||||
char buf[128] = {0}; \
|
char buf[128] = {0}; \
|
||||||
char* err = NULL; \
|
char* err = NULL; \
|
||||||
int i = streamGetInit(funcname); \
|
int i = streamGetInit(pState, funcname); \
|
||||||
if (i < 0) { \
|
if (i < 0) { \
|
||||||
qWarn("streamState failed to get cf name: %s", funcname); \
|
qWarn("streamState failed to get cf name: %s", funcname); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
|
@ -983,11 +1006,12 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
|
rocksdb_column_family_handle_t* pHandle = \
|
||||||
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
|
||||||
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
|
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
||||||
char* ttlV = NULL; \
|
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
|
||||||
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
|
char* ttlV = NULL; \
|
||||||
|
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
|
||||||
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
|
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
|
||||||
if (err != NULL) { \
|
if (err != NULL) { \
|
||||||
taosMemoryFree(err); \
|
taosMemoryFree(err); \
|
||||||
|
@ -1004,7 +1028,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
code = 0; \
|
code = 0; \
|
||||||
char buf[128] = {0}; \
|
char buf[128] = {0}; \
|
||||||
char* err = NULL; \
|
char* err = NULL; \
|
||||||
int i = streamGetInit(funcname); \
|
int i = streamGetInit(pState, funcname); \
|
||||||
if (i < 0) { \
|
if (i < 0) { \
|
||||||
qWarn("streamState failed to get cf name: %s", funcname); \
|
qWarn("streamState failed to get cf name: %s", funcname); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
|
@ -1013,11 +1037,12 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
|
rocksdb_column_family_handle_t* pHandle = \
|
||||||
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
|
||||||
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
|
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
||||||
size_t len = 0; \
|
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
|
||||||
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
|
size_t len = 0; \
|
||||||
|
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
|
||||||
if (val == NULL) { \
|
if (val == NULL) { \
|
||||||
if (err == NULL) { \
|
if (err == NULL) { \
|
||||||
qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
|
qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
|
||||||
|
@ -1051,7 +1076,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
code = 0; \
|
code = 0; \
|
||||||
char buf[128] = {0}; \
|
char buf[128] = {0}; \
|
||||||
char* err = NULL; \
|
char* err = NULL; \
|
||||||
int i = streamGetInit(funcname); \
|
int i = streamGetInit(pState, funcname); \
|
||||||
if (i < 0) { \
|
if (i < 0) { \
|
||||||
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
|
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
|
@ -1060,9 +1085,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
|
rocksdb_column_family_handle_t* pHandle = \
|
||||||
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
|
||||||
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
|
rocksdb_t* db = pState->pTdbState->rocksdb; \
|
||||||
|
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
|
||||||
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
|
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
|
||||||
if (err != NULL) { \
|
if (err != NULL) { \
|
||||||
qError("streamState str: %s failed to del from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
|
qError("streamState str: %s failed to del from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
|
||||||
|
@ -1113,8 +1139,10 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1],
|
if (pState->pTdbState->pHandle[1] != NULL) {
|
||||||
sKeyStr, sLen, eKeyStr, eLen, &err);
|
rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1],
|
||||||
|
sKeyStr, sLen, eKeyStr, eLen, &err);
|
||||||
|
}
|
||||||
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
|
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
|
||||||
// eLen);
|
// eLen);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
|
@ -1214,7 +1242,8 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
|
||||||
}
|
}
|
||||||
pCur->number = pState->number;
|
pCur->number = pState->number;
|
||||||
pCur->db = pState->pTdbState->rocksdb;
|
pCur->db = pState->pTdbState->rocksdb;
|
||||||
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
|
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
|
|
||||||
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
|
@ -1254,7 +1283,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
|
||||||
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
if (pCur == NULL) return NULL;
|
if (pCur == NULL) return NULL;
|
||||||
pCur->db = pState->pTdbState->rocksdb;
|
pCur->db = pState->pTdbState->rocksdb;
|
||||||
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
|
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
|
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
|
||||||
|
|
||||||
rocksdb_iter_prev(pCur->iter);
|
rocksdb_iter_prev(pCur->iter);
|
||||||
|
@ -1276,7 +1306,8 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
|
||||||
|
|
||||||
if (pCur == NULL) return NULL;
|
if (pCur == NULL) return NULL;
|
||||||
pCur->db = pState->pTdbState->rocksdb;
|
pCur->db = pState->pTdbState->rocksdb;
|
||||||
pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
|
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
|
|
||||||
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
SStateKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
|
@ -1368,7 +1399,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
|
||||||
}
|
}
|
||||||
pCur->number = pState->number;
|
pCur->number = pState->number;
|
||||||
pCur->db = pState->pTdbState->rocksdb;
|
pCur->db = pState->pTdbState->rocksdb;
|
||||||
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
|
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
|
@ -1407,7 +1439,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pCur->db = pState->pTdbState->rocksdb;
|
pCur->db = pState->pTdbState->rocksdb;
|
||||||
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
|
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
pCur->number = pState->number;
|
pCur->number = pState->number;
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
|
@ -1443,7 +1476,8 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pCur->db = pState->pTdbState->rocksdb;
|
pCur->db = pState->pTdbState->rocksdb;
|
||||||
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
|
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
pCur->number = pState->number;
|
pCur->number = pState->number;
|
||||||
|
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
|
@ -1535,7 +1569,8 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
|
||||||
if (pCur == NULL) return NULL;
|
if (pCur == NULL) return NULL;
|
||||||
|
|
||||||
pCur->db = pState->pTdbState->rocksdb;
|
pCur->db = pState->pTdbState->rocksdb;
|
||||||
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
|
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
int len = winKeyEncode((void*)key, buf);
|
int len = winKeyEncode((void*)key, buf);
|
||||||
|
@ -1594,7 +1629,8 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
|
||||||
}
|
}
|
||||||
|
|
||||||
pCur->db = pState->pTdbState->rocksdb;
|
pCur->db = pState->pTdbState->rocksdb;
|
||||||
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
|
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
int len = winKeyEncode((void*)key, buf);
|
int len = winKeyEncode((void*)key, buf);
|
||||||
|
@ -1629,7 +1665,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
|
||||||
}
|
}
|
||||||
|
|
||||||
pCur->db = pState->pTdbState->rocksdb;
|
pCur->db = pState->pTdbState->rocksdb;
|
||||||
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
|
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
int len = winKeyEncode((void*)key, buf);
|
int len = winKeyEncode((void*)key, buf);
|
||||||
|
@ -1664,7 +1701,8 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
|
||||||
}
|
}
|
||||||
pCur->number = pState->number;
|
pCur->number = pState->number;
|
||||||
pCur->db = pState->pTdbState->rocksdb;
|
pCur->db = pState->pTdbState->rocksdb;
|
||||||
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
|
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
|
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
int32_t c = 0;
|
int32_t c = 0;
|
||||||
|
@ -1895,7 +1933,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
|
||||||
|
|
||||||
rocksdb_snapshot_t* snapshot = NULL;
|
rocksdb_snapshot_t* snapshot = NULL;
|
||||||
rocksdb_readoptions_t* readopts = NULL;
|
rocksdb_readoptions_t* readopts = NULL;
|
||||||
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", (void**)&snapshot, (void**)&readopts);
|
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1934,7 +1972,8 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
|
||||||
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
|
|
||||||
pCur->db = pState->pTdbState->rocksdb;
|
pCur->db = pState->pTdbState->rocksdb;
|
||||||
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
|
pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
return pCur;
|
return pCur;
|
||||||
}
|
}
|
||||||
int32_t streamDefaultIterValid_rocksdb(void* iter) {
|
int32_t streamDefaultIterValid_rocksdb(void* iter) {
|
||||||
|
@ -1965,7 +2004,6 @@ char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
|
||||||
}
|
}
|
||||||
return dst;
|
return dst;
|
||||||
}
|
}
|
||||||
|
|
||||||
// batch func
|
// batch func
|
||||||
void* streamStateCreateBatch() {
|
void* streamStateCreateBatch() {
|
||||||
rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create();
|
rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create();
|
||||||
|
@ -1980,7 +2018,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
|
||||||
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
|
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
|
||||||
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
|
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
|
||||||
void* val, int32_t vlen, int64_t ttl) {
|
void* val, int32_t vlen, int64_t ttl) {
|
||||||
int i = streamGetInit(cfName);
|
int i = streamGetInit(pState, cfName);
|
||||||
|
|
||||||
if (i < 0) {
|
if (i < 0) {
|
||||||
qError("streamState failed to put to cf name:%s", cfName);
|
qError("streamState failed to put to cf name:%s", cfName);
|
||||||
|
|
Loading…
Reference in New Issue