diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5d0861624e..7e264128d9 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -157,7 +157,20 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { pMeta->streamBackendRid = -1; pMeta->streamBackend = NULL; - pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); + char* path1 = taosMemoryCalloc(1, strlen(pMeta->path) + 64); + sprintf(path1, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); + taosRemoveDir(path1); + + char* path2 = taosMemoryCalloc(1, strlen(pMeta->path) + 64); + sprintf(path2, "%s%s%s", pMeta->path, TD_DIRSEP, "received"); + + if (taosRenameFile(path2, path1) < 0) { + taosMemoryFree(path1); + taosMemoryFree(path2); + return -1; + } + + pMeta->streamBackend = streamBackendInit(pMeta->path, 0); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); taosHashClear(pMeta->pTasks); @@ -170,8 +183,6 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { taosArrayClear(pMeta->checkpointInUse); - // if (streamLoadTasks(pMeta,int64_t ver)) - return 0; } void streamMetaClose(SStreamMeta* pMeta) {