support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-07 13:56:53 +00:00
parent 80e78e054d
commit 6b73fc9c06
1 changed files with 52 additions and 0 deletions

View File

@ -20,6 +20,17 @@
#include "tcommon.h"
#include "tref.h"
typedef struct {
char* pCurrent;
char* pManifest;
SArray* pSST;
int64_t preCkptId;
int64_t curChkpId;
char* path;
char* buf;
int32_t len;
} SBackendManager;
typedef struct SCompactFilteFactory {
void* status;
} SCompactFilteFactory;
@ -127,6 +138,46 @@ void destroyFunc(void* arg);
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest);
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest);
SBackendManager* backendManagerCreate(char* path) {
SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager));
p->curChkpId = 0;
p->preCkptId = 0;
p->pSST = taosArrayInit(64, sizeof(void*));
p->path = taosStrdup(path);
p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len);
return p;
}
int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) {
memset(bm->buf, 0, bm->len);
sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId);
TdDirPtr pDir = taosOpenDir(bm->buf);
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
// sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name);
// sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name);
// if (!taosDirEntryIsDir(de)) {
// code = taosCopyFile(absSrcPath, absDstPath);
// if (code == -1) {
// goto _err;
// }
// }
// memset(absSrcPath, 0, sLen + 64);
// memset(absDstPath, 0, dLen + 64);
}
return 0;
}
SCfInit ginitDict[] = {
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
destroyFunc, encodeValueFunc, decodeValueFunc},
@ -219,6 +270,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
return 0;
}
void* streamBackendInit(const char* streamPath, int64_t chkpId) {
char* backendPath = NULL;
int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);