diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d2aab7459b..4c86beb44d 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -39,8 +39,9 @@ typedef struct { int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); -void destroyRocksdbCfInst(RocksdbCfInst* inst); -int32_t getCfIdx(const char* cfName); +void destroyRocksdbCfInst(RocksdbCfInst* inst); +int32_t getCfIdx(const char* cfName); +STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath); void destroyCompactFilteFactory(void* arg); void destroyCompactFilte(void* arg); @@ -188,7 +189,14 @@ int32_t getCfIdx(const char* cfName) { return idx; } -bool isValidCheckpoint(const char* dir) { return true; } +bool isValidCheckpoint(const char* dir) { + STaskDbWrapper* pDb = taskDbOpenImpl(NULL, NULL, (char*)dir); + if (pDb == NULL) { + return true; + } + taskDbDestroy(pDb); + return true; +} int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { // impl later @@ -1707,25 +1715,19 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) { p->chkpId = chkpId; taosThreadMutexUnlock(&p->mutex); } -STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) { - char* statePath = NULL; + +STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) { char* err = NULL; - char* dbPath = NULL; char** cfNames = NULL; size_t nCf = 0; - if (rebuildDirFromChkp2(path, key, chkpId, &statePath, &dbPath) != 0) { - return NULL; - } - STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper)); - pTaskDb->idstr = taosStrdup(key); - pTaskDb->path = statePath; + pTaskDb->idstr = key ? taosStrdup(key) : NULL; + pTaskDb->path = statePath ? taosStrdup(statePath) : NULL; taosThreadMutexInit(&pTaskDb->mutex, NULL); taskDbInitChkpOpt(pTaskDb); taskDbInitOpt(pTaskDb); - statePath = NULL; cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err); if (nCf == 0) { @@ -1752,19 +1754,27 @@ STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) { } qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb); - taosMemoryFree(dbPath); - - return pTaskDb; _EXIT: - taskDbDestroy(pTaskDb); + taskDbDestroy(pTaskDb); if (err) taosMemoryFree(err); if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf); - if (dbPath) taosMemoryFree(dbPath); - if (statePath) taosMemoryFree(statePath); return NULL; } +STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) { + char* statePath = NULL; + char* dbPath = NULL; + + if (rebuildDirFromChkp2(path, key, chkpId, &statePath, &dbPath) != 0) { + return NULL; + } + // taosMemoryFree(statePath); + + STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath); + taosMemoryFree(dbPath); + return pTaskDb; +} void taskDbDestroy(void* pDb) { STaskDbWrapper* wrapper = pDb;