add sess iter

This commit is contained in:
yihaoDeng 2023-10-10 10:41:46 +08:00
parent c6e9134cea
commit 7ac4ef5beb
1 changed files with 20 additions and 4 deletions

View File

@ -2177,8 +2177,10 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekToLast_rocksdb");
// char buf[128] = {0};
SStateSessionKey maxKey = {.key = *key, .opNum = INT64_MAX};
SSessionKey maxSessionKey = {.groupId = UINT64_MAX, .win = {.skey = INT64_MAX, .ekey = INT64_MAX}};
SStateSessionKey maxKey = {.key = maxSessionKey, .opNum = INT64_MAX};
int32_t code = 0;
// int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf);
@ -2192,13 +2194,27 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, cons
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
return NULL;
char buf[128] = {0};
int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf);
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter);
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_prev(pCur->iter);
}
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
pCur = NULL;
}
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &maxKey);
return pCur;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");