diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 916fdff867..281c5b8ded 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -296,10 +296,15 @@ const char* comparePartagKey(void* name) { return cfName[5]; } void destroyFunc(void* stata) { return; } int streamInitBackend(SStreamState* pState, char* path) { + rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); + rocksdb_env_set_low_priority_background_threads(env, 15); + rocksdb_env_set_high_priority_background_threads(env, 5); + rocksdb_options_t* opts = rocksdb_options_create(); - rocksdb_options_increase_parallelism(opts, 8); - // rocksdb_options_optimize_level_style_compaction(opts, 0); - // create the DB if it's not already present + rocksdb_options_set_env(opts, env); + // rocksdb_options_increase_parallelism(opts, 8); + // rocksdb_options_optimize_level_style_compaction(opts, 0); + // create the DB if it's not already present rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_missing_column_families(opts, 1); rocksdb_options_set_write_buffer_size(opts, 128 << 20); @@ -576,66 +581,66 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); - // SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; - // SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; - // char sKeyStr[128] = {0}; - // char eKeyStr[128] = {0}; + SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; + SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; + char sKeyStr[128] = {0}; + char eKeyStr[128] = {0}; - // int sLen = stateKeyEncode(&sKey, sKeyStr); - // int eLen = stateKeyEncode(&eKey, eKeyStr); + int sLen = stateKeyEncode(&sKey, sKeyStr); + int eLen = stateKeyEncode(&eKey, eKeyStr); - // char toStringStart[128] = {0}; - // char toStringEnd[128] = {0}; - // if (qDebugFlag & DEBUG_TRACE) { - // stateKeyToString(&sKey, toStringStart); - // stateKeyToString(&eKey, toStringEnd); - // } + char toStringStart[128] = {0}; + char toStringEnd[128] = {0}; + if (qDebugFlag & DEBUG_TRACE) { + stateKeyToString(&sKey, toStringStart); + stateKeyToString(&eKey, toStringEnd); + } - // char* err = NULL; - // rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[0], - // sKeyStr, sLen, eKeyStr, eLen, &err); - // // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, - // eLen); if (err != NULL) { - // qWarn("failed to delete range cf(default) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd); - // taosMemoryFree(err); - // } + char* err = NULL; + rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[0], + sKeyStr, sLen, eKeyStr, eLen, &err); + // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, eLen); + if (err != NULL) { + qWarn("failed to delete range cf(default) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd); + taosMemoryFree(err); + } // del one by one - char buf[128] = {0}; - char* s = "null"; - SWinKey key = {.ts = 0, .groupId = 0}; - SStateKey skey = {.key = key, .opNum = pState->number}; - int sLen = stateKeyEncode(&skey, buf); + // char buf[128] = {0}; + // char* s = "null"; + // SWinKey key = {.ts = 0, .groupId = 0}; + // SStateKey skey = {.key = key, .opNum = pState->number}; + // int sLen = stateKeyEncode(&skey, buf); - streamStatePut_rocksdb(pState, &key, s, strlen(s)); + // streamStatePut_rocksdb(pState, &key, s, strlen(s)); - rocksdb_readoptions_t* opt = NULL; - rocksdb_iterator_t* iter = streamStateIterCreate(pState, "default", NULL, &opt); - rocksdb_iter_seek(iter, buf, sLen); + // rocksdb_readoptions_t* opt = NULL; + // rocksdb_iterator_t* iter = streamStateIterCreate(pState, "default", NULL, &opt); + // rocksdb_iter_seek(iter, buf, sLen); - char* err = NULL; - while (rocksdb_iter_valid(iter)) { - int kLen = 0; - char* key = (char*)rocksdb_iter_key(iter, (size_t*)&kLen); + // char* err = NULL; + // while (rocksdb_iter_valid(iter)) { + // int32_t kLen = 0; + // char* key = (char*)rocksdb_iter_key(iter, (size_t*)&kLen); - SStateKey ckey = {0}; - stateKeyDecode((void*)&ckey, key); - if (ckey.opNum != pState->number) { - break; - } - if (stateKeyCmpr(&skey, sizeof(skey), &ckey, sizeof(ckey)) > 0) { - break; - } + // SStateKey ckey = {0}; + // stateKeyDecode((void*)&ckey, key); + // if (ckey.opNum != pState->number) { + // break; + // } + // if (stateKeyCmpr(&skey, sizeof(skey), &ckey, sizeof(ckey)) > 0) { + // break; + // } - rocksdb_delete_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[0], key, - kLen, &err); - if (err != NULL) { - taosMemoryFree(err); - } - if (rocksdb_iter_valid(iter)) rocksdb_iter_next(iter); - } - rocksdb_iter_destroy(iter); + // rocksdb_delete_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[0], key, + // kLen, &err); + // if (err != NULL) { + // taosMemoryFree(err); + // } + // if (rocksdb_iter_valid(iter)) rocksdb_iter_next(iter); + // } + // rocksdb_iter_destroy(iter); return 0; } @@ -898,7 +903,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* return -1; } SStateSessionKey ktmp = {0}; - int32_t kLen, vLen; + size_t kLen = 0, vLen = 0; if (!rocksdb_iter_valid(pCur->iter)) { return -1; @@ -1050,7 +1055,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes return -1; } - int32_t kLen; + size_t kLen; const char* iKeyStr = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); SStateSessionKey iKey = {0}; stateSessionKeyDecode(&iKey, (char*)iKeyStr);