diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c6dafc4e41..5edba5e180 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -22,6 +22,7 @@ #define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); +int32_t copyFiles(const char* src, const char* dst); typedef struct SDbChkp { int8_t init; char* pCurrent; @@ -67,7 +68,7 @@ typedef struct { SHashObj* pDbChkpTbl; TdThreadRwlock rwLock; -} SBackendManager; +} SBkdMgt; typedef struct SCompactFilteFactory { void* status; @@ -146,7 +147,6 @@ typedef struct { } SCfInit; -void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void* arg); typedef struct { void* funcName; DestroyFactoryFunc destroy; @@ -486,14 +486,14 @@ _ERROR: taosMemoryFree(dstDir); return code; } -SBackendManager* bkdMgtCreate(char* path) { - SBackendManager* p = taosMemoryCalloc(1, sizeof(SBackendManager)); +SBkdMgt* bkdMgtCreate(char* path) { + SBkdMgt* p = taosMemoryCalloc(1, sizeof(SBkdMgt)); p->pDbChkpTbl = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); taosThreadRwlockInit(&p->rwLock, NULL); return p; } -void bkdMgtDestroy(SBackendManager* bm) { +void bkdMgtDestroy(SBkdMgt* bm) { if (bm == NULL) return; void* pIter = taosHashIterate(bm->pDbChkpTbl, NULL); while (pIter) { @@ -507,7 +507,7 @@ void bkdMgtDestroy(SBackendManager* bm) { taosMemoryFree(bm); } -int32_t bkdMgtGetDelta(SBackendManager* bm, char* taskId, int64_t chkpId, SArray* list) { +int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list) { int32_t code = 0; taosThreadRwlockWrlock(&bm->rwLock); @@ -518,7 +518,7 @@ int32_t bkdMgtGetDelta(SBackendManager* bm, char* taskId, int64_t chkpId, SArray return code; } -int32_t bkdMgtAddChkp(SBackendManager* bm, char* task, char* path) { +int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path) { int32_t code = -1; taosThreadRwlockWrlock(&bm->rwLock); @@ -538,7 +538,7 @@ int32_t bkdMgtAddChkp(SBackendManager* bm, char* task, char* path) { return code; } -int32_t bkdMgtDumpTo(SBackendManager* bm, char* taskId, char* dname) { +int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) { int32_t code = 0; taosThreadRwlockRdlock(&bm->rwLock); @@ -549,53 +549,8 @@ int32_t bkdMgtDumpTo(SBackendManager* bm, char* taskId, char* dname) { return code; } -void dbSetFilterFactory(rocksdb_options_t* opt, int32_t i, void* arg) { - rocksdb_compactionfilterfactory_t* filterFactory = rocksdb_compactionfilterfactory_create( - arg, ginitDict[i].destroyFilter, ginitDict[i].createFilter, ginitDict[i].funcName); - rocksdb_options_set_compaction_filter_factory(opt, filterFactory); -} - bool isValidCheckpoint(const char* dir) { return true; } -int32_t copyFiles(const char* src, const char* dst) { - int32_t code = 0; - // opt later, just hard link - int32_t sLen = strlen(src); - int32_t dLen = strlen(dst); - char* srcName = taosMemoryCalloc(1, sLen + 64); - char* dstName = taosMemoryCalloc(1, dLen + 64); - - TdDirPtr pDir = taosOpenDir(src); - if (pDir == NULL) { - taosMemoryFree(srcName); - taosMemoryFree(dstName); - return -1; - } - - TdDirEntryPtr de = NULL; - while ((de = taosReadDir(pDir)) != NULL) { - char* name = taosGetDirEntryName(de); - if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; - - sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); - sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); - if (!taosDirEntryIsDir(de)) { - code = taosCopyFile(srcName, dstName); - if (code == -1) { - goto _err; - } - } - - memset(srcName, 0, sLen + 64); - memset(dstName, 0, dLen + 64); - } - -_err: - taosMemoryFreeClear(srcName); - taosMemoryFreeClear(dstName); - taosCloseDir(&pDir); - return code >= 0 ? 0 : -1; -} int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { // impl later int32_t code = 0; @@ -1754,23 +1709,23 @@ void destroyCompactFilteFactory(void* arg) { } const char* compactFilteFactoryName(void* arg) { SCompactFilteFactory* state = arg; - return "stream_compact_filter"; + return "stream_compact_factory_filter"; } const char* compactFilteFactoryNameSess(void* arg) { SCompactFilteFactory* state = arg; - return "stream_compact_filter_sess"; + return "stream_compact_factory_filter_sess"; } const char* compactFilteFactoryNameState(void* arg) { SCompactFilteFactory* state = arg; - return "stream_compact_filter_state"; + return "stream_compact_factory_filter_state"; } const char* compactFilteFactoryNameFill(void* arg) { SCompactFilteFactory* state = arg; - return "stream_compact_filter_fill"; + return "stream_compact_factory_filter_fill"; } const char* compactFilteFactoryNameFunc(void* arg) { SCompactFilteFactory* state = arg; - return "stream_compact_filter_func"; + return "stream_compact_factory_filter_func"; } void destroyCompactFilte(void* arg) { (void)arg; } @@ -1788,21 +1743,18 @@ unsigned char compactFilteSess(void* arg, int level, const char* key, size_t kle char** newval, size_t* newvlen, unsigned char* value_changed) { // not impl yet return 0; - // return streamStateValueIsStale((char*)val) ? 1 : 0; } unsigned char compactFilteState(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { // not impl yet return 0; - // return streamStateValueIsStale((char*)val) ? 1 : 0; } unsigned char compactFilteFill(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, char** newval, size_t* newvlen, unsigned char* value_changed) { // not impl yet return 0; - // return streamStateValueIsStale((char*)val) ? 1 : 0; } unsigned char compactFilteFunc(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, @@ -1935,12 +1887,13 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) { rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName); rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare); + rocksdb_compactionfilterfactory_t* filterFactory = + rocksdb_compactionfilterfactory_create(NULL, cfPara->destroyFilter, cfPara->createFilter, cfPara->funcName); + rocksdb_options_set_compaction_filter_factory(opt, filterFactory); + pTaskDb->pCompares[i] = compare; pTaskDb->pCfOpts[i] = opt; pTaskDb->pCfParams[i].tableOpt = tableOpt; - - // set filter factory - dbSetFilterFactory(opt, i, NULL); } return; } @@ -2054,8 +2007,8 @@ _EXIT: return NULL; } -void taskDbDestroy(void* pBackend) { - STaskDbWrapper* wrapper = pBackend; +void taskDbDestroy(void* pDb) { + STaskDbWrapper* wrapper = pDb; qDebug("succ to destroy stream backend:%p", wrapper); int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -2107,7 +2060,7 @@ void taskDbDestroy(void* pBackend) { if (wrapper->db) rocksdb_close(wrapper->db); - taskDbDestroyChkpOpt(pBackend); + taskDbDestroyChkpOpt(wrapper); taosMemoryFree(wrapper->idstr); taosMemoryFree(wrapper->path); @@ -2116,14 +2069,14 @@ void taskDbDestroy(void* pBackend) { return; } -int32_t taskDbOpenCfByKey(STaskDbWrapper* pBackend, const char* key) { +int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) { int32_t code = 0; char* err = NULL; int8_t idx = getCfIdx(key); if (idx == -1) return -1; rocksdb_column_family_handle_t* cf = - rocksdb_create_column_family(pBackend->db, pBackend->pCfOpts[idx], ginitDict[idx].key, &err); + rocksdb_create_column_family(pDb->db, pDb->pCfOpts[idx], ginitDict[idx].key, &err); if (err != NULL) { stError("failed to open cf, key:%s, reason: %s", key, err); taosMemoryFree(err); @@ -2131,7 +2084,7 @@ int32_t taskDbOpenCfByKey(STaskDbWrapper* pBackend, const char* key) { return code; } - pBackend->pCf[idx] = cf; + pDb->pCf[idx] = cf; return code; } int32_t copyDataAt(RocksdbCfInst* pSrc, STaskDbWrapper* pDst, int8_t i) { @@ -3690,4 +3643,43 @@ uint32_t nextPow2(uint32_t x) { x = x | (x >> 8); x = x | (x >> 16); return x + 1; +} +int32_t copyFiles(const char* src, const char* dst) { + int32_t code = 0; + // opt later, just hard link + int32_t sLen = strlen(src); + int32_t dLen = strlen(dst); + char* srcName = taosMemoryCalloc(1, sLen + 64); + char* dstName = taosMemoryCalloc(1, dLen + 64); + + TdDirPtr pDir = taosOpenDir(src); + if (pDir == NULL) { + taosMemoryFree(srcName); + taosMemoryFree(dstName); + return -1; + } + + TdDirEntryPtr de = NULL; + while ((de = taosReadDir(pDir)) != NULL) { + char* name = taosGetDirEntryName(de); + if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; + + sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name); + sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name); + if (!taosDirEntryIsDir(de)) { + code = taosCopyFile(srcName, dstName); + if (code == -1) { + goto _err; + } + } + + memset(srcName, 0, sLen + 64); + memset(dstName, 0, dLen + 64); + } + +_err: + taosMemoryFreeClear(srcName); + taosMemoryFreeClear(dstName); + taosCloseDir(&pDir); + return code >= 0 ? 0 : -1; } \ No newline at end of file