diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 19dfe3cf8d..775eaebf89 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1685,22 +1685,27 @@ STaskBackendWrapper* taskBackendOpen(char* path, char* key) { } STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper)); - + pTaskBackend->idstr = taosStrdup(key); pTaskBackend->path = statePath; + taosThreadMutexInit(&pTaskBackend->mutex, NULL); taskBackendInitChkpOpt(pTaskBackend); taskBackendInitDBOpt(pTaskBackend); + statePath = NULL; cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err); if (nCf == 0) { + // pre create db pTaskBackend->db = rocksdb_open(pTaskBackend->pCfOpts[0], dbPath, &err); rocksdb_close(pTaskBackend->db); - if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf); + if (cfNames != NULL) { + rocksdb_list_column_families_destroy(cfNames, nCf); + } taosMemoryFree(err); - } - cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err); - ASSERT(err != NULL); + cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err); + ASSERT(err != NULL); + } if (taskBackendOpenCfs(pTaskBackend, dbPath, cfNames, nCf) != 0) { goto _EXIT; @@ -1713,16 +1718,15 @@ STaskBackendWrapper* taskBackendOpen(char* path, char* key) { qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskBackend); taosMemoryFree(dbPath); - pTaskBackend->idstr = taosStrdup(key); - taosThreadMutexInit(&pTaskBackend->mutex, NULL); - return pTaskBackend; _EXIT: taskBackendDestroy(pTaskBackend); - if (err != NULL) taosMemoryFree(err); - if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf); + if (err) taosMemoryFree(err); + if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf); + if (dbPath) taosMemoryFree(dbPath); + if (statePath) taosMemoryFree(statePath); return NULL; }