diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index e278cf28e7..9ceea4545b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1203,9 +1203,9 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len } return true; } -rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot, +rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* pChkptFileName, rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt) { - int idx = streamStateGetCfIdx(pState, cfName); + int idx = streamStateGetCfIdx(pState, pChkptFileName); SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; if (snapshot != NULL) { @@ -1497,8 +1497,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); - int32_t code = 0; - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + int32_t code = 0; const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); @@ -1506,6 +1505,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; + + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -1520,6 +1521,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK streamStateFreeCur(pCur); pCur = NULL; } + STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); return pCur; } @@ -2251,15 +2253,16 @@ int32_t streamStateGetBatchSize(void* pBatch) { void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); } void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } -int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, +int32_t streamStatePutBatch(SStreamState* pState, const char* pChkptFileName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - int i = streamStateGetCfIdx(pState, cfName); + int i = streamStateGetCfIdx(pState, pChkptFileName); if (i < 0) { - qError("streamState failed to put to cf name:%s", cfName); + qError("streamState failed to put to cf name:%s", pChkptFileName); return -1; } + char buf[128] = {0}; int32_t klen = ginitDict[i].enFunc((void*)key, buf); @@ -2270,6 +2273,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr taosMemoryFree(ttlV); return 0; } + int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl, void* tmpBuf) { char buf[128] = {0};