diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 28d34defd7..2e4157ad29 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -583,20 +583,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta } pCur->number = pState->number; - // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, - // pState->pTdbState->pHandle[2]); pCur->iter = streamStateIterCreate(pState, "sess"); char buf[128] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); - - // char toString[128] = {0}; - // stateSessionKeyToString(&sKey, toString); - // qWarn("streamState seek key %s", toString); - bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len); - if (valid == false) { + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; } @@ -633,17 +625,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); - - // rocksdb_iter_seek(pCur->iter, (const char*)buf, len); - // if (!rocksdb_iter_valid(pCur->iter)) { - // rocksdb_iter_seek_for_prev(pCur->iter, buf, len); - // if (!rocksdb_iter_valid(pCur->iter)) { - // streamStateFreeCur(pCur); - // return NULL; - // } - // } - bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len); - if (valid == false) { + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; } @@ -674,9 +656,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con char buf[128] = {0}; int len = stateSessionKeyEncode(&sKey, buf); - - bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len); - if (valid == false) { + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; } @@ -773,11 +753,10 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK if (pCur == NULL) return NULL; pCur->iter = streamStateIterCreate(pState, "fill"); + char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); - - bool valid = streamStateIterSeekAndValid(pCur->iter, buf, len); - if (valid == false) { + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; } @@ -884,15 +863,11 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin return NULL; } pCur->number = pState->number; - // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, - // pState->pTdbState->pHandle[0]); pCur->iter = streamStateIterCreate(pState, "default"); SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; int len = stateKeyEncode((void*)&sKey, buf); - if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; @@ -918,14 +893,11 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const if (!pCur) { return NULL; } - // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, - // pState->pTdbState->pHandle[1]); + pCur->iter = streamStateIterCreate(pState, "fill"); char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); - if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; @@ -951,14 +923,11 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const if (pCur == NULL) { return NULL; } - // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, - // pState->pTdbState->pHandle[1]); + pCur->iter = streamStateIterCreate(pState, "fill"); char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); - if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; @@ -999,17 +968,12 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes return -1; } pCur->number = pState->number; - // pCur->iter = - // rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts, - // pState->pTdbState->pHandle[2]); - pCur->iter = streamStateIterCreate(pState, "sess"); SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int32_t c = 0; char buf[128] = {0}; int len = stateSessionKeyEncode(&sKey, buf); - if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return -1;