From b6c991f896129ff512212f18b5f8c14b6ffd7d41 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 08:59:04 +0000 Subject: [PATCH] support reopen stream state --- include/libs/stream/tstream.h | 4 ++-- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 6 +++++- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 3 --- source/libs/stream/src/streamMeta.c | 16 ++++++++-------- 4 files changed, 15 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9c01b40dce..4c0005ece7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -611,8 +611,8 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); -int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); -int32_t streamStateReopen(SStreamMeta *pMeta, int64_t chkpId); +// int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); +int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index f1bdc0d6de..87d174715d 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -163,7 +163,11 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) return code; } int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) { - return streamStateReopen(pWriter->pTq->pStreamMeta, chkpId); + int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId); + if (code == 0) { + code = streamStateLoadTasks(pWriter); + } + return code; } int32_t streamStateLoadTasksImpl(SStreamMeta* pMeta, int64_t ver) { diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index be5fb4785c..00f7150c0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -412,9 +412,6 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, NULL, 0); if (code) goto _exit; - - code = streamStateLoadTasks(pWriter->pStreamStateWriter); - if (code) goto _exit; } if (pWriter->pRsmaSnapWriter) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e93499ab89..5d0861624e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -36,14 +36,14 @@ void streamMetaCleanup() { taosCloseRef(streamBackendCfWrapperId); } -int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) { - int32_t code = 0; +// int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) { +// int32_t code = 0; - int32_t nTask = taosHashGetSize(pMeta->pTasks); - assert(nTask == 0); +// int32_t nTask = taosHashGetSize(pMeta->pTasks); +// assert(nTask == 0); - return code; -} +// return code; +// } SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { int32_t code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); @@ -128,7 +128,7 @@ _err: return NULL; } -void streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { +int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { // stop all running tasking and reopen later void* pIter = NULL; while (1) { @@ -172,7 +172,7 @@ void streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { // if (streamLoadTasks(pMeta,int64_t ver)) - return; + return 0; } void streamMetaClose(SStreamMeta* pMeta) { tdbAbort(pMeta->db, pMeta->txn);