diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 1962e41b6c..4f2618f8dc 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -18,6 +18,7 @@ #include "os.h" +#include "tarray.h" #include "tdef.h" #include "tlist.h" @@ -27,29 +28,33 @@ extern "C" { typedef struct SStreamFileState SStreamFileState; typedef struct SRowBuffPos { - void* pRowBuff; - void* pKey; - bool beFlushed; - bool beUsed; + void* pRowBuff; + void* pKey; + bool beFlushed; + bool beUsed; } SRowBuffPos; typedef SList SStreamSnapshot; typedef TSKEY (*GetTsFun)(void*); -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark); +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, + TSKEY delMark); void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState); int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); -void releaseRowBuffPos(SRowBuffPos* pBuff); -bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); +void releaseRowBuffPos(SRowBuffPos* pBuff); +bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); -int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); -int32_t recoverSnapshot(SStreamFileState* pFileState); +int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); +int32_t recoverSnapshot(SStreamFileState* pFileState); + +int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); +int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId); #ifdef __cplusplus } diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index a06aef365f..a33a0a577b 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -89,4 +89,5 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); +int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 02d3778baa..fb5ad34d0d 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -458,9 +458,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_readoptions_t** readOpt) { int idx = streamGetInit(cfName); - //*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); if (snapshot != NULL) { - *snapshot = NULL; + *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); } rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); @@ -638,6 +637,39 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { return code; } +int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) { + int code = 0; + char* err = NULL; + + rocksdb_snapshot_t* snapshot = NULL; + rocksdb_readoptions_t* readopts = NULL; + rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); + if (pIter == NULL) { + return -1; + } + + rocksdb_iter_seek(pIter, start, strlen(start)); + while (rocksdb_iter_valid(pIter)) { + const char* key = rocksdb_iter_key(pIter, NULL); + if (end != NULL && strcmp(key, end) > 0) { + break; + } + if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { + int64_t checkPoint = 0; + if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { + taosArrayPush(result, &checkPoint); + } + } else { + break; + } + rocksdb_iter_next(pIter); + } + rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); + rocksdb_readoptions_destroy(readopts); + rocksdb_iter_destroy(pIter); + return code; +} + int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 08c8c16079..ddd8e148ff 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -371,7 +371,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, char keyBuf[128] = {0}; char valBuf[64] = {0}; int32_t len = 0; - sprintf(keyBuf, "%s:%" PRId64 "", taskKey, INT64_MIN); + memcpy(keyBuf, taskKey, strlen(taskKey)); sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, strlen(valBuf)); @@ -382,7 +382,16 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, streamStateDestroyBatch(batch); return code; } - +int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) { + const char* taskKey = "streamFileState"; + char keyBuf[128] = {0}; + sprintf(keyBuf, "%s:%" PRId64 "", taskKey, checkpointId); + return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); +} +int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { + const char* taskKey = "streamFileState"; + return streamDefaultIter_rocksdb(pFileState->pFileStore, taskKey, NULL, list); +} int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t code = TSDB_CODE_SUCCESS; const char* taskKey = "streamFileState"; @@ -391,7 +400,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { char buf[128] = {0}; void* val = NULL; int32_t len = 0; - sprintf(buf, "%s:%" PRId64 "", taskKey, INT64_MIN); + memcpy(buf, taskKey, strlen(taskKey)); code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); if (code != 0) { return TSDB_CODE_FAILED; @@ -412,7 +421,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { sscanf(val, "%" PRId64 "", &ts); taosMemoryFree(val); if (ts < pFileState->flushMark) { - // forceRemoveCheckPoint(pFileState->pFileStore, i); + forceRemoveCheckpoint(pFileState, i); break; } else { }