diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 122f803ab6..c9cb5ab64c 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -66,7 +66,7 @@ typedef struct { TdThreadMutex mutex; char* idstr; char* path; - int64_t refId; + int64_t refId; void* pTask; int64_t streamId; @@ -79,6 +79,53 @@ typedef struct { } STaskDbWrapper; +typedef struct SDbChkp { + int8_t init; + char* pCurrent; + char* pManifest; + SArray* pSST; + int64_t preCkptId; + int64_t curChkpId; + char* path; + + char* buf; + int32_t len; + + // ping-pong buf + SHashObj* pSstTbl[2]; + int8_t idx; + + SArray* pAdd; + SArray* pDel; + int8_t update; + + TdThreadRwlock rwLock; +} SDbChkp; +typedef struct { + int8_t init; + char* pCurrent; + char* pManifest; + SArray* pSST; + int64_t preCkptId; + int64_t curChkpId; + char* path; + + char* buf; + int32_t len; + + // ping-pong buf + SHashObj* pSstTbl[2]; + int8_t idx; + + SArray* pAdd; + SArray* pDel; + int8_t update; + + SHashObj* pDbChkpTbl; + + TdThreadRwlock rwLock; +} SBkdMgt; + void* streamBackendInit(const char* path, int64_t chkpId); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); @@ -194,4 +241,15 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); int32_t taskDbBuildSnap(void* arg, SArray* pSnap); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); + +STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId); +void taskDbDestroy(void* pDb); + +int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); + +SBkdMgt* bkdMgtCreate(char* path); +int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path); +int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list); +int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); +void bkdMgtDestroy(SBkdMgt* bm); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 074d23cdbf..a06ec8c9c6 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -20,53 +20,6 @@ #include "tcommon.h" #include "tref.h" -typedef struct SDbChkp { - int8_t init; - char* pCurrent; - char* pManifest; - SArray* pSST; - int64_t preCkptId; - int64_t curChkpId; - char* path; - - char* buf; - int32_t len; - - // ping-pong buf - SHashObj* pSstTbl[2]; - int8_t idx; - - SArray* pAdd; - SArray* pDel; - int8_t update; - - TdThreadRwlock rwLock; -} SDbChkp; -typedef struct { - int8_t init; - char* pCurrent; - char* pManifest; - SArray* pSST; - int64_t preCkptId; - int64_t curChkpId; - char* path; - - char* buf; - int32_t len; - - // ping-pong buf - SHashObj* pSstTbl[2]; - int8_t idx; - - SArray* pAdd; - SArray* pDel; - int8_t update; - - SHashObj* pDbChkpTbl; - - TdThreadRwlock rwLock; -} SBkdMgt; - typedef struct SCompactFilteFactory { void* status; } SCompactFilteFactory; @@ -223,8 +176,6 @@ SCfInit ginitDict[] = { valueEncode, valueDecode, compactFilteFactoryCreateFilter, destroyCompactFilteFactory, compactFilteFactoryName}, }; -const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; - int32_t getCfIdx(const char* cfName) { int idx = -1; size_t len = strlen(cfName); @@ -868,7 +819,6 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId); taskDbRemoveRef(pTaskDb); - pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter); SStreamTask* pTask = pTaskDb->pTask; SStreamTaskSnap snap = {.streamId = pTask->id.streamId, @@ -876,6 +826,7 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { .chkpId = pTaskDb->chkpId, .dbPrefixPath = taosStrdup(pTaskDb->path)}; taosArrayPush(pSnap, &snap); + pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter); } return code; }