support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-07 11:39:32 +00:00
parent 6a69c56b32
commit 80e78e054d
1 changed files with 11 additions and 8 deletions

View File

@ -157,20 +157,23 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
pMeta->streamBackendRid = -1; pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL; pMeta->streamBackend = NULL;
char* path1 = taosMemoryCalloc(1, strlen(pMeta->path) + 64); char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64);
sprintf(path1, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
taosRemoveDir(path1); taosRemoveDir(defaultPath);
char* path2 = taosMemoryCalloc(1, strlen(pMeta->path) + 64); char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 64);
sprintf(path2, "%s%s%s", pMeta->path, TD_DIRSEP, "received"); sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received");
if (taosRenameFile(path2, path1) < 0) { if (taosRenameFile(newPath, defaultPath) < 0) {
taosMemoryFree(path1); taosMemoryFree(defaultPath);
taosMemoryFree(path2); taosMemoryFree(newPath);
return -1; return -1;
} }
pMeta->streamBackend = streamBackendInit(pMeta->path, 0); pMeta->streamBackend = streamBackendInit(pMeta->path, 0);
if (pMeta->streamBackend == NULL) {
return -1;
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
taosHashClear(pMeta->pTasks); taosHashClear(pMeta->pTasks);