fix stream backend convert

This commit is contained in:
yihaoDeng 2023-11-17 16:50:45 +08:00
parent 286413abbb
commit e7830bfdfd
1 changed files with 29 additions and 19 deletions

View File

@ -39,8 +39,9 @@ typedef struct {
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
void destroyRocksdbCfInst(RocksdbCfInst* inst);
int32_t getCfIdx(const char* cfName);
void destroyRocksdbCfInst(RocksdbCfInst* inst);
int32_t getCfIdx(const char* cfName);
STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath);
void destroyCompactFilteFactory(void* arg);
void destroyCompactFilte(void* arg);
@ -188,7 +189,14 @@ int32_t getCfIdx(const char* cfName) {
return idx;
}
bool isValidCheckpoint(const char* dir) { return true; }
bool isValidCheckpoint(const char* dir) {
STaskDbWrapper* pDb = taskDbOpenImpl(NULL, NULL, (char*)dir);
if (pDb == NULL) {
return true;
}
taskDbDestroy(pDb);
return true;
}
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
// impl later
@ -1707,25 +1715,19 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
p->chkpId = chkpId;
taosThreadMutexUnlock(&p->mutex);
}
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) {
char* statePath = NULL;
STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) {
char* err = NULL;
char* dbPath = NULL;
char** cfNames = NULL;
size_t nCf = 0;
if (rebuildDirFromChkp2(path, key, chkpId, &statePath, &dbPath) != 0) {
return NULL;
}
STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper));
pTaskDb->idstr = taosStrdup(key);
pTaskDb->path = statePath;
pTaskDb->idstr = key ? taosStrdup(key) : NULL;
pTaskDb->path = statePath ? taosStrdup(statePath) : NULL;
taosThreadMutexInit(&pTaskDb->mutex, NULL);
taskDbInitChkpOpt(pTaskDb);
taskDbInitOpt(pTaskDb);
statePath = NULL;
cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
if (nCf == 0) {
@ -1752,19 +1754,27 @@ STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) {
}
qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb);
taosMemoryFree(dbPath);
return pTaskDb;
_EXIT:
taskDbDestroy(pTaskDb);
taskDbDestroy(pTaskDb);
if (err) taosMemoryFree(err);
if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);
if (dbPath) taosMemoryFree(dbPath);
if (statePath) taosMemoryFree(statePath);
return NULL;
}
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) {
char* statePath = NULL;
char* dbPath = NULL;
if (rebuildDirFromChkp2(path, key, chkpId, &statePath, &dbPath) != 0) {
return NULL;
}
// taosMemoryFree(statePath);
STaskDbWrapper* pTaskDb = taskDbOpenImpl(key, statePath, dbPath);
taosMemoryFree(dbPath);
return pTaskDb;
}
void taskDbDestroy(void* pDb) {
STaskDbWrapper* wrapper = pDb;