From 6b73fc9c06dc74da04d48fd5ddc69894a770cb8c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 13:56:53 +0000 Subject: [PATCH] support reopen stream state --- source/libs/stream/src/streamBackendRocksdb.c | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 54b0e8498e..f926efb94d 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -20,6 +20,17 @@ #include "tcommon.h" #include "tref.h" +typedef struct { + char* pCurrent; + char* pManifest; + SArray* pSST; + int64_t preCkptId; + int64_t curChkpId; + char* path; + char* buf; + int32_t len; +} SBackendManager; + typedef struct SCompactFilteFactory { void* status; } SCompactFilteFactory; @@ -127,6 +138,46 @@ void destroyFunc(void* arg); int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); +SBackendManager* backendManagerCreate(char* path) { + SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager)); + p->curChkpId = 0; + p->preCkptId = 0; + p->pSST = taosArrayInit(64, sizeof(void*)); + p->path = taosStrdup(path); + + p->len = strlen(path) + 128; + p->buf = taosMemoryCalloc(1, p->len); + return p; +} + +int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { + memset(bm->buf, 0, bm->len); + sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId); + + TdDirPtr pDir = taosOpenDir(bm->buf); + TdDirEntryPtr de = NULL; + + while ((de = taosReadDir(pDir)) != NULL) { + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + + + // sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name); + // sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name); + // if (!taosDirEntryIsDir(de)) { + // code = taosCopyFile(absSrcPath, absDstPath); + // if (code == -1) { + // goto _err; + // } + // } + + // memset(absSrcPath, 0, sLen + 64); + // memset(absDstPath, 0, dLen + 64); + } + + return 0; +} + SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, destroyFunc, encodeValueFunc, decodeValueFunc}, @@ -219,6 +270,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } + void* streamBackendInit(const char* streamPath, int64_t chkpId) { char* backendPath = NULL; int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);