rm expire checkpoint

This commit is contained in:
yihaoDeng 2023-07-13 10:00:16 +08:00
parent 773ae39d33
commit 2ec67bcc08
1 changed files with 13 additions and 4 deletions

View File

@ -1438,6 +1438,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut_rocksdb(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
streamStateFreeCur(pCur);
streamStateDel_rocksdb(pState, &tmp);
@ -1553,6 +1554,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
@ -1562,6 +1564,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter);
@ -1587,6 +1591,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
@ -1855,6 +1860,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
@ -1916,6 +1922,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
@ -1953,6 +1960,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
@ -1986,10 +1994,10 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
if (pCur == NULL) {
return -1;
}
pCur->number = pState->number;
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0;
@ -2263,6 +2271,7 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
return pCur;
}
int32_t streamDefaultIterValid_rocksdb(void* iter) {