From 4f53ffd76f59edae62c211e25aa9e1007e1d4833 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 7 Nov 2023 16:01:57 +0800 Subject: [PATCH] refactor backend --- source/libs/stream/src/streamBackendRocksdb.c | 630 +++++++++--------- 1 file changed, 306 insertions(+), 324 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 300251885d..074d23cdbf 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -127,7 +127,7 @@ typedef struct { const char* key; int32_t len; int idx; - __db_key_cmp_fn_t cmpFunc; + __db_key_cmp_fn_t cmpKey; __db_key_encode_fn_t enFunc; __db_key_decode_fn_t deFunc; __db_key_tostr_fn_t toStrFunc; @@ -180,9 +180,6 @@ int parKeyEncode(void* k, char* buf); int parKeyDecode(void* k, char* buf); int parKeyToString(void* k, char* buf); -// int stremaValueEncode(void* k, char* buf); -// int streamValueDecode(void* k, char* buf); - int32_t valueEncode(void* value, int32_t vlen, int64_t ttl, char** dest); int32_t valueDecode(void* value, int32_t vlen, int64_t* ttl, char** dest); int32_t valueToString(void* k, char* buf); @@ -240,306 +237,6 @@ int32_t getCfIdx(const char* cfName) { return idx; } -int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { - int32_t code = 0; - size_t len = 0; - void* pIter = taosHashIterate(p2, NULL); - while (pIter) { - char* name = taosHashGetKey(pIter, &len); - if (!taosHashGet(p1, name, len)) { - char* p = taosStrdup(name); - taosArrayPush(diff, &p); - } - pIter = taosHashIterate(p2, pIter); - } - return code; -} -int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) { - int32_t code = 0; - - code = compareHashTableImpl(p1, p2, add); - code = compareHashTableImpl(p2, p1, del); - - return code; -} - -int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { - taosThreadRwlockWrlock(&p->rwLock); - - p->preCkptId = p->curChkpId; - p->curChkpId = chkpId; - const char* pCurrent = "CURRENT"; - int32_t currLen = strlen(pCurrent); - - const char* pManifest = "MANIFEST-"; - int32_t maniLen = strlen(pManifest); - - const char* pSST = ".sst"; - int32_t sstLen = strlen(pSST); - - memset(p->buf, 0, p->len); - sprintf(p->buf, "%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, chkpId); - - taosArrayClearP(p->pAdd, taosMemoryFree); - taosArrayClearP(p->pDel, taosMemoryFree); - - TdDirPtr pDir = taosOpenDir(p->buf); - TdDirEntryPtr de = NULL; - int8_t dummy = 0; - while ((de = taosReadDir(pDir)) != NULL) { - char* name = taosGetDirEntryName(de); - if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; - if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { - taosMemoryFreeClear(p->pCurrent); - p->pCurrent = taosStrdup(name); - taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); - continue; - } - - if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { - taosMemoryFreeClear(p->pManifest); - p->pManifest = taosStrdup(name); - taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); - continue; - } - if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { - // char* p = taosStrdup(name); - taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); - continue; - } - } - if (p->init == 0) { - void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); - while (pIter) { - size_t len; - char* name = taosHashGetKey(pIter, &len); - if (name != NULL && len != 0) { - taosArrayPush(p->pAdd, &name); - } - pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter); - } - if (taosArrayGetSize(p->pAdd) > 0) p->update = 1; - - p->init = 1; - p->preCkptId = -1; - p->curChkpId = chkpId; - } else { - int32_t code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel); - if (code != 0) { - // dead code - taosArrayClearP(p->pAdd, taosMemoryFree); - taosArrayClearP(p->pDel, taosMemoryFree); - taosHashClear(p->pSstTbl[1 - p->idx]); - p->update = 0; - taosCloseDir(&pDir); - return code; - } - - if (taosArrayGetSize(p->pAdd) == 0 && taosArrayGetSize(p->pDel) == 0) { - p->update = 0; - } - - p->preCkptId = p->curChkpId; - p->curChkpId = chkpId; - } - - taosHashClear(p->pSstTbl[p->idx]); - p->idx = 1 - p->idx; - - taosCloseDir(&pDir); - taosThreadRwlockUnlock(&p->rwLock); - - return 0; -} -SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { - SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp)); - p->curChkpId = initChkpId; - p->preCkptId = -1; - p->pSST = taosArrayInit(64, sizeof(void*)); - p->path = taosStrdup(path); - p->len = strlen(path) + 128; - p->buf = taosMemoryCalloc(1, p->len); - - p->idx = 0; - p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - - p->pAdd = taosArrayInit(64, sizeof(void*)); - p->pDel = taosArrayInit(64, sizeof(void*)); - p->update = 0; - taosThreadRwlockInit(&p->rwLock, NULL); - - SArray* list = NULL; - int32_t code = dbChkpGetDelta(p, initChkpId, list); - - return p; -} - -void dbChkpDestroy(SDbChkp* pChkp) { - taosMemoryFree(pChkp->buf); - taosMemoryFree(pChkp->path); - - taosArrayDestroyP(pChkp->pSST, taosMemoryFree); - taosArrayDestroyP(pChkp->pAdd, taosMemoryFree); - taosArrayDestroyP(pChkp->pDel, taosMemoryFree); - - taosHashCleanup(pChkp->pSstTbl[0]); - taosHashCleanup(pChkp->pSstTbl[1]); - - taosMemoryFree(pChkp->pCurrent); - taosMemoryFree(pChkp->pManifest); -} - -int32_t dbChkpInit(SDbChkp* p) { - if (p == NULL) return 0; - return 0; -} -int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { - taosThreadRwlockRdlock(&p->rwLock); - int32_t code = 0; - int32_t len = p->len + 128; - - char* srcBuf = taosMemoryCalloc(1, len); - char* dstBuf = taosMemoryCalloc(1, len); - - char* srcDir = taosMemoryCalloc(1, len); - char* dstDir = taosMemoryCalloc(1, len); - - sprintf(srcDir, "%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoint", p->curChkpId); - sprintf(dstDir, "%s%s%s", p->path, TD_DIRSEP, dname); - - if (!taosDirExist(srcDir)) { - stError("failed to dump srcDir %s, reason: not exist such dir", srcDir); - code = -1; - goto _ERROR; - } - - code = taosMkDir(dstDir); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - stError("failed to mkdir srcDir %s, reason: %s", dstDir, terrstr()); - goto _ERROR; - } - - // clear current file - memset(dstBuf, 0, len); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent); - taosRemoveFile(dstBuf); - - memset(dstBuf, 0, len); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest); - taosRemoveFile(dstBuf); - - // add file to $name dir - for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) { - memset(dstBuf, 0, len); - memset(srcBuf, 0, len); - - char* filename = taosArrayGetP(p->pAdd, i); - sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); - - taosCopyFile(srcBuf, dstBuf); - } - // del file in $name - for (int i = 0; i < taosArrayGetSize(p->pDel); i++) { - memset(dstBuf, 0, len); - memset(srcBuf, 0, len); - - char* filename = taosArrayGetP(p->pDel, i); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); - taosRemoveFile(dstBuf); - } - - // copy current file to dst dir - memset(srcBuf, 0, len); - memset(dstBuf, 0, len); - sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent); - taosCopyFile(srcBuf, dstBuf); - - // copy manifest file to dst dir - memset(srcBuf, 0, len); - memset(dstBuf, 0, len); - sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest); - sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest); - taosCopyFile(srcBuf, dstBuf); - - // clear delta data buf - taosArrayClearP(p->pAdd, taosMemoryFree); - taosArrayClearP(p->pDel, taosMemoryFree); - -_ERROR: - taosThreadRwlockUnlock(&p->rwLock); - taosMemoryFree(srcBuf); - taosMemoryFree(dstBuf); - taosMemoryFree(srcDir); - taosMemoryFree(dstDir); - return code; -} -SBkdMgt* bkdMgtCreate(char* path) { - SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt)); - p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - taosThreadRwlockInit(&p->rwLock, NULL); - return p; -} - -void bkdMgtDestroy(SBkdMgt* bm) { - if (bm == NULL) return; - void* pIter = taosHashIterate(bm->pDbChkpTbl, NULL); - while (pIter) { - SDbChkp* pChkp = *(SDbChkp**)(pIter); - dbChkpDestroy(pChkp); - - pIter = taosHashIterate(bm->pDbChkpTbl, pIter); - } - - taosThreadRwlockDestroy(&bm->rwLock); - - taosMemoryFree(bm); -} -int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list) { - int32_t code = 0; - taosThreadRwlockWrlock(&bm->rwLock); - - SDbChkp* pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); - code = dbChkpGetDelta(pChkp, chkpId, list); - - taosThreadRwlockUnlock(&bm->rwLock); - return code; -} - -int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) { - int32_t code = -1; - - taosThreadRwlockWrlock(&bm->rwLock); - SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); - if (pp == NULL) { - SDbChkp* p = dbChkpCreate(path, 0); - if (p != NULL) { - taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*)); - code = 0; - } - } else { - stError("task chkp already exists"); - } - - taosThreadRwlockUnlock(&bm->rwLock); - - return code; -} - -int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) { - int32_t code = 0; - taosThreadRwlockRdlock(&bm->rwLock); - - SDbChkp* p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); - code = dbChkpDumpTo(p, dname); - - taosThreadRwlockUnlock(&bm->rwLock); - return code; -} - bool isValidCheckpoint(const char* dir) { return true; } int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { @@ -1582,22 +1279,6 @@ int parKeyToString(void* k, char* buf) { n = sprintf(buf + n, "[groupId:%" PRIi64 "]", *key); return n; } -// int stremaValueEncode(void* k, char* buf) { -// int len = 0; -// SStreamValue* key = k; -// len += taosEncodeFixedI64((void**)&buf, key->unixTimestamp); -// len += taosEncodeFixedI32((void**)&buf, key->len); -// len += taosEncodeBinary((void**)&buf, key->data, key->len); -// return len; -// } -// int streamValueDecode(void* k, char* buf) { -// SStreamValue* key = k; -// char* p = buf; -// p = taosDecodeFixedI64(p, &key->unixTimestamp); -// p = taosDecodeFixedI32(p, &key->len); -// p = taosDecodeBinary(p, (void**)&key->data, key->len); -// return p - buf; -// } int32_t valueToString(void* k, char* buf) { SStreamValue* key = k; int n = 0; @@ -1874,7 +1555,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) { SCfInit* cfPara = &ginitDict[i]; rocksdb_comparator_t* compare = - rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpFunc, cfPara->cmpName); + rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName); rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); rocksdb_compactionfilterfactory_t* filterFactory = @@ -2181,7 +1862,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t SCfInit* cfPara = &ginitDict[idx]; rocksdb_comparator_t* compare = - rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpFunc, cfPara->cmpName); + rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName); rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare); pCompare[i] = compare; } @@ -2265,7 +1946,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t SCfInit* cfPara = &ginitDict[i]; rocksdb_comparator_t* compare = - rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpFunc, cfPara->cmpName); + rocksdb_comparator_create(NULL, cfPara->destroyCmp, cfPara->cmpKey, cfPara->cmpName); rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); inst->pCompares[i] = compare; @@ -2345,7 +2026,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { for (int i = 0; i < cfLen; i++) { SCfInit* cf = &ginitDict[i]; - rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->destroyCmp, cf->cmpFunc, cf->cmpName); + rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->destroyCmp, cf->cmpKey, cf->cmpName); rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare); pCompare[i] = compare; } @@ -3672,4 +3353,305 @@ _err: taosMemoryFreeClear(dstName); taosCloseDir(&pDir); return code >= 0 ? 0 : -1; +} + +int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { + int32_t code = 0; + size_t len = 0; + void* pIter = taosHashIterate(p2, NULL); + while (pIter) { + char* name = taosHashGetKey(pIter, &len); + if (!taosHashGet(p1, name, len)) { + char* p = taosStrdup(name); + taosArrayPush(diff, &p); + } + pIter = taosHashIterate(p2, pIter); + } + return code; +} +int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) { + int32_t code = 0; + + code = compareHashTableImpl(p1, p2, add); + code = compareHashTableImpl(p2, p1, del); + + return code; +} + +int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) { + taosThreadRwlockWrlock(&p->rwLock); + + p->preCkptId = p->curChkpId; + p->curChkpId = chkpId; + const char* pCurrent = "CURRENT"; + int32_t currLen = strlen(pCurrent); + + const char* pManifest = "MANIFEST-"; + int32_t maniLen = strlen(pManifest); + + const char* pSST = ".sst"; + int32_t sstLen = strlen(pSST); + + memset(p->buf, 0, p->len); + sprintf(p->buf, "%s%scheckpoint%" PRId64 "", p->path, TD_DIRSEP, chkpId); + + taosArrayClearP(p->pAdd, taosMemoryFree); + taosArrayClearP(p->pDel, taosMemoryFree); + + TdDirPtr pDir = taosOpenDir(p->buf); + TdDirEntryPtr de = NULL; + int8_t dummy = 0; + while ((de = taosReadDir(pDir)) != NULL) { + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { + taosMemoryFreeClear(p->pCurrent); + p->pCurrent = taosStrdup(name); + taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); + continue; + } + + if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { + taosMemoryFreeClear(p->pManifest); + p->pManifest = taosStrdup(name); + taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); + continue; + } + if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { + // char* p = taosStrdup(name); + taosHashPut(p->pSstTbl[1 - p->idx], name, strlen(name), &dummy, sizeof(dummy)); + continue; + } + } + if (p->init == 0) { + void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL); + while (pIter) { + size_t len; + char* name = taosHashGetKey(pIter, &len); + if (name != NULL && len != 0) { + taosArrayPush(p->pAdd, &name); + } + pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter); + } + if (taosArrayGetSize(p->pAdd) > 0) p->update = 1; + + p->init = 1; + p->preCkptId = -1; + p->curChkpId = chkpId; + } else { + int32_t code = compareHashTable(p->pSstTbl[p->idx], p->pSstTbl[1 - p->idx], p->pAdd, p->pDel); + if (code != 0) { + // dead code + taosArrayClearP(p->pAdd, taosMemoryFree); + taosArrayClearP(p->pDel, taosMemoryFree); + taosHashClear(p->pSstTbl[1 - p->idx]); + p->update = 0; + taosCloseDir(&pDir); + return code; + } + + if (taosArrayGetSize(p->pAdd) == 0 && taosArrayGetSize(p->pDel) == 0) { + p->update = 0; + } + + p->preCkptId = p->curChkpId; + p->curChkpId = chkpId; + } + + taosHashClear(p->pSstTbl[p->idx]); + p->idx = 1 - p->idx; + + taosCloseDir(&pDir); + taosThreadRwlockUnlock(&p->rwLock); + + return 0; +} + +SDbChkp* dbChkpCreate(char* path, int64_t initChkpId) { + SDbChkp* p = taosMemoryCalloc(1, sizeof(SDbChkp)); + p->curChkpId = initChkpId; + p->preCkptId = -1; + p->pSST = taosArrayInit(64, sizeof(void*)); + p->path = taosStrdup(path); + p->len = strlen(path) + 128; + p->buf = taosMemoryCalloc(1, p->len); + + p->idx = 0; + p->pSstTbl[0] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + p->pSstTbl[1] = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + + p->pAdd = taosArrayInit(64, sizeof(void*)); + p->pDel = taosArrayInit(64, sizeof(void*)); + p->update = 0; + taosThreadRwlockInit(&p->rwLock, NULL); + + SArray* list = NULL; + int32_t code = dbChkpGetDelta(p, initChkpId, list); + + return p; +} + +void dbChkpDestroy(SDbChkp* pChkp) { + taosMemoryFree(pChkp->buf); + taosMemoryFree(pChkp->path); + + taosArrayDestroyP(pChkp->pSST, taosMemoryFree); + taosArrayDestroyP(pChkp->pAdd, taosMemoryFree); + taosArrayDestroyP(pChkp->pDel, taosMemoryFree); + + taosHashCleanup(pChkp->pSstTbl[0]); + taosHashCleanup(pChkp->pSstTbl[1]); + + taosMemoryFree(pChkp->pCurrent); + taosMemoryFree(pChkp->pManifest); +} + +int32_t dbChkpInit(SDbChkp* p) { + if (p == NULL) return 0; + return 0; +} +int32_t dbChkpDumpTo(SDbChkp* p, char* dname) { + taosThreadRwlockRdlock(&p->rwLock); + int32_t code = 0; + int32_t len = p->len + 128; + + char* srcBuf = taosMemoryCalloc(1, len); + char* dstBuf = taosMemoryCalloc(1, len); + + char* srcDir = taosMemoryCalloc(1, len); + char* dstDir = taosMemoryCalloc(1, len); + + sprintf(srcDir, "%s%s%s%" PRId64 "", p->path, TD_DIRSEP, "checkpoint", p->curChkpId); + sprintf(dstDir, "%s%s%s", p->path, TD_DIRSEP, dname); + + if (!taosDirExist(srcDir)) { + stError("failed to dump srcDir %s, reason: not exist such dir", srcDir); + code = -1; + goto _ERROR; + } + + code = taosMkDir(dstDir); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + stError("failed to mkdir srcDir %s, reason: %s", dstDir, terrstr()); + goto _ERROR; + } + + // clear current file + memset(dstBuf, 0, len); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent); + taosRemoveFile(dstBuf); + + memset(dstBuf, 0, len); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest); + taosRemoveFile(dstBuf); + + // add file to $name dir + for (int i = 0; i < taosArrayGetSize(p->pAdd); i++) { + memset(dstBuf, 0, len); + memset(srcBuf, 0, len); + + char* filename = taosArrayGetP(p->pAdd, i); + sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); + + taosCopyFile(srcBuf, dstBuf); + } + // del file in $name + for (int i = 0; i < taosArrayGetSize(p->pDel); i++) { + memset(dstBuf, 0, len); + memset(srcBuf, 0, len); + + char* filename = taosArrayGetP(p->pDel, i); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename); + taosRemoveFile(dstBuf); + } + + // copy current file to dst dir + memset(srcBuf, 0, len); + memset(dstBuf, 0, len); + sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent); + taosCopyFile(srcBuf, dstBuf); + + // copy manifest file to dst dir + memset(srcBuf, 0, len); + memset(dstBuf, 0, len); + sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest); + sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest); + taosCopyFile(srcBuf, dstBuf); + + // clear delta data buf + taosArrayClearP(p->pAdd, taosMemoryFree); + taosArrayClearP(p->pDel, taosMemoryFree); + +_ERROR: + taosThreadRwlockUnlock(&p->rwLock); + taosMemoryFree(srcBuf); + taosMemoryFree(dstBuf); + taosMemoryFree(srcDir); + taosMemoryFree(dstDir); + return code; +} +SBkdMgt* bkdMgtCreate(char* path) { + SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt)); + p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + taosThreadRwlockInit(&p->rwLock, NULL); + return p; +} + +void bkdMgtDestroy(SBkdMgt* bm) { + if (bm == NULL) return; + void* pIter = taosHashIterate(bm->pDbChkpTbl, NULL); + while (pIter) { + SDbChkp* pChkp = *(SDbChkp**)(pIter); + dbChkpDestroy(pChkp); + + pIter = taosHashIterate(bm->pDbChkpTbl, pIter); + } + + taosThreadRwlockDestroy(&bm->rwLock); + + taosMemoryFree(bm); +} +int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list) { + int32_t code = 0; + taosThreadRwlockWrlock(&bm->rwLock); + + SDbChkp* pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); + code = dbChkpGetDelta(pChkp, chkpId, list); + + taosThreadRwlockUnlock(&bm->rwLock); + return code; +} + +int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) { + int32_t code = -1; + + taosThreadRwlockWrlock(&bm->rwLock); + SDbChkp** pp = taosHashGet(bm->pDbChkpTbl, task, strlen(task)); + if (pp == NULL) { + SDbChkp* p = dbChkpCreate(path, 0); + if (p != NULL) { + taosHashPut(bm->pDbChkpTbl, task, strlen(task), &p, sizeof(void*)); + code = 0; + } + } else { + stError("task chkp already exists"); + } + + taosThreadRwlockUnlock(&bm->rwLock); + + return code; +} + +int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) { + int32_t code = 0; + taosThreadRwlockRdlock(&bm->rwLock); + + SDbChkp* p = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId)); + code = dbChkpDumpTo(p, dname); + + taosThreadRwlockUnlock(&bm->rwLock); + return code; } \ No newline at end of file