diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f926efb94d..d72901a2a3 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -21,14 +21,16 @@ #include "tref.h" typedef struct { - char* pCurrent; - char* pManifest; - SArray* pSST; - int64_t preCkptId; - int64_t curChkpId; - char* path; - char* buf; - int32_t len; + int8_t init; + char* pCurrent; + char* pManifest; + SArray* pSST; + int64_t preCkptId; + int64_t curChkpId; + SHashObj* pSSTable; + char* path; + char* buf; + int32_t len; } SBackendManager; typedef struct SCompactFilteFactory { @@ -144,37 +146,90 @@ SBackendManager* backendManagerCreate(char* path) { p->preCkptId = 0; p->pSST = taosArrayInit(64, sizeof(void*)); p->path = taosStrdup(path); - + p->pSSTable = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); p->len = strlen(path) + 128; p->buf = taosMemoryCalloc(1, p->len); return p; } +int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) { + int32_t code = 0; + size_t len = 0; + void* pIter = taosHashIterate(p2, NULL); + while (pIter) { + char* name = taosHashGetKey(pIter, &len); + if (!taosHashGet(p1, name, len)) { + char* p = taosStrdup(name); + taosArrayPush(diff, &p); + } + pIter = taosHashIterate(p2, pIter); + } + return code; +} +int32_t compareHashTable(SHashObj* p1, SHashObj* p2, SArray* add, SArray* del) { + int32_t code = 0; + + code = compareHashTableImpl(p1, p2, add); + code = compareHashTableImpl(p2, p1, del); + + return code; +} int32_t backendManagerGetDelta(SBackendManager* bm, int64_t chkpId, SArray* list) { + const char* pCurrent = "CURRENT"; + int32_t currLen = strlen(pCurrent); + + const char* pManifest = "MANIFEST-"; + int32_t maniLen = strlen(pManifest); + + const char* pSST = ".sst"; + int32_t sstLen = strlen(pSST); + memset(bm->buf, 0, bm->len); sprintf(bm->buf, "%s%scheckpoint%" PRId64 "", bm->path, TD_DIRSEP, chkpId); + SHashObj* pTable = bm->init == 0 + ? bm->pSSTable + : taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + TdDirPtr pDir = taosOpenDir(bm->buf); TdDirEntryPtr de = NULL; - + int8_t dummy = 0; while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) { + taosMemoryFreeClear(bm->pCurrent); + bm->pCurrent = taosStrdup(name); + taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + continue; + } + if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) { + taosMemoryFreeClear(bm->pManifest); + bm->pManifest = taosStrdup(name); + taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + continue; + } + if (strlen(name) >= sstLen && strncmp(name + strlen(name) - 4, pSST, sstLen) == 0) { + char* p = taosStrdup(name); - // sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name); - // sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name); - // if (!taosDirEntryIsDir(de)) { - // code = taosCopyFile(absSrcPath, absDstPath); - // if (code == -1) { - // goto _err; - // } - // } - - // memset(absSrcPath, 0, sLen + 64); - // memset(absDstPath, 0, dLen + 64); + taosHashPut(pTable, name, strlen(name), &dummy, sizeof(dummy)); + continue; + } } + if (bm->init == 0) { + bm->preCkptId = chkpId; + bm->curChkpId = chkpId; + bm->init = 1; + } else { + SArray* add = taosArrayInit(64, sizeof(void*)); + SArray* del = taosArrayInit(64, sizeof(void*)); + int32_t code = compareHashTable(bm->pSSTable, pTable, add, del); + + bm->curChkpId = chkpId; + taosHashCleanup(pTable); + } return 0; }