add checkpoint delta

This commit is contained in:
yihaoDeng 2023-11-02 20:39:55 +08:00
parent 7e7a39f077
commit 0a73832fd2
1 changed files with 188 additions and 80 deletions

View File

@ -20,6 +20,28 @@
#include "tcommon.h"
#include "tref.h"
typedef struct SDbChkp {
int8_t init;
char* pCurrent;
char* pManifest;
SArray* pSST;
int64_t preCkptId;
int64_t curChkpId;
char* path;
char* buf;
int32_t len;
// ping-pong buf
SHashObj* pSstTbl[2];
int8_t idx;
SArray* pAdd;
SArray* pDel;
int8_t update;
TdThreadRwlock rwLock;
} SDbChkp;
typedef struct {
int8_t init;
char* pCurrent;
@ -39,6 +61,10 @@ typedef struct {
SArray* pAdd;
SArray* pDel;
int8_t update;
SHashObj *pDbChkpTbl;
TdThreadRwlock rwLock;
} SBackendManager;
typedef struct SCompactFilteFactory {
@ -145,42 +171,23 @@ void destroyFunc(void* arg);
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest);
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest);
SBackendManager* bkdMgtCreate(char* path) {
SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager));
p->curChkpId = 0;
p->preCkptId = 0;
p->pSST = taosArrayInit(64, sizeof(void*));
p->path = taosStrdup(path);
p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len);
p->idx = 0;
p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
void dbChkpDestroy(SDbChkp* pChkp) {
taosMemoryFree(pChkp->buf);
taosMemoryFree(pChkp->path);
p->pAdd = taosArrayInit(64, sizeof(void*));
p->pDel = taosArrayInit(64, sizeof(void*));
p->update = 0;
return p;
taosArrayDestroyP(pChkp->pSST, taosMemoryFree);
taosArrayDestroyP(pChkp->pAdd, taosMemoryFree);
taosArrayDestroyP(pChkp->pDel, taosMemoryFree);
taosHashCleanup(pChkp->pSstTbl[0]);
taosHashCleanup(pChkp->pSstTbl[1]);
taosMemoryFree(pChkp->pCurrent);
taosMemoryFree(pChkp->pManifest);
}
void bkdMgtDestroy(SBackendManager* bm) {
if (bm == NULL) return;
taosMemoryFree(bm->buf);
taosMemoryFree(bm->path);
taosArrayDestroyP(bm->pSST, taosMemoryFree);
taosArrayDestroyP(bm->pAdd, taosMemoryFree);
taosArrayDestroyP(bm->pDel, taosMemoryFree);
taosHashCleanup(bm->pSstTbl[0]);
taosHashCleanup(bm->pSstTbl[1]);
taosMemoryFree(bm->pCurrent);
taosMemoryFree(bm->pManifest);
taosMemoryFree(bm);
}
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
int32_t code = 0;
@ -204,7 +211,12 @@ int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
return code;
}
int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) {
int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray *list) {
taosThreadRwlockWrlock(&p->rwLock);
p->preCkptId = p->curChkpId;
p->curChkpId = chkpId;
const char* pCurrent = "CURRENT";
int32_t currLen = strlen(pCurrent);
@ -214,81 +226,112 @@ int32_t bkdMgtGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) {
const char* pSST = ".sst";
int32_t sstLen = strlen(pSST);
memset(bm->buf, 0, bm->len);
sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId);
memset(p->buf, 0, p->len);
sprintf(p->buf, "%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, chkpId);
taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree);
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
TdDirPtr pDir = taosOpenDir(bm->buf);
TdDirPtr pDir = taosOpenDir(p->buf);
TdDirEntryPtr de = NULL;
int8_t dummy = 0;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) {
taosMemoryFreeClear(bm->pCurrent);
bm->pCurrent = taosStrdup(name);
taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy));
taosMemoryFreeClear(p->pCurrent);
p->pCurrent = taosStrdup(name);
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
taosMemoryFreeClear(bm->pManifest);
bm->pManifest = taosStrdup(name);
taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy));
taosMemoryFreeClear(p->pManifest);
p->pManifest = taosStrdup(name);
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) {
// char* p = taosStrdup(name);
taosHashPut(bm->pSstTbl[1 - bm->idx], name, strlen(name), &dummy, sizeof(dummy));
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
}
if (bm->init == 0) {
bm->preCkptId = -1;
bm->curChkpId = chkpId;
bm->init = 1;
if (p->init == 0) {
p->preCkptId = -1;
p->curChkpId = chkpId;
p->init = 1;
void* pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], NULL);
void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
while (pIter) {
size_t len;
char* name = taosHashGetKey(pIter, &len);
if (name != NULL && len != 0) {
taosArrayPush(bm->pAdd, &name);
taosArrayPush(p->pAdd, &name);
}
pIter = taosHashIterate(bm->pSstTbl[1 - bm->idx], pIter);
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
}
if (taosArrayGetSize(bm->pAdd) > 0) bm->update = 1;
if (taosArrayGetSize(p->pAdd) > 0) p->update = 1;
} else {
int32_t code = compareHashTable(bm->pSstTbl[bm->idx], bm->pSstTbl[1 - bm->idx], bm->pAdd, bm->pDel);
int32_t code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel);
if (code != 0) {
// dead code
taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree);
taosHashClear(bm->pSstTbl[1 - bm->idx]);
bm->update = 0;
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
taosHashClear(p->pSstTbl[1 - p->idx]);
p->update = 0;
taosCloseDir(&pDir);
return code;
}
bm->preCkptId = bm->curChkpId;
bm->curChkpId = chkpId;
if (taosArrayGetSize(bm->pAdd) == 0 && taosArrayGetSize(bm->pDel) == 0) {
bm->update = 0;
p->preCkptId = p->curChkpId;
p->curChkpId = chkpId;
if (taosArrayGetSize(p->pAdd) == 0 && taosArrayGetSize(p->pDel) == 0) {
p->update = 0;
}
}
taosHashClear(bm->pSstTbl[bm->idx]);
bm->idx = 1 - bm->idx;
taosHashClear(p->pSstTbl[p->idx]);
p->idx = 1 - p->idx;
taosCloseDir(&pDir);
taosThreadRwlockUnlock(&p->rwLock);
return 0;
}
SDbChkp* dbChktCreate(char* path, int64_t initChkpId) {
SDbChkp *p = taosMemoryCalloc(1, sizeof(SDbChkp));
p->curChkpId = initChkpId;
p->preCkptId = -1;
p->pSST = taosArrayInit(64, sizeof(void*));
p->path = taosStrdup(path);
p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len);
p->idx = 0;
p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->pAdd = taosArrayInit(64, sizeof(void*));
p->pDel = taosArrayInit(64, sizeof(void*));
p->update = 0;
taosThreadRwlockInit(&p->rwLock, NULL);
SArray *list = NULL;
int32_t code = dbChkpGetDelta(p, initChkpId, list);
return p;
}
int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) {
int32_t dbChkpInit(SDbChkp* p) {
if (p == NULL) return 0;
return 0;
}
int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
taosThreadRwlockRdlock(&p->rwLock);
int32_t code = 0;
int32_t len = bm->len + 128;
int32_t len = p->len + 128;
char* srcBuf = taosMemoryCalloc(1, len);
char* dstBuf = taosMemoryCalloc(1, len);
@ -296,8 +339,8 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) {
char* srcDir = taosMemoryCalloc(1, len);
char* dstDir = taosMemoryCalloc(1, len);
sprintf(srcDir, "%s%s%s%" PRId64 "", bm->path, TD_DIRSEP, "checkpoint", bm->curChkpId);
sprintf(dstDir, "%s%s%s", bm->path, TD_DIRSEP, dname);
sprintf(srcDir, "%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoint", p->curChkpId);
sprintf(dstDir, "%s%s%s", p->path, TD_DIRSEP, dname);
if (!taosDirExist(srcDir)) {
stError("failed to dump srcDir %s, reason: not exist such dir", srcDir);
@ -314,30 +357,30 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) {
// clear current file
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent);
taosRemoveFile(dstBuf);
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest);
taosRemoveFile(dstBuf);
// add file to $name dir
for (int i = 0; i < taosArrayGetSize(bm->pAdd); i++) {
for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
char* filename = taosArrayGetP(bm->pAdd, i);
char* filename = taosArrayGetP(p->pAdd, i);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosCopyFile(srcBuf, dstBuf);
}
// del file in $name
for (int i = 0; i < taosArrayGetSize(bm->pDel); i++) {
for (int i = 0; i < taosArrayGetSize(p->pDel); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
char* filename = taosArrayGetP(bm->pDel, i);
char* filename = taosArrayGetP(p->pDel, i);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosRemoveFile(dstBuf);
}
@ -345,27 +388,92 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* dname) {
// copy current file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pCurrent);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pCurrent);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent);
taosCopyFile(srcBuf, dstBuf);
// copy manifest file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, bm->pManifest);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, bm->pManifest);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest);
taosCopyFile(srcBuf, dstBuf);
// clear delta data buf
taosArrayClearP(bm->pAdd, taosMemoryFree);
taosArrayClearP(bm->pDel, taosMemoryFree);
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
_ERROR:
taosThreadRwlockUnlock(&p->rwLock);
taosMemoryFree(srcBuf);
taosMemoryFree(dstBuf);
taosMemoryFree(srcDir);
taosMemoryFree(dstDir);
return code;
}
SBackendManager* bkdMgtCreate(char* path) {
SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager));
p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
taosThreadRwlockInit(&p->rwLock, NULL);
return p;
}
void bkdMgtDestroy(SBackendManager* bm) {
if (bm == NULL) return;
void *pIter = taosHashIterate(bm->pDbChkpTbl, NULL);
while (pIter) {
SDbChkp *pChkp = *(SDbChkp **)(pIter);
dbChkpDestroy(pChkp);
pIter = taosHashIterate(bm->pDbChkpTbl, pIter);
}
taosThreadRwlockDestroy(&bm->rwLock);
taosMemoryFree(bm);
}
int32_t bkdMgtGetDelta(SBackendManager* bm, char *taskId, int64_t chkpId, SArray* list) {
int32_t code = 0;
taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp *pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
code = dbChkpGetDelta(pChkp, chkpId, list);
taosThreadRwlockUnlock(&bm->rwLock);
return code ;
}
int32_t bkdMgtAddChkp(SBackendManager *bm, char *task, char *path) {
int32_t code = -1;
taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp **pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
if (pp == NULL) {
SDbChkp *p = dbChktCreate(path, 0);
if (p != NULL) {
taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void *));
code = 0;
}
} else {
stError("task chkp already exists");
}
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}
int32_t bkdMgtDumpTo(SBackendManager* bm, char *taskId, char* dname) {
int32_t code = 0;
taosThreadRwlockRdlock(&bm->rwLock);
SDbChkp *p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
code = dbChkpDumpTo(p, dname);
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}
SCfInit ginitDict[] = {