From 2ec67bcc08aa55f577191ed0b432b88d18a9687c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 13 Jul 2023 10:00:16 +0800 Subject: [PATCH] rm expire checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 2ab55687a2..cc185d2c2c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1053,7 +1053,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*)); inst->dbOpt = handle->dbOpt; - //rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); + // rocksdb_writeoptions_disable_WAL(inst->wOpt, 1); taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*)); } else { inst = *pInst; @@ -1174,7 +1174,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare); - //rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); + // rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); @@ -1438,7 +1438,8 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { SWinKey tmp = {.ts = 0, .groupId = 0}; streamStatePut_rocksdb(pState, &tmp, NULL, 0); SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); - int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); + + int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); streamStateFreeCur(pCur); streamStateDel_rocksdb(pState, &tmp); return code; @@ -1553,6 +1554,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); + char buf[128] = {0}; int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); @@ -1562,6 +1564,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); + pCur->number = pState->number; + rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_prev(pCur->iter); @@ -1587,6 +1591,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); + pCur->number = pState->number; SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; @@ -1855,6 +1860,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); + pCur->number = pState->number; char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); @@ -1916,6 +1922,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); + pCur->number = pState->number; char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); @@ -1953,6 +1960,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); + pCur->number = pState->number; char buf[128] = {0}; int len = winKeyEncode((void*)key, buf); @@ -1986,10 +1994,10 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes if (pCur == NULL) { return -1; } - pCur->number = pState->number; pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); + pCur->number = pState->number; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int32_t c = 0; @@ -2263,6 +2271,7 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); + pCur->number = pState->number; return pCur; } int32_t streamDefaultIterValid_rocksdb(void* iter) {