refact task backend

This commit is contained in:
yihaoDeng 2023-10-08 10:35:49 +08:00
parent 98099ebbae
commit 2fa91341a7
1 changed files with 14 additions and 10 deletions

View File

@ -1685,22 +1685,27 @@ STaskBackendWrapper* taskBackendOpen(char* path, char* key) {
} }
STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper)); STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper));
pTaskBackend->idstr = taosStrdup(key);
pTaskBackend->path = statePath; pTaskBackend->path = statePath;
taosThreadMutexInit(&pTaskBackend->mutex, NULL);
taskBackendInitChkpOpt(pTaskBackend); taskBackendInitChkpOpt(pTaskBackend);
taskBackendInitDBOpt(pTaskBackend); taskBackendInitDBOpt(pTaskBackend);
statePath = NULL;
cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err); cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err);
if (nCf == 0) { if (nCf == 0) {
// pre create db
pTaskBackend->db = rocksdb_open(pTaskBackend->pCfOpts[0], dbPath, &err); pTaskBackend->db = rocksdb_open(pTaskBackend->pCfOpts[0], dbPath, &err);
rocksdb_close(pTaskBackend->db); rocksdb_close(pTaskBackend->db);
if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf); if (cfNames != NULL) {
rocksdb_list_column_families_destroy(cfNames, nCf);
}
taosMemoryFree(err); taosMemoryFree(err);
}
cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err); cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err);
ASSERT(err != NULL); ASSERT(err != NULL);
}
if (taskBackendOpenCfs(pTaskBackend, dbPath, cfNames, nCf) != 0) { if (taskBackendOpenCfs(pTaskBackend, dbPath, cfNames, nCf) != 0) {
goto _EXIT; goto _EXIT;
@ -1713,16 +1718,15 @@ STaskBackendWrapper* taskBackendOpen(char* path, char* key) {
qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskBackend); qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskBackend);
taosMemoryFree(dbPath); taosMemoryFree(dbPath);
pTaskBackend->idstr = taosStrdup(key);
taosThreadMutexInit(&pTaskBackend->mutex, NULL);
return pTaskBackend; return pTaskBackend;
_EXIT: _EXIT:
taskBackendDestroy(pTaskBackend); taskBackendDestroy(pTaskBackend);
if (err != NULL) taosMemoryFree(err); if (err) taosMemoryFree(err);
if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf); if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);
if (dbPath) taosMemoryFree(dbPath);
if (statePath) taosMemoryFree(statePath);
return NULL; return NULL;
} }