diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c040d15a74..f808e90e1c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2054,9 +2054,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK return NULL; } - char buf[128] = {0}; - int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); - { char tbuf[256] = {0}; stateKeyToString((void*)&maxStateKey, tbuf); @@ -2071,6 +2068,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); + char buf[128] = {0}; + int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_prev(pCur->iter); while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { @@ -2175,6 +2174,32 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey); return code; } + +SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, const SSessionKey* key) { + qDebug("streamStateSessionSeekToLast_rocksdb"); + // char buf[128] = {0}; + SStateSessionKey maxKey = {.key = *key, .opNum = INT64_MAX}; + int32_t code = 0; + + // int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf); + STREAM_STATE_PUT_ROCKSDB(pState, "sess", &maxKey, "", 0); + if (code != 0) { + return NULL; + } + + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SStreamStateCur* pCur = createStreamStateCursor(); + if (pCur == NULL) { + return NULL; + } + pCur->number = pState->number; + pCur->db = wrapper->rocksdb; + + pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, + (rocksdb_readoptions_t**)&pCur->readOpt); + + return NULL; +} SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");