add backend

This commit is contained in:
yihaoDeng 2023-03-27 08:34:12 +00:00
parent 011c791b89
commit 5a8c46b733
1 changed files with 7 additions and 43 deletions

View File

@ -583,20 +583,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
}
pCur->number = pState->number;
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[2]);
pCur->iter = streamStateIterCreate(pState, "sess");
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
// char toString[128] = {0};
// stateSessionKeyToString(&sKey, toString);
// qWarn("streamState seek key %s", toString);
bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len);
if (valid == false) {
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
@ -633,17 +625,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
// rocksdb_iter_seek(pCur->iter, (const char*)buf, len);
// if (!rocksdb_iter_valid(pCur->iter)) {
// rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
// if (!rocksdb_iter_valid(pCur->iter)) {
// streamStateFreeCur(pCur);
// return NULL;
// }
// }
bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len);
if (valid == false) {
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
@ -674,9 +656,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
char buf[128] = {0};
int len = stateSessionKeyEncode(&sKey, buf);
bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len);
if (valid == false) {
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
@ -773,11 +753,10 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
if (pCur == NULL) return NULL;
pCur->iter = streamStateIterCreate(pState, "fill");
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len);
if (valid == false) {
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
@ -884,15 +863,11 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
return NULL;
}
pCur->number = pState->number;
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[0]);
pCur->iter = streamStateIterCreate(pState, "default");
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
int len = stateKeyEncode((void*)&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
@ -918,14 +893,11 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
if (!pCur) {
return NULL;
}
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[1]);
pCur->iter = streamStateIterCreate(pState, "fill");
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
@ -951,14 +923,11 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
if (pCur == NULL) {
return NULL;
}
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[1]);
pCur->iter = streamStateIterCreate(pState, "fill");
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
@ -999,17 +968,12 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
return -1;
}
pCur->number = pState->number;
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[2]);
pCur->iter = streamStateIterCreate(pState, "sess");
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0;
char buf[128] = {0};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return -1;