diff --git a/include/util/tarray.h b/include/util/tarray.h index a93c695370..1b6b4587b7 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -200,13 +200,16 @@ void taosArrayClear(SArray* pArray); * @param pArray * @param fp */ + void taosArrayClearEx(SArray* pArray, void (*fp)(void*)); +void taosArrayClearP(SArray* pArray, void (*fp)(void*)); + void* taosArrayDestroy(SArray* pArray); -void taosArrayDestroyP(SArray* pArray, FDelete fp); +void taosArrayDestroyP(SArray* pArray, FDelete fp); -void taosArrayDestroyEx(SArray* pArray, FDelete fp); +void taosArrayDestroyEx(SArray* pArray, FDelete fp); void taosArraySwap(SArray* a, SArray* b); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 8492410a46..0804ad4ede 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -31,6 +31,9 @@ typedef struct { char* path; char* buf; int32_t len; + + SArray* pAdd; + SArray* pDel; } SBackendManager; typedef struct SCompactFilteFactory { @@ -149,8 +152,25 @@ SBackendManager* backendManagerCreate(char* path) { p->pSSTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); + + p->pAdd = taosArrayInit(64, sizeof(void*)); + p->pDel = taosArrayInit(64, sizeof(void*)); return p; } +void backendManagerDestroy(SBackendManager* bm) { + if (bm == NULL) return; + + taosMemoryFree(bm->buf); + taosMemoryFree(bm->path); + + taosHashCleanup(bm->pSSTable); + + taosArrayDestroyP(bm->pSST, taosMemoryFree); + taosArrayDestroyP(bm->pAdd, taosMemoryFree); + taosArrayDestroyP(bm->pDel, taosMemoryFree); + + taosMemoryFree(bm); +} int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { int32_t code = 0; @@ -191,6 +211,9 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list ? bm->pSSTable : taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + taosArrayClearP(bm->pAdd, taosMemoryFree); + taosArrayClearP(bm->pDel, taosMemoryFree); + TdDirPtr pDir = taosOpenDir(bm->buf); TdDirEntryPtr de = NULL; int8_t dummy = 0; @@ -222,23 +245,23 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list bm->curChkpId = chkpId; bm->init = 1; - SArray* add = taosArrayInit(64, sizeof(void*)); + // SArray* add = taosArrayInit(64, sizeof(void*)); void* pIter = taosHashIterate(pTable, NULL); while (pIter) { size_t len; char* name = taosHashGetKey(pIter, &len); if (name != NULL && len != 0) { - taosArrayPush(add, &name); + taosArrayPush(bm->pAdd, &name); } pIter = taosHashIterate(pTable, pIter); } } else { - SArray* add = taosArrayInit(64, sizeof(void*)); - SArray* del = taosArrayInit(64, sizeof(void*)); + // SArray* add = taosArrayInit(64, sizeof(void*)); + // SArray* del = taosArrayInit(64, sizeof(void*)); - int32_t code = compareHashTable(bm->pSSTable, pTable, add, del); + int32_t code = compareHashTable(bm->pSSTable, pTable, bm->pAdd, bm->pDel); bm->curChkpId = chkpId; taosHashCleanup(pTable); @@ -248,7 +271,8 @@ int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list int32_t backendManagerDumpTo(SBackendManager* bm, char* name) { int32_t code = 0; - char* buf = taosMemoryCalloc(1, strlen(bm->path) + 64); + int32_t len = bm->len + 64; + char* buf = taosMemoryCalloc(1, len); sprintf(buf, "%s%s%s", bm->path, TD_DIRSEP, name); code = taosMkDir(buf); @@ -256,8 +280,37 @@ int32_t backendManagerDumpTo(SBackendManager* bm, char* name) { return code; } - + // clear current file + memset(buf, 0, len); + sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, bm->pCurrent); + taosRemoveFile(buf); + memset(buf, 0, len); + sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, bm->pManifest); + taosRemoveFile(buf); + + for (int i = 0; i < taosArrayGetSize(bm->pAdd); i++) { + memset(buf, 0, len); + + char* filename = taosArrayGetP(bm->pAdd, i); + sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, filename); + + char* src = taosMemoryCalloc(1, len); + sprintf(src, "%s%s%s%" PRId64 "%s%s", bm->path, TD_DIRSEP, "checkpoint", bm->curChkpId, TD_DIRSEP, filename); + taosCopyFile(src, buf); + } + + for (int i = 0; i < taosArrayGetSize(bm->pDel); i++) { + memset(buf, 0, len); + + char* filename = taosArrayGetP(bm->pDel, i); + sprintf(buf, "%s%s%s%s%s", bm->path, TD_DIRSEP, name, TD_DIRSEP, filename); + taosRemoveFile(buf); + } + // clear delta data + taosArrayClearP(bm->pAdd, taosMemoryFree); + taosArrayClearP(bm->pDel, taosMemoryFree); + return code; } SCfInit ginitDict[] = { diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 8906391a9a..06beba0655 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -191,7 +191,7 @@ void* taosArrayGet(const SArray* pArray, size_t index) { } if (index >= pArray->size) { - uError("index is out of range, current:%"PRIzu" max:%d", index, pArray->capacity); + uError("index is out of range, current:%" PRIzu " max:%d", index, pArray->capacity); return NULL; } @@ -319,7 +319,7 @@ SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) { if (NULL == pSrc) { return NULL; } - + if (pSrc->size == 0) { // empty array list return taosArrayInit(8, pSrc->elemSize); } @@ -360,6 +360,23 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) { pArray->size = 0; } +void taosArrayClearP(SArray* pArray, void (*fp)(void*)) { + // if (pArray == NULL) return; + // if (fp == NULL) { + // pArray->size = 0; + // return; + // } + + // for (int32_t i = 0; i < pArray->size; ++i) { + // fp(TARRAY_GET_ELEM(pArray, i)); + // } + if (pArray) { + for (int32_t i = 0; i < pArray->size; i++) { + fp(*(void**)TARRAY_GET_ELEM(pArray, i)); + } + } + taosArrayClear(pArray); +} void* taosArrayDestroy(SArray* pArray) { if (pArray) { @@ -492,7 +509,7 @@ void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t // order array void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param) { taosqsort(pArray->pData, pArray->size, pArray->elemSize, param, fn); -// taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param); + // taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param); } void taosArraySwap(SArray* a, SArray* b) {