add backend

This commit is contained in:
yihaoDeng 2023-03-27 02:20:49 +00:00
parent 2cc2e5d522
commit 0690604a49
1 changed files with 50 additions and 18 deletions

View File

@ -368,6 +368,12 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
valid = true;
return valid;
}
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName) {
int idx = streamGetInit(cfName);
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
pState->pTdbState->pHandle[idx]);
}
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
do { \
code = 0; \
@ -547,8 +553,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
}
pCur->number = pState->number;
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// pState->pTdbState->pHandle[2]);
pCur->iter = streamStateIterCreate(pState, "sess");
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
@ -584,8 +592,14 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
if (pCur == NULL) {
return NULL;
}
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// pState->pTdbState->pHandle[2]);
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// pState->pTdbState->pHandle[2]);
pCur->iter = streamStateIterCreate(pState, "sess");
pCur->number = pState->number;
char buf[128] = {0};
@ -626,9 +640,13 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
if (pCur == NULL) {
return NULL;
}
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
pCur->iter = streamStateIterCreate(pState, "sess");
pCur->number = pState->number;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
@ -669,8 +687,11 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]);
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// pState->pTdbState->pHandle[0]);
pCur->iter = streamStateIterCreate(pState, "default");
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
@ -732,8 +753,10 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
if (pCur == NULL) return NULL;
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]);
/// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]);
pCur->iter = streamStateIterCreate(pState, "fill");
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
@ -845,8 +868,10 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
return NULL;
}
pCur->number = pState->number;
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]);
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// pState->pTdbState->pHandle[0]);
pCur->iter = streamStateIterCreate(pState, "default");
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
@ -877,8 +902,10 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
if (!pCur) {
return NULL;
}
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]);
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// pState->pTdbState->pHandle[1]);
pCur->iter = streamStateIterCreate(pState, "fill");
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
@ -908,8 +935,10 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
if (pCur == NULL) {
return NULL;
}
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]);
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// pState->pTdbState->pHandle[1]);
pCur->iter = streamStateIterCreate(pState, "fill");
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
@ -954,8 +983,11 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
return -1;
}
pCur->number = pState->number;
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// pState->pTdbState->pHandle[2]);
pCur->iter = streamStateIterCreate(pState, "sess");
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0;