add sess iter
This commit is contained in:
parent
5b117199f5
commit
c6e9134cea
|
@ -2054,9 +2054,6 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[128] = {0};
|
|
||||||
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
|
|
||||||
|
|
||||||
{
|
{
|
||||||
char tbuf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
stateKeyToString((void*)&maxStateKey, tbuf);
|
stateKeyToString((void*)&maxStateKey, tbuf);
|
||||||
|
@ -2071,6 +2068,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
|
||||||
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
|
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
(rocksdb_readoptions_t**)&pCur->readOpt);
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
|
|
||||||
|
char buf[128] = {0};
|
||||||
|
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
|
||||||
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
|
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
|
||||||
rocksdb_iter_prev(pCur->iter);
|
rocksdb_iter_prev(pCur->iter);
|
||||||
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
|
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
|
||||||
|
@ -2175,6 +2174,32 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
|
||||||
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);
|
STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState, const SSessionKey* key) {
|
||||||
|
qDebug("streamStateSessionSeekToLast_rocksdb");
|
||||||
|
// char buf[128] = {0};
|
||||||
|
SStateSessionKey maxKey = {.key = *key, .opNum = INT64_MAX};
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// int32_t klen = stateSessionKeyEncode((void*)&maxKey, buf);
|
||||||
|
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &maxKey, "", 0);
|
||||||
|
if (code != 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
|
||||||
|
SStreamStateCur* pCur = createStreamStateCursor();
|
||||||
|
if (pCur == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pCur->number = pState->number;
|
||||||
|
pCur->db = wrapper->rocksdb;
|
||||||
|
|
||||||
|
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
|
||||||
|
(rocksdb_readoptions_t**)&pCur->readOpt);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
|
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
|
||||||
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
|
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue