diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in index 58ae9a1c59..c651b6d6ca 100644 --- a/cmake/rocksdb_CMakeLists.txt.in +++ b/cmake/rocksdb_CMakeLists.txt.in @@ -4,8 +4,9 @@ ExternalProject_Add(rocksdb GIT_REPOSITORY https://github.com/taosdata-contrib/rocksdb.git GIT_TAG v6.23.3 SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" + CMAKE_ARGS "-DPORTABLE=on" CONFIGURE_COMMAND "" BUILD_COMMAND "" INSTALL_COMMAND "" TEST_COMMAND "" - ) \ No newline at end of file + ) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index b61faf957f..9b80ce2786 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -149,6 +149,12 @@ typedef struct SStateSessionKey { int64_t opNum; } SStateSessionKey; +typedef struct streamValue { + int64_t unixTimestamp; + int32_t len; + char data[0]; +} streamValue; + int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2); int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2); int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2); diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index ee9a3824c0..577807b722 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -288,6 +288,39 @@ int parKeyToString(void* k, char* buf) { n = sprintf(buf + n, "[groupId:%" PRIi64 "]", *key); return n; } +int stremaValueEncode(void* k, char* buf) { + int len = 0; + streamValue* key = k; + len += taosEncodeFixedI64((void**)&buf, key->unixTimestamp); + len += taosEncodeFixedI32((void**)&buf, key->len); + len += taosEncodeBinary((void**)&buf, key->data, key->len); + return len; +} +int streamValueDecode(void* k, char* buf) { + streamValue* key = k; + char* p = buf; + p = taosDecodeFixedI64(p, &key->unixTimestamp); + p = taosDecodeFixedI32(p, &key->len); + p = taosDecodeBinary(p, (void**)&key->data, key->len); + return p - buf; +} +int32_t streamValueToString(void* k, char* buf) { + streamValue* key = k; + int n = 0; + n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp); + n += sprintf(buf + n, "len:%d,", key->len); + n += sprintf(buf + n, "data:%s]", key->data); + return n; +} +/*1: stale, 0: no stale*/ + +int32_t streaValueIsStale(void* k, int64_t ts) { + streamValue* key = k; + if (key->unixTimestamp < ts) { + return 1; + } + return 0; +} typedef struct { void* tableOpt; @@ -421,11 +454,6 @@ void streamCleanBackend(SStreamState* pState) { rocksdbCfParam* param = pState->pTdbState->param; for (int i = 0; i < cfLen; i++) { rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); - // rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); - // rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]); - - // rocksdb_cache_destroy(param[i].lru); - // rocksdb_block_based_options_destroy(param[i].tableOpt); } taosMemoryFreeClear(pState->pTdbState->pHandle); @@ -438,8 +466,8 @@ void streamCleanBackend(SStreamState* pState) { pState->pTdbState->readOpts = NULL; rocksdb_close(pState->pTdbState->rocksdb); + // wait for all background work to finish for (int i = 0; i < cfLen; i++) { - // rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]);