diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index bf05437117..9ba1db5b9e 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -212,13 +212,13 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { return 0; } -void* streamBackendInit(const char* path, int64_t chkpId) { - uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; +void* streamBackendInit(const char* streamPath, int64_t chkpId) { + char* backendPath = NULL; + int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); - char* state = NULL; - int32_t code = rebuildDirFromCheckpoint(path, chkpId, &state); + qDebug("start to init stream backend at %s", backendPath); - qDebug("start to init stream backend at %s", state); + uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); pHandle->list = tdListNew(sizeof(SCfComparator)); taosThreadMutexInit(&pHandle->mutex, NULL); @@ -254,12 +254,12 @@ void* streamBackendInit(const char* path, int64_t chkpId) { char* err = NULL; size_t nCf = 0; - char** cfs = rocksdb_list_column_families(opts, state, &nCf, &err); + char** cfs = rocksdb_list_column_families(opts, backendPath, &nCf, &err); if (nCf == 0 || nCf == 1 || err != NULL) { taosMemoryFreeClear(err); - pHandle->db = rocksdb_open(opts, path, &err); + pHandle->db = rocksdb_open(opts, backendPath, &err); if (err != NULL) { - qError("failed to open rocksdb, path:%s, reason:%s", path, err); + qError("failed to open rocksdb, path:%s, reason:%s", backendPath, err); taosMemoryFreeClear(err); goto _EXIT; } @@ -267,13 +267,13 @@ void* streamBackendInit(const char* path, int64_t chkpId) { /* list all cf and get prefix */ - streamStateOpenBackendCf(pHandle, (char*)state, cfs, nCf); + streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf); } if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); } - qDebug("succ to init stream backend at %s, backend:%p", state, pHandle); - taosMemoryFreeClear(state); + qDebug("succ to init stream backend at %s, backend:%p", backendPath, pHandle); + taosMemoryFreeClear(backendPath); return (void*)pHandle; _EXIT: @@ -285,9 +285,9 @@ _EXIT: taosHashCleanup(pHandle->cfInst); rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); tdListFree(pHandle->list); - taosMemoryFree(state); + taosMemoryFree(backendPath); taosMemoryFree(pHandle); - qDebug("failed to init stream backend at %s", path); + qDebug("failed to init stream backend at %s", backendPath); return NULL; } void streamBackendCleanup(void* arg) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2976a65d1d..d31c4337fa 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -51,8 +51,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { goto _err; } - memset(streamPath, 0, len); + memset(streamPath, 0, len); sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); code = taosMulModeMkDir(streamPath, 0755); if (code != 0) { @@ -90,13 +90,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - memset(streamPath, 0, len); - sprintf(streamPath, "%s/%s", pMeta->path, "state"); - code = taosMulModeMkDir(streamPath, 0755); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - goto _err; - } + // memset(streamPath, 0, len); + // sprintf(streamPath, "%s/%s", pMeta->path, "state"); + // code = taosMulModeMkDir(streamPath, 0755); + // if (code != 0) { + // terrno = TAOS_SYSTEM_ERROR(code); + // goto _err; + // } pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);