From a3a0d9c81f191913efb5cd1dce6cf8b4a0eb16d2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 27 Mar 2023 07:42:11 +0000 Subject: [PATCH] add backend --- include/libs/stream/streamState.h | 12 ++- source/libs/stream/src/streamState.c | 2 +- source/libs/stream/src/streamStateRocksdb.c | 82 ++++++++++++++------- 3 files changed, 63 insertions(+), 33 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 2a4185c6ab..b414d99c39 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -34,13 +34,11 @@ typedef struct STdbState { rocksdb_t* rocksdb; rocksdb_column_family_handle_t** pHandle; - rocksdb_writeoptions_t* wopts; - rocksdb_readoptions_t* ropts; - // rocksdb_column_family_handle_t* fillStateDB; - // rocksdb_column_family_handle_t* sessStateDB; - // rocksdb_column_family_handle_t* funcStateDB; - // rocksdb_column_family_handle_t* parnameStateDB; - // rocksdb_column_family_handle_t* partagStateDB; + rocksdb_writeoptions_t* writeOpts; + rocksdb_readoptions_t* readOpts; + rocksdb_options_t** cfOpts; + rocksdb_comparator_t** pCompare; + rocksdb_options_t* dbOpt; TDB* db; TTB* pStateDb; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 2c715b4adb..aa1468fefd 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -216,7 +216,7 @@ _err: void streamStateClose(SStreamState* pState) { #ifdef USE_ROCKSDB - // streamCleanBackend(pState); + streamCleanBackend(pState); #else tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 68fe73f687..8aed1883f2 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -292,6 +292,7 @@ const char* compareSessionKey(void* name) { return cfName[2]; } const char* compareFuncKey(void* name) { return cfName[3]; } const char* compareParKey(void* name) { return cfName[4]; } const char* comparePartagKey(void* name) { return cfName[5]; } +void destroyFunc(void* stata) { return; } int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_t* opts = rocksdb_options_create(); @@ -309,43 +310,72 @@ int streamInitBackend(SStreamState* pState, char* path) { cfOpt[i] = rocksdb_options_create_copy(opts); } - rocksdb_comparator_t* stateCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName); + rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); + + rocksdb_comparator_t* stateCompare = rocksdb_comparator_create(NULL, destroyFunc, stateKeyDBComp, compareStateName); rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[0], stateCompare); + pCompare[0] = stateCompare; - rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, NULL, winKeyDBComp, compareWinKeyName); + rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, destroyFunc, winKeyDBComp, compareWinKeyName); rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[1], fillCompare); + pCompare[1] = fillCompare; - rocksdb_comparator_t* sessCompare = rocksdb_comparator_create(NULL, NULL, stateSessionKeyDBComp, compareSessionKey); + rocksdb_comparator_t* sessCompare = + rocksdb_comparator_create(NULL, destroyFunc, stateSessionKeyDBComp, compareSessionKey); rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[2], sessCompare); + pCompare[2] = sessCompare; - rocksdb_comparator_t* funcCompare = rocksdb_comparator_create(NULL, NULL, tupleKeyDBComp, compareFuncKey); + rocksdb_comparator_t* funcCompare = rocksdb_comparator_create(NULL, destroyFunc, tupleKeyDBComp, compareFuncKey); rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[3], funcCompare); + pCompare[3] = funcCompare; - rocksdb_comparator_t* parnameCompare = rocksdb_comparator_create(NULL, NULL, parKeyDBComp, compareParKey); + rocksdb_comparator_t* parnameCompare = rocksdb_comparator_create(NULL, destroyFunc, parKeyDBComp, compareParKey); rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[4], parnameCompare); + pCompare[4] = parnameCompare; - rocksdb_comparator_t* partagCompare = rocksdb_comparator_create(NULL, NULL, parKeyDBComp, comparePartagKey); + rocksdb_comparator_t* partagCompare = rocksdb_comparator_create(NULL, destroyFunc, parKeyDBComp, comparePartagKey); rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[5], partagCompare); + pCompare[5] = partagCompare; rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*)); rocksdb_t* db = rocksdb_open_column_families(opts, path, cfLen, cfName, cfOpt, cfHandle, &err); pState->pTdbState->rocksdb = db; pState->pTdbState->pHandle = cfHandle; - pState->pTdbState->wopts = rocksdb_writeoptions_create(); + pState->pTdbState->writeOpts = rocksdb_writeoptions_create(); // rocksdb_writeoptions_ - rocksdb_writeoptions_set_no_slowdown(pState->pTdbState->wopts, 1); - pState->pTdbState->ropts = rocksdb_readoptions_create(); + rocksdb_writeoptions_set_no_slowdown(pState->pTdbState->writeOpts, 1); + pState->pTdbState->readOpts = rocksdb_readoptions_create(); + pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt; + pState->pTdbState->pCompare = pCompare; + pState->pTdbState->dbOpt = opts; return 0; } void streamCleanBackend(SStreamState* pState) { + if (pState->pTdbState->rocksdb == NULL) { + qInfo("rocksdb already free"); + return; + } int cfLen = sizeof(cfName) / sizeof(cfName[0]); for (int i = 0; i < cfLen; i++) { rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); + rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); + rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]); } - rocksdb_writeoptions_destroy(pState->pTdbState->wopts); - rocksdb_readoptions_destroy(pState->pTdbState->ropts); + rocksdb_options_destroy(pState->pTdbState->dbOpt); + + taosMemoryFreeClear(pState->pTdbState->pHandle); + taosMemoryFreeClear(pState->pTdbState->cfOpts); + taosMemoryFree(pState->pTdbState->pCompare); + + rocksdb_writeoptions_destroy(pState->pTdbState->writeOpts); + pState->pTdbState->writeOpts = NULL; + + rocksdb_readoptions_destroy(pState->pTdbState->readOpts); + pState->pTdbState->readOpts = NULL; + rocksdb_close(pState->pTdbState->rocksdb); + pState->pTdbState->rocksdb = NULL; } int streamGetInit(const char* funcName) { @@ -370,7 +400,7 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len } rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName) { int idx = streamGetInit(cfName); - return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, pState->pTdbState->pHandle[idx]); } @@ -389,7 +419,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ginitDict[i].toStrFunc((void*)key, toString); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->wopts; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (const char*)value, (size_t)vLen, &err); \ if (err != NULL) { \ taosMemoryFree(err); \ @@ -415,7 +445,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_readoptions_t* opts = pState->pTdbState->ropts; \ + rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ size_t len = 0; \ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)&len, &err); \ if (val == NULL) { \ @@ -449,7 +479,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->wopts; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), &err); \ if (err != NULL) { \ qWarn("streamState str: %s failed to del from %s, err: %s", toString, funcname, err); \ @@ -554,7 +584,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta pCur->number = pState->number; // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, // pState->pTdbState->pHandle[2]); pCur->iter = streamStateIterCreate(pState, "sess"); @@ -593,11 +623,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta return NULL; } // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, // pState->pTdbState->pHandle[2]); // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, // pState->pTdbState->pHandle[2]); pCur->iter = streamStateIterCreate(pState, "sess"); pCur->number = pState->number; @@ -641,7 +671,8 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con return NULL; } // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, + // pState->pTdbState->pHandle[2]); pCur->iter = streamStateIterCreate(pState, "sess"); @@ -688,7 +719,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* if (pCur == NULL) return NULL; // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, // pState->pTdbState->pHandle[0]); pCur->iter = streamStateIterCreate(pState, "default"); @@ -754,7 +785,8 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK if (pCur == NULL) return NULL; /// pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, + // pState->pTdbState->pHandle[1]); pCur->iter = streamStateIterCreate(pState, "fill"); char buf[128] = {0}; @@ -869,7 +901,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin } pCur->number = pState->number; // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, // pState->pTdbState->pHandle[0]); pCur->iter = streamStateIterCreate(pState, "default"); @@ -903,7 +935,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const return NULL; } // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, // pState->pTdbState->pHandle[1]); pCur->iter = streamStateIterCreate(pState, "fill"); @@ -936,7 +968,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const return NULL; } // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, // pState->pTdbState->pHandle[1]); pCur->iter = streamStateIterCreate(pState, "fill"); @@ -984,7 +1016,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes } pCur->number = pState->number; // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, // pState->pTdbState->pHandle[2]); pCur->iter = streamStateIterCreate(pState, "sess");