From 031506aef8b2088b5c03902ee7e40c718fd88a3b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 11 Oct 2023 19:46:35 +0800 Subject: [PATCH] refactor backend --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/tq/tqStreamStateSnap.c | 5 +++-- source/libs/stream/src/streamMeta.c | 16 ++++++++++++++++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a9e593eaad..859d7cea54 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -725,6 +725,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); +int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 38cd1e1b76..b2835f2198 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -169,14 +169,15 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) } int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); - // int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId); int32_t code = streamStateLoadTasks(pWriter); tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); taosMemoryFree(pWriter); return code; } -int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); } +int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { + return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta); +} int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 72399865e6..83fe0b03c0 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -756,6 +756,22 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { taosArrayDestroy(pRecycleList); } +int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) { + if (pMeta == NULL) return 0; + + void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); + while (pIter) { + STaskBackendWrapper* taskBackend = *(STaskBackendWrapper**)pIter; + if (taskBackend != NULL) { + taskBackendRemoveRef(taskBackend); + } + pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); + } + taosHashClear(pMeta->pTaskBackendUnique); + + + return 0; +} int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL;