From c3a8ecd9c215f84645e522632402572bed938481 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 30 Mar 2023 03:56:01 +0000 Subject: [PATCH] reset version --- source/libs/stream/src/streamStateRocksdb.c | 90 +++++++++++++++------ 1 file changed, 66 insertions(+), 24 deletions(-) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 5c75a4c370..15e578ddfd 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -319,8 +319,8 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); - rocksdb_slicetransform_t* trans = rocksdb_slicetransform_create_fixed_prefix(8); - rocksdb_options_set_prefix_extractor((rocksdb_options_t*)cfOpt[i], trans); + // rocksdb_slicetransform_t* trans = rocksdb_slicetransform_create_fixed_prefix(8); + // rocksdb_options_set_prefix_extractor((rocksdb_options_t*)cfOpt[i], trans); }; rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); @@ -401,9 +401,12 @@ int streamGetInit(const char* funcName) { return -1; } bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) { - rocksdb_iter_seek_for_prev(iter, buf, len); + rocksdb_iter_seek(iter, buf, len); if (!rocksdb_iter_valid(iter)) { - return false; + rocksdb_iter_seek_for_prev(iter, buf, len); + if (!rocksdb_iter_valid(iter)) { + return false; + } } return true; } @@ -412,8 +415,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa int idx = streamGetInit(cfName); //*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); - - *snapshot = NULL; + if (snapshot != NULL) { + *snapshot = NULL; + } rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); *readOpt = rOpt; @@ -572,29 +576,67 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); - SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; - SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; - char sKeyStr[128] = {0}; - char eKeyStr[128] = {0}; + // SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; + // SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; + // char sKeyStr[128] = {0}; + // char eKeyStr[128] = {0}; - int sLen = stateKeyEncode(&sKey, sKeyStr); - int eLen = stateKeyEncode(&eKey, eKeyStr); + // int sLen = stateKeyEncode(&sKey, sKeyStr); + // int eLen = stateKeyEncode(&eKey, eKeyStr); - char toStringStart[128] = {0}; - char toStringEnd[128] = {0}; - if (qDebugFlag & DEBUG_TRACE) { - stateKeyToString(&sKey, toStringStart); - stateKeyToString(&eKey, toStringEnd); - } + // char toStringStart[128] = {0}; + // char toStringEnd[128] = {0}; + // if (qDebugFlag & DEBUG_TRACE) { + // stateKeyToString(&sKey, toStringStart); + // stateKeyToString(&eKey, toStringEnd); + // } + // char* err = NULL; + // rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[0], + // sKeyStr, sLen, eKeyStr, eLen, &err); + // // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, + // eLen); if (err != NULL) { + // qWarn("failed to delete range cf(default) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd); + // taosMemoryFree(err); + // } + + // del one by one + + char buf[128] = {0}; + char* s = "null"; + SWinKey key = {.ts = 0, .groupId = 0}; + SStateKey skey = {.key = key, .opNum = pState->number}; + int sLen = stateKeyEncode(&skey, buf); + + streamStatePut_rocksdb(pState, &key, s, strlen(s)); + + rocksdb_readoptions_t* opt = NULL; + rocksdb_iterator_t* iter = streamStateIterCreate(pState, "default", NULL, &opt); + rocksdb_iter_seek(iter, buf, sLen); + + // int32_t limit = 128; + // krocksdb_writebatch_t* batch = rocksdb_writebatch_create(); char* err = NULL; - rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[0], - sKeyStr, sLen, eKeyStr, eLen, &err); - // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, eLen); - if (err != NULL) { - qWarn("failed to delete range cf(default) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd); - taosMemoryFree(err); + while (rocksdb_iter_valid(iter)) { + int kLen = 0; + char* key = (char*)rocksdb_iter_key(iter, (size_t*)&kLen); + + SStateKey sskey = {0}; + stateKeyDecode((void*)&sskey, key); + if (sskey.opNum != pState->number) { + rocksdb_iter_next(iter); + continue; + } + + rocksdb_delete_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[0], key, + kLen, &err); + if (err != NULL) { + taosMemoryFree(err); + } + if (rocksdb_iter_valid(iter)) rocksdb_iter_next(iter); } + rocksdb_iter_destroy(iter); + return 0; }