change write opt

This commit is contained in:
yihaoDeng 2023-04-20 04:05:35 +00:00
parent b6108921ec
commit 3272d14e33
3 changed files with 42 additions and 7 deletions

View File

@ -4,8 +4,9 @@ ExternalProject_Add(rocksdb
GIT_REPOSITORY https://github.com/taosdata-contrib/rocksdb.git
GIT_TAG v6.23.3
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CMAKE_ARGS "-DPORTABLE=on"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
)

View File

@ -149,6 +149,12 @@ typedef struct SStateSessionKey {
int64_t opNum;
} SStateSessionKey;
typedef struct streamValue {
int64_t unixTimestamp;
int32_t len;
char data[0];
} streamValue;
int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2);
int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2);
int stateSessionKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2);

View File

@ -288,6 +288,39 @@ int parKeyToString(void* k, char* buf) {
n = sprintf(buf + n, "[groupId:%" PRIi64 "]", *key);
return n;
}
int stremaValueEncode(void* k, char* buf) {
int len = 0;
streamValue* key = k;
len += taosEncodeFixedI64((void**)&buf, key->unixTimestamp);
len += taosEncodeFixedI32((void**)&buf, key->len);
len += taosEncodeBinary((void**)&buf, key->data, key->len);
return len;
}
int streamValueDecode(void* k, char* buf) {
streamValue* key = k;
char* p = buf;
p = taosDecodeFixedI64(p, &key->unixTimestamp);
p = taosDecodeFixedI32(p, &key->len);
p = taosDecodeBinary(p, (void**)&key->data, key->len);
return p - buf;
}
int32_t streamValueToString(void* k, char* buf) {
streamValue* key = k;
int n = 0;
n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp);
n += sprintf(buf + n, "len:%d,", key->len);
n += sprintf(buf + n, "data:%s]", key->data);
return n;
}
/*1: stale, 0: no stale*/
int32_t streaValueIsStale(void* k, int64_t ts) {
streamValue* key = k;
if (key->unixTimestamp < ts) {
return 1;
}
return 0;
}
typedef struct {
void* tableOpt;
@ -421,11 +454,6 @@ void streamCleanBackend(SStreamState* pState) {
rocksdbCfParam* param = pState->pTdbState->param;
for (int i = 0; i < cfLen; i++) {
rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
// rocksdb_options_destroy(pState->pTdbState->cfOpts[i]);
// rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]);
// rocksdb_cache_destroy(param[i].lru);
// rocksdb_block_based_options_destroy(param[i].tableOpt);
}
taosMemoryFreeClear(pState->pTdbState->pHandle);
@ -438,8 +466,8 @@ void streamCleanBackend(SStreamState* pState) {
pState->pTdbState->readOpts = NULL;
rocksdb_close(pState->pTdbState->rocksdb);
// wait for all background work to finish
for (int i = 0; i < cfLen; i++) {
// rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
rocksdb_options_destroy(pState->pTdbState->cfOpts[i]);
rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]);