diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f808e90e1c..dde2354abe 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2177,8 +2177,10 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekToLast_rocksdb"); - // char buf[128] = {0}; - SStateSessionKey maxKey = {.key = *key, .opNum = INT64_MAX}; + + SSessionKey maxSessionKey = {.groupId = UINT64_MAX, .win = {.skey = INT64_MAX, .ekey = INT64_MAX}}; + + SStateSessionKey maxKey = {.key = maxSessionKey, .opNum = INT64_MAX}; int32_t code = 0; // int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf); @@ -2192,13 +2194,27 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, cons if (pCur == NULL) { return NULL; } + pCur->number = pState->number; pCur->db = wrapper->rocksdb; - pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); - return NULL; + char buf[128] = {0}; + int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf); + rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); + rocksdb_iter_prev(pCur->iter); + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { + rocksdb_iter_prev(pCur->iter); + } + + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + pCur = NULL; + } + + STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey); + return pCur; } SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");