From 0690604a49285a70848c19af7e89de8002036b46 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 27 Mar 2023 02:20:49 +0000 Subject: [PATCH] add backend --- source/libs/stream/src/streamStateRocksdb.c | 68 +++++++++++++++------ 1 file changed, 50 insertions(+), 18 deletions(-) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 6cddf6649c..68fe73f687 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -368,6 +368,12 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len valid = true; return valid; } +rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName) { + int idx = streamGetInit(cfName); + return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + pState->pTdbState->pHandle[idx]); +} + #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ do { \ code = 0; \ @@ -547,8 +553,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta } pCur->number = pState->number; - pCur->iter = - rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); + // pCur->iter = + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // pState->pTdbState->pHandle[2]); + pCur->iter = streamStateIterCreate(pState, "sess"); char buf[128] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; @@ -584,8 +592,14 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta if (pCur == NULL) { return NULL; } - pCur->iter = - rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); + // pCur->iter = + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // pState->pTdbState->pHandle[2]); + + // pCur->iter = + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // pState->pTdbState->pHandle[2]); + pCur->iter = streamStateIterCreate(pState, "sess"); pCur->number = pState->number; char buf[128] = {0}; @@ -626,9 +640,13 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con if (pCur == NULL) { return NULL; } - pCur->iter = - rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); + // pCur->iter = + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); + + pCur->iter = streamStateIterCreate(pState, "sess"); + pCur->number = pState->number; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; @@ -669,8 +687,11 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; - pCur->iter = - rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]); + // pCur->iter = + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // pState->pTdbState->pHandle[0]); + + pCur->iter = streamStateIterCreate(pState, "default"); SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; @@ -732,8 +753,10 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK if (pCur == NULL) return NULL; - pCur->iter = - rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); + /// pCur->iter = + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); + pCur->iter = streamStateIterCreate(pState, "fill"); + char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); @@ -845,8 +868,10 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin return NULL; } pCur->number = pState->number; - pCur->iter = - rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]); + // pCur->iter = + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // pState->pTdbState->pHandle[0]); + pCur->iter = streamStateIterCreate(pState, "default"); SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; @@ -877,8 +902,10 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const if (!pCur) { return NULL; } - pCur->iter = - rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); + // pCur->iter = + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // pState->pTdbState->pHandle[1]); + pCur->iter = streamStateIterCreate(pState, "fill"); char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); @@ -908,8 +935,10 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const if (pCur == NULL) { return NULL; } - pCur->iter = - rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); + // pCur->iter = + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // pState->pTdbState->pHandle[1]); + pCur->iter = streamStateIterCreate(pState, "fill"); char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); @@ -954,8 +983,11 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes return -1; } pCur->number = pState->number; - pCur->iter = - rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); + // pCur->iter = + // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, + // pState->pTdbState->pHandle[2]); + + pCur->iter = streamStateIterCreate(pState, "sess"); SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int32_t c = 0;