From ae54e994caaf181e0c92d279695c1cf06f6fa229 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 29 Mar 2023 01:27:39 +0000 Subject: [PATCH] add backend --- source/libs/stream/src/streamStateRocksdb.c | 57 +++++++++++---------- 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 70cfd55219..87113538b3 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -271,6 +271,7 @@ typedef int (*ToStringFunc)(void* key, char* buf); typedef struct { const char* key; + int32_t len; int idx; EncodeFunc enFunc; DecodeFunc deFunc; @@ -278,12 +279,12 @@ typedef struct { } SCfInit; SCfInit ginitDict[] = { - {"default", 0, stateKeyEncode, stateKeyDecode, stateKeyToString}, - {"fill", 1, winKeyEncode, winKeyDecode, winKeyToString}, - {"sess", 2, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString}, - {"func", 3, tupleKeyEncode, tupleKeyDecode, tupleKeyToString}, - {"parname", 4, parKeyEncode, parKeyDecode, parKeyToString}, - {"partag", 5, parKeyEncode, parKeyDecode, parKeyToString}, + {"default", strlen("default"), 0, stateKeyEncode, stateKeyDecode, stateKeyToString}, + {"fill", strlen("fill"), 1, winKeyEncode, winKeyDecode, winKeyToString}, + {"sess", strlen("sess"), 2, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString}, + {"func", strlen("func"), 3, tupleKeyEncode, tupleKeyDecode, tupleKeyToString}, + {"parname", strlen("parname"), 4, parKeyEncode, parKeyDecode, parKeyToString}, + {"partag", strlen("partag"), 5, parKeyEncode, parKeyDecode, parKeyToString}, }; const char* compareStateName(void* name) { return cfName[0]; } @@ -313,7 +314,7 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20); rocksdb_block_based_options_set_block_cache(tableOpt, cache); - rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom_full(15); + rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom_full(20); rocksdb_block_based_options_set_filter_policy(tableOpt, filter); rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); @@ -388,8 +389,9 @@ void streamCleanBackend(SStreamState* pState) { } int streamGetInit(const char* funcName) { + size_t len = strlen(funcName); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { - if (strncmp(funcName, ginitDict[i].key, strlen(funcName)) == 0) { + if (len == ginitDict[i].len && strncmp(funcName, ginitDict[i].key, strlen(funcName)) == 0) { return i; } } @@ -424,14 +426,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ do { \ code = 0; \ - char buf[128] = {0}; \ + char buf[40] = {0}; \ char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ return -1; \ } \ - char toString[128] = {0}; \ + char toString[40] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ @@ -450,14 +452,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ do { \ code = 0; \ - char buf[128] = {0}; \ + char buf[40] = {0}; \ char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ return -1; \ } \ - char toString[128] = {0}; \ + char toString[40] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ @@ -485,14 +487,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ do { \ code = 0; \ - char buf[128] = {0}; \ + char buf[40] = {0}; \ char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ qWarn("streamState failed to get cf name: %s", funcname); \ return -1; \ } \ - char toString[128] = {0}; \ + char toString[40] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ @@ -571,15 +573,14 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; - char sKeyStr[128] = {0}; - char eKeyStr[128] = {0}; + char sKeyStr[40] = {0}; + char eKeyStr[40] = {0}; int sLen = stateKeyEncode(&sKey, sKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr); - char toStringStart[128] = {0}; - char toStringEnd[128] = {0}; - + char toStringStart[40] = {0}; + char toStringEnd[40] = {0}; if (qDebugFlag & DEBUG_TRACE) { stateKeyToString(&sKey, toStringStart); stateKeyToString(&eKey, toStringEnd); @@ -613,7 +614,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta pCur->db = pState->pTdbState->rocksdb; pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); - char buf[128] = {0}; + char buf[40] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { @@ -646,7 +647,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); pCur->number = pState->number; - char buf[128] = {0}; + char buf[40] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); @@ -680,7 +681,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - char buf[128] = {0}; + char buf[40] = {0}; int len = stateSessionKeyEncode(&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); @@ -720,7 +721,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); SStateKey sKey = {.key = *key, .opNum = pState->number}; - char buf[128] = {0}; + char buf[40] = {0}; int len = stateKeyEncode((void*)&sKey, buf); rocksdb_iter_seek(pCur->iter, buf, len); @@ -776,7 +777,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK pCur->db = pState->pTdbState->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); - char buf[128] = {0}; + char buf[40] = {0}; int len = winKeyEncode((void*)key, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); @@ -889,7 +890,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); SStateKey sKey = {.key = *key, .opNum = pState->number}; - char buf[128] = {0}; + char buf[40] = {0}; int len = stateKeyEncode((void*)&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); @@ -920,7 +921,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const pCur->db = pState->pTdbState->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); - char buf[128] = {0}; + char buf[32] = {0}; int len = winKeyEncode((void*)key, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); @@ -951,7 +952,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const pCur->db = pState->pTdbState->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); - char buf[128] = {0}; + char buf[24] = {0}; int len = winKeyEncode((void*)key, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); @@ -998,7 +999,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int32_t c = 0; - char buf[128] = {0}; + char buf[64] = {0}; int len = stateSessionKeyEncode(&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur);