support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-08 07:21:47 +00:00
parent 6b73fc9c06
commit ac11537fc8
1 changed files with 76 additions and 21 deletions

View File

@ -21,14 +21,16 @@
#include "tref.h"
typedef struct {
char* pCurrent;
char* pManifest;
SArray* pSST;
int64_t preCkptId;
int64_t curChkpId;
char* path;
char* buf;
int32_t len;
int8_t init;
char* pCurrent;
char* pManifest;
SArray* pSST;
int64_t preCkptId;
int64_t curChkpId;
SHashObj* pSSTable;
char* path;
char* buf;
int32_t len;
} SBackendManager;
typedef struct SCompactFilteFactory {
@ -144,37 +146,90 @@ SBackendManager* backendManagerCreate(char* path) {
p->preCkptId = 0;
p->pSST = taosArrayInit(64, sizeof(void*));
p->path = taosStrdup(path);
p->pSSTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len);
return p;
}
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
int32_t code = 0;
size_t len = 0;
void* pIter = taosHashIterate(p2, NULL);
while (pIter) {
char* name = taosHashGetKey(pIter, &len);
if (!taosHashGet(p1, name, len)) {
char* p = taosStrdup(name);
taosArrayPush(diff, &p);
}
pIter = taosHashIterate(p2, pIter);
}
return code;
}
int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
int32_t code = 0;
code = compareHashTableImpl(p1, p2, add);
code = compareHashTableImpl(p2, p1, del);
return code;
}
int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) {
const char* pCurrent = "CURRENT";
int32_t currLen = strlen(pCurrent);
const char* pManifest = "MANIFEST-";
int32_t maniLen = strlen(pManifest);
const char* pSST = ".sst";
int32_t sstLen = strlen(pSST);
memset(bm->buf, 0, bm->len);
sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId);
SHashObj* pTable = bm->init == 0
? bm->pSSTable
: taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
TdDirPtr pDir = taosOpenDir(bm->buf);
TdDirEntryPtr de = NULL;
int8_t dummy = 0;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) {
taosMemoryFreeClear(bm->pCurrent);
bm->pCurrent = taosStrdup(name);
taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
taosMemoryFreeClear(bm->pManifest);
bm->pManifest = taosStrdup(name);
taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) {
char* p = taosStrdup(name);
// sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name);
// sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name);
// if (!taosDirEntryIsDir(de)) {
// code = taosCopyFile(absSrcPath, absDstPath);
// if (code == -1) {
// goto _err;
// }
// }
// memset(absSrcPath, 0, sLen + 64);
// memset(absDstPath, 0, dLen + 64);
taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy));
continue;
}
}
if (bm->init == 0) {
bm->preCkptId = chkpId;
bm->curChkpId = chkpId;
bm->init = 1;
} else {
SArray* add = taosArrayInit(64, sizeof(void*));
SArray* del = taosArrayInit(64, sizeof(void*));
int32_t code = compareHashTable(bm->pSSTable, pTable, add, del);
bm->curChkpId = chkpId;
taosHashCleanup(pTable);
}
return 0;
}