refactor backend

This commit is contained in:
yihaoDeng 2023-11-07 16:01:57 +08:00
parent 1ee82f5e2c
commit 4f53ffd76f
1 changed files with 306 additions and 324 deletions

View File

@ -127,7 +127,7 @@ typedef struct {
const char* key;
int32_t len;
int idx;
__db_key_cmp_fn_t cmpFunc;
__db_key_cmp_fn_t cmpKey;
__db_key_encode_fn_t enFunc;
__db_key_decode_fn_t deFunc;
__db_key_tostr_fn_t toStrFunc;
@ -180,9 +180,6 @@ int parKeyEncode(void* k, char* buf);
int parKeyDecode(void* k, char* buf);
int parKeyToString(void* k, char* buf);
// int stremaValueEncode(void* k, char* buf);
// int streamValueDecode(void* k, char* buf);
int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest);
int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest);
int32_t valueToString(void* k, char* buf);
@ -240,306 +237,6 @@ int32_t getCfIdx(const char* cfName) {
return idx;
}
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
int32_t code = 0;
size_t len = 0;
void* pIter = taosHashIterate(p2, NULL);
while (pIter) {
char* name = taosHashGetKey(pIter, &len);
if (!taosHashGet(p1, name, len)) {
char* p = taosStrdup(name);
taosArrayPush(diff, &p);
}
pIter = taosHashIterate(p2, pIter);
}
return code;
}
int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
int32_t code = 0;
code = compareHashTableImpl(p1, p2, add);
code = compareHashTableImpl(p2, p1, del);
return code;
}
int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
taosThreadRwlockWrlock(&p->rwLock);
p->preCkptId = p->curChkpId;
p->curChkpId = chkpId;
const char* pCurrent = "CURRENT";
int32_t currLen = strlen(pCurrent);
const char* pManifest = "MANIFEST-";
int32_t maniLen = strlen(pManifest);
const char* pSST = ".sst";
int32_t sstLen = strlen(pSST);
memset(p->buf, 0, p->len);
sprintf(p->buf, "%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, chkpId);
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
TdDirPtr pDir = taosOpenDir(p->buf);
TdDirEntryPtr de = NULL;
int8_t dummy = 0;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) {
taosMemoryFreeClear(p->pCurrent);
p->pCurrent = taosStrdup(name);
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
taosMemoryFreeClear(p->pManifest);
p->pManifest = taosStrdup(name);
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) {
// char* p = taosStrdup(name);
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
}
if (p->init == 0) {
void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
while (pIter) {
size_t len;
char* name = taosHashGetKey(pIter, &len);
if (name != NULL && len != 0) {
taosArrayPush(p->pAdd, &name);
}
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
}
if (taosArrayGetSize(p->pAdd) > 0) p->update = 1;
p->init = 1;
p->preCkptId = -1;
p->curChkpId = chkpId;
} else {
int32_t code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel);
if (code != 0) {
// dead code
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
taosHashClear(p->pSstTbl[1 - p->idx]);
p->update = 0;
taosCloseDir(&pDir);
return code;
}
if (taosArrayGetSize(p->pAdd) == 0 && taosArrayGetSize(p->pDel) == 0) {
p->update = 0;
}
p->preCkptId = p->curChkpId;
p->curChkpId = chkpId;
}
taosHashClear(p->pSstTbl[p->idx]);
p->idx = 1 - p->idx;
taosCloseDir(&pDir);
taosThreadRwlockUnlock(&p->rwLock);
return 0;
}
SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp));
p->curChkpId = initChkpId;
p->preCkptId = -1;
p->pSST = taosArrayInit(64, sizeof(void*));
p->path = taosStrdup(path);
p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len);
p->idx = 0;
p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->pAdd = taosArrayInit(64, sizeof(void*));
p->pDel = taosArrayInit(64, sizeof(void*));
p->update = 0;
taosThreadRwlockInit(&p->rwLock, NULL);
SArray* list = NULL;
int32_t code = dbChkpGetDelta(p, initChkpId, list);
return p;
}
void dbChkpDestroy(SDbChkp* pChkp) {
taosMemoryFree(pChkp->buf);
taosMemoryFree(pChkp->path);
taosArrayDestroyP(pChkp->pSST, taosMemoryFree);
taosArrayDestroyP(pChkp->pAdd, taosMemoryFree);
taosArrayDestroyP(pChkp->pDel, taosMemoryFree);
taosHashCleanup(pChkp->pSstTbl[0]);
taosHashCleanup(pChkp->pSstTbl[1]);
taosMemoryFree(pChkp->pCurrent);
taosMemoryFree(pChkp->pManifest);
}
int32_t dbChkpInit(SDbChkp* p) {
if (p == NULL) return 0;
return 0;
}
int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
taosThreadRwlockRdlock(&p->rwLock);
int32_t code = 0;
int32_t len = p->len + 128;
char* srcBuf = taosMemoryCalloc(1, len);
char* dstBuf = taosMemoryCalloc(1, len);
char* srcDir = taosMemoryCalloc(1, len);
char* dstDir = taosMemoryCalloc(1, len);
sprintf(srcDir, "%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoint", p->curChkpId);
sprintf(dstDir, "%s%s%s", p->path, TD_DIRSEP, dname);
if (!taosDirExist(srcDir)) {
stError("failed to dump srcDir %s, reason: not exist such dir", srcDir);
code = -1;
goto _ERROR;
}
code = taosMkDir(dstDir);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to mkdir srcDir %s, reason: %s", dstDir, terrstr());
goto _ERROR;
}
// clear current file
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent);
taosRemoveFile(dstBuf);
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest);
taosRemoveFile(dstBuf);
// add file to $name dir
for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
char* filename = taosArrayGetP(p->pAdd, i);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosCopyFile(srcBuf, dstBuf);
}
// del file in $name
for (int i = 0; i < taosArrayGetSize(p->pDel); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
char* filename = taosArrayGetP(p->pDel, i);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosRemoveFile(dstBuf);
}
// copy current file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent);
taosCopyFile(srcBuf, dstBuf);
// copy manifest file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest);
taosCopyFile(srcBuf, dstBuf);
// clear delta data buf
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
_ERROR:
taosThreadRwlockUnlock(&p->rwLock);
taosMemoryFree(srcBuf);
taosMemoryFree(dstBuf);
taosMemoryFree(srcDir);
taosMemoryFree(dstDir);
return code;
}
SBkdMgt* bkdMgtCreate(char* path) {
SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt));
p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
taosThreadRwlockInit(&p->rwLock, NULL);
return p;
}
void bkdMgtDestroy(SBkdMgt* bm) {
if (bm == NULL) return;
void* pIter = taosHashIterate(bm->pDbChkpTbl, NULL);
while (pIter) {
SDbChkp* pChkp = *(SDbChkp**)(pIter);
dbChkpDestroy(pChkp);
pIter = taosHashIterate(bm->pDbChkpTbl, pIter);
}
taosThreadRwlockDestroy(&bm->rwLock);
taosMemoryFree(bm);
}
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list) {
int32_t code = 0;
taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp* pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
code = dbChkpGetDelta(pChkp, chkpId, list);
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) {
int32_t code = -1;
taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
if (pp == NULL) {
SDbChkp* p = dbChkpCreate(path, 0);
if (p != NULL) {
taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*));
code = 0;
}
} else {
stError("task chkp already exists");
}
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) {
int32_t code = 0;
taosThreadRwlockRdlock(&bm->rwLock);
SDbChkp* p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
code = dbChkpDumpTo(p, dname);
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}
bool isValidCheckpoint(const char* dir) { return true; }
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
@ -1582,22 +1279,6 @@ int parKeyToString(void* k, char* buf) {
n = sprintf(buf + n, "[groupId:%" PRIi64 "]", *key);
return n;
}
// int stremaValueEncode(void* k, char* buf) {
// int len = 0;
// SStreamValue* key = k;
// len += taosEncodeFixedI64((void**)&buf, key->unixTimestamp);
// len += taosEncodeFixedI32((void**)&buf, key->len);
// len += taosEncodeBinary((void**)&buf, key->data, key->len);
// return len;
// }
// int streamValueDecode(void* k, char* buf) {
// SStreamValue* key = k;
// char* p = buf;
// p = taosDecodeFixedI64(p, &key->unixTimestamp);
// p = taosDecodeFixedI32(p, &key->len);
// p = taosDecodeBinary(p, (void**)&key->data, key->len);
// return p - buf;
// }
int32_t valueToString(void* k, char* buf) {
SStreamValue* key = k;
int n = 0;
@ -1874,7 +1555,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
SCfInit* cfPara = &ginitDict[i];
rocksdb_comparator_t* compare =
rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpFunc, cfPara->cmpName);
rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
rocksdb_compactionfilterfactory_t* filterFactory =
@ -2181,7 +1862,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
SCfInit* cfPara = &ginitDict[idx];
rocksdb_comparator_t* compare =
rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpFunc, cfPara->cmpName);
rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare);
pCompare[i] = compare;
}
@ -2265,7 +1946,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
SCfInit* cfPara = &ginitDict[i];
rocksdb_comparator_t* compare =
rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpFunc, cfPara->cmpName);
rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName);
rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
inst->pCompares[i] = compare;
@ -2345,7 +2026,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
for (int i = 0; i < cfLen; i++) {
SCfInit* cf = &ginitDict[i];
rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->destroyCmp, cf->cmpFunc, cf->cmpName);
rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->destroyCmp, cf->cmpKey, cf->cmpName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
pCompare[i] = compare;
}
@ -3672,4 +3353,305 @@ _err:
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
return code >= 0 ? 0 : -1;
}
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
int32_t code = 0;
size_t len = 0;
void* pIter = taosHashIterate(p2, NULL);
while (pIter) {
char* name = taosHashGetKey(pIter, &len);
if (!taosHashGet(p1, name, len)) {
char* p = taosStrdup(name);
taosArrayPush(diff, &p);
}
pIter = taosHashIterate(p2, pIter);
}
return code;
}
int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) {
int32_t code = 0;
code = compareHashTableImpl(p1, p2, add);
code = compareHashTableImpl(p2, p1, del);
return code;
}
int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
taosThreadRwlockWrlock(&p->rwLock);
p->preCkptId = p->curChkpId;
p->curChkpId = chkpId;
const char* pCurrent = "CURRENT";
int32_t currLen = strlen(pCurrent);
const char* pManifest = "MANIFEST-";
int32_t maniLen = strlen(pManifest);
const char* pSST = ".sst";
int32_t sstLen = strlen(pSST);
memset(p->buf, 0, p->len);
sprintf(p->buf, "%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, chkpId);
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
TdDirPtr pDir = taosOpenDir(p->buf);
TdDirEntryPtr de = NULL;
int8_t dummy = 0;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) {
taosMemoryFreeClear(p->pCurrent);
p->pCurrent = taosStrdup(name);
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
taosMemoryFreeClear(p->pManifest);
p->pManifest = taosStrdup(name);
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) {
// char* p = taosStrdup(name);
taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy));
continue;
}
}
if (p->init == 0) {
void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
while (pIter) {
size_t len;
char* name = taosHashGetKey(pIter, &len);
if (name != NULL && len != 0) {
taosArrayPush(p->pAdd, &name);
}
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
}
if (taosArrayGetSize(p->pAdd) > 0) p->update = 1;
p->init = 1;
p->preCkptId = -1;
p->curChkpId = chkpId;
} else {
int32_t code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel);
if (code != 0) {
// dead code
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
taosHashClear(p->pSstTbl[1 - p->idx]);
p->update = 0;
taosCloseDir(&pDir);
return code;
}
if (taosArrayGetSize(p->pAdd) == 0 && taosArrayGetSize(p->pDel) == 0) {
p->update = 0;
}
p->preCkptId = p->curChkpId;
p->curChkpId = chkpId;
}
taosHashClear(p->pSstTbl[p->idx]);
p->idx = 1 - p->idx;
taosCloseDir(&pDir);
taosThreadRwlockUnlock(&p->rwLock);
return 0;
}
SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) {
SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp));
p->curChkpId = initChkpId;
p->preCkptId = -1;
p->pSST = taosArrayInit(64, sizeof(void*));
p->path = taosStrdup(path);
p->len = strlen(path) + 128;
p->buf = taosMemoryCalloc(1, p->len);
p->idx = 0;
p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
p->pAdd = taosArrayInit(64, sizeof(void*));
p->pDel = taosArrayInit(64, sizeof(void*));
p->update = 0;
taosThreadRwlockInit(&p->rwLock, NULL);
SArray* list = NULL;
int32_t code = dbChkpGetDelta(p, initChkpId, list);
return p;
}
void dbChkpDestroy(SDbChkp* pChkp) {
taosMemoryFree(pChkp->buf);
taosMemoryFree(pChkp->path);
taosArrayDestroyP(pChkp->pSST, taosMemoryFree);
taosArrayDestroyP(pChkp->pAdd, taosMemoryFree);
taosArrayDestroyP(pChkp->pDel, taosMemoryFree);
taosHashCleanup(pChkp->pSstTbl[0]);
taosHashCleanup(pChkp->pSstTbl[1]);
taosMemoryFree(pChkp->pCurrent);
taosMemoryFree(pChkp->pManifest);
}
int32_t dbChkpInit(SDbChkp* p) {
if (p == NULL) return 0;
return 0;
}
int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
taosThreadRwlockRdlock(&p->rwLock);
int32_t code = 0;
int32_t len = p->len + 128;
char* srcBuf = taosMemoryCalloc(1, len);
char* dstBuf = taosMemoryCalloc(1, len);
char* srcDir = taosMemoryCalloc(1, len);
char* dstDir = taosMemoryCalloc(1, len);
sprintf(srcDir, "%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoint", p->curChkpId);
sprintf(dstDir, "%s%s%s", p->path, TD_DIRSEP, dname);
if (!taosDirExist(srcDir)) {
stError("failed to dump srcDir %s, reason: not exist such dir", srcDir);
code = -1;
goto _ERROR;
}
code = taosMkDir(dstDir);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to mkdir srcDir %s, reason: %s", dstDir, terrstr());
goto _ERROR;
}
// clear current file
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent);
taosRemoveFile(dstBuf);
memset(dstBuf, 0, len);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest);
taosRemoveFile(dstBuf);
// add file to $name dir
for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
char* filename = taosArrayGetP(p->pAdd, i);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosCopyFile(srcBuf, dstBuf);
}
// del file in $name
for (int i = 0; i < taosArrayGetSize(p->pDel); i++) {
memset(dstBuf, 0, len);
memset(srcBuf, 0, len);
char* filename = taosArrayGetP(p->pDel, i);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosRemoveFile(dstBuf);
}
// copy current file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent);
taosCopyFile(srcBuf, dstBuf);
// copy manifest file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest);
taosCopyFile(srcBuf, dstBuf);
// clear delta data buf
taosArrayClearP(p->pAdd, taosMemoryFree);
taosArrayClearP(p->pDel, taosMemoryFree);
_ERROR:
taosThreadRwlockUnlock(&p->rwLock);
taosMemoryFree(srcBuf);
taosMemoryFree(dstBuf);
taosMemoryFree(srcDir);
taosMemoryFree(dstDir);
return code;
}
SBkdMgt* bkdMgtCreate(char* path) {
SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt));
p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
taosThreadRwlockInit(&p->rwLock, NULL);
return p;
}
void bkdMgtDestroy(SBkdMgt* bm) {
if (bm == NULL) return;
void* pIter = taosHashIterate(bm->pDbChkpTbl, NULL);
while (pIter) {
SDbChkp* pChkp = *(SDbChkp**)(pIter);
dbChkpDestroy(pChkp);
pIter = taosHashIterate(bm->pDbChkpTbl, pIter);
}
taosThreadRwlockDestroy(&bm->rwLock);
taosMemoryFree(bm);
}
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list) {
int32_t code = 0;
taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp* pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
code = dbChkpGetDelta(pChkp, chkpId, list);
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) {
int32_t code = -1;
taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task));
if (pp == NULL) {
SDbChkp* p = dbChkpCreate(path, 0);
if (p != NULL) {
taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*));
code = 0;
}
} else {
stError("task chkp already exists");
}
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) {
int32_t code = 0;
taosThreadRwlockRdlock(&bm->rwLock);
SDbChkp* p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
code = dbChkpDumpTo(p, dname);
taosThreadRwlockUnlock(&bm->rwLock);
return code;
}