support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-07 10:48:04 +00:00
parent b6c991f896
commit 6c8c575549
1 changed files with 14 additions and 3 deletions

View File

@ -157,7 +157,20 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL;
pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId);
char* path1 = taosMemoryCalloc(1, strlen(pMeta->path) + 64);
sprintf(path1, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
taosRemoveDir(path1);
char* path2 = taosMemoryCalloc(1, strlen(pMeta->path) + 64);
sprintf(path2, "%s%s%s", pMeta->path, TD_DIRSEP, "received");
if (taosRenameFile(path2, path1) < 0) {
taosMemoryFree(path1);
taosMemoryFree(path2);
return -1;
}
pMeta->streamBackend = streamBackendInit(pMeta->path, 0);
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
taosHashClear(pMeta->pTasks);
@ -170,8 +183,6 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
taosArrayClear(pMeta->checkpointInUse);
// if (streamLoadTasks(pMeta,int64_t ver))
return 0;
}
void streamMetaClose(SStreamMeta* pMeta) {