add backend

This commit is contained in:
yihaoDeng 2023-03-29 01:27:39 +00:00
parent 1e597503bc
commit ae54e994ca
1 changed files with 29 additions and 28 deletions

View File

@ -271,6 +271,7 @@ typedef int (*ToStringFunc)(void* key, char* buf);
typedef struct {
const char* key;
int32_t len;
int idx;
EncodeFunc enFunc;
DecodeFunc deFunc;
@ -278,12 +279,12 @@ typedef struct {
} SCfInit;
SCfInit ginitDict[] = {
{"default", 0, stateKeyEncode, stateKeyDecode, stateKeyToString},
{"fill", 1, winKeyEncode, winKeyDecode, winKeyToString},
{"sess", 2, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString},
{"func", 3, tupleKeyEncode, tupleKeyDecode, tupleKeyToString},
{"parname", 4, parKeyEncode, parKeyDecode, parKeyToString},
{"partag", 5, parKeyEncode, parKeyDecode, parKeyToString},
{"default", strlen("default"), 0, stateKeyEncode, stateKeyDecode, stateKeyToString},
{"fill", strlen("fill"), 1, winKeyEncode, winKeyDecode, winKeyToString},
{"sess", strlen("sess"), 2, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString},
{"func", strlen("func"), 3, tupleKeyEncode, tupleKeyDecode, tupleKeyToString},
{"parname", strlen("parname"), 4, parKeyEncode, parKeyDecode, parKeyToString},
{"partag", strlen("partag"), 5, parKeyEncode, parKeyDecode, parKeyToString},
};
const char* compareStateName(void* name) { return cfName[0]; }
@ -313,7 +314,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20);
rocksdb_block_based_options_set_block_cache(tableOpt, cache);
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom_full(15);
rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom_full(20);
rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt);
@ -388,8 +389,9 @@ void streamCleanBackend(SStreamState* pState) {
}
int streamGetInit(const char* funcName) {
size_t len = strlen(funcName);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
if (strncmp(funcName, ginitDict[i].key, strlen(funcName)) == 0) {
if (len == ginitDict[i].len && strncmp(funcName, ginitDict[i].key, strlen(funcName)) == 0) {
return i;
}
}
@ -424,14 +426,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char buf[40] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
} \
char toString[128] = {0}; \
char toString[40] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
@ -450,14 +452,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[128] = {0}; \
char buf[40] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
} \
char toString[128] = {0}; \
char toString[40] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
@ -485,14 +487,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
do { \
code = 0; \
char buf[128] = {0}; \
char buf[40] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
} \
char toString[128] = {0}; \
char toString[40] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
@ -571,15 +573,14 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number};
char sKeyStr[128] = {0};
char eKeyStr[128] = {0};
char sKeyStr[40] = {0};
char eKeyStr[40] = {0};
int sLen = stateKeyEncode(&sKey, sKeyStr);
int eLen = stateKeyEncode(&eKey, eKeyStr);
char toStringStart[128] = {0};
char toStringEnd[128] = {0};
char toStringStart[40] = {0};
char toStringEnd[40] = {0};
if (qDebugFlag & DEBUG_TRACE) {
stateKeyToString(&sKey, toStringStart);
stateKeyToString(&eKey, toStringEnd);
@ -613,7 +614,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
char buf[128] = {0};
char buf[40] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
@ -646,7 +647,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
pCur->number = pState->number;
char buf[128] = {0};
char buf[40] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
@ -680,7 +681,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
char buf[40] = {0};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
@ -720,7 +721,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
char buf[40] = {0};
int len = stateKeyEncode((void*)&sKey, buf);
rocksdb_iter_seek(pCur->iter, buf, len);
@ -776,7 +777,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
char buf[128] = {0};
char buf[40] = {0};
int len = winKeyEncode((void*)key, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
@ -889,7 +890,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
char buf[40] = {0};
int len = stateKeyEncode((void*)&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
@ -920,7 +921,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
char buf[128] = {0};
char buf[32] = {0};
int len = winKeyEncode((void*)key, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
@ -951,7 +952,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
char buf[128] = {0};
char buf[24] = {0};
int len = winKeyEncode((void*)key, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
@ -998,7 +999,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0;
char buf[128] = {0};
char buf[64] = {0};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);