From eeb97351e8712c1156ccf78814f9d36f08940fcc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 7 Aug 2023 08:51:01 +0000 Subject: [PATCH] support reopen stream state --- include/libs/stream/tstream.h | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 3 + source/dnode/vnode/src/tq/tqStreamStateSnap.c | 11 ++- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 6 ++ source/libs/stream/src/streamBackendRocksdb.c | 8 +-- source/libs/stream/src/streamMeta.c | 69 ++++++++++--------- 6 files changed, 60 insertions(+), 38 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 51a31f72ed..9c01b40dce 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -612,6 +612,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); +int32_t streamStateReopen(SStreamMeta *pMeta, int64_t chkpId); int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index cb7af681ee..18f8872e03 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -331,6 +331,9 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId); +int32_t streamStateLoadTasks(SStreamStateWriter* pWriter); + + // SStreamTaskReader ====================================== // SStreamStateWriter ===================================== // SStreamStateReader ===================================== diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 3478928c4f..f1bdc0d6de 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -163,7 +163,16 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) return code; } int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) { - return streamStateRebuild(pWriter->pTq->pStreamMeta, path, chkpId); + return streamStateReopen(pWriter->pTq->pStreamMeta, chkpId); +} + +int32_t streamStateLoadTasksImpl(SStreamMeta* pMeta, int64_t ver) { + // impl later + return streamLoadTasks(pMeta, ver); +} +int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { + SWal* pWal = pWriter->pTq->pVnode->pWal; + return streamStateLoadTasksImpl(pWriter->pTq->pStreamMeta, walGetCommittedVer(pWal)); } int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 16bd233807..be5fb4785c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -409,6 +409,12 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (pWriter->pStreamStateWriter) { code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback); if (code) goto _exit; + + code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, NULL, 0); + if (code) goto _exit; + + code = streamStateLoadTasks(pWriter->pStreamStateWriter); + if (code) goto _exit; } if (pWriter->pRsmaSnapWriter) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ba12c47bc0..c63942c8cc 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -732,8 +732,8 @@ int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); } -int stateSessionKeyEncode(void* ses, char* buf) { - SStateSessionKey* sess = ses; +int stateSessionKeyEncode(void* k, char* buf) { + SStateSessionKey* sess = k; int len = 0; len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey); len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey); @@ -741,8 +741,8 @@ int stateSessionKeyEncode(void* ses, char* buf) { len += taosEncodeFixedI64((void**)&buf, sess->opNum); return len; } -int stateSessionKeyDecode(void* ses, char* buf) { - SStateSessionKey* sess = ses; +int stateSessionKeyDecode(void* k, char* buf) { + SStateSessionKey* sess = k; int len = 0; char* p = buf; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 3cf967a219..e93499ab89 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -128,48 +128,51 @@ _err: return NULL; } -void streamMetaReopen(SStreamMeta** ppMeta) { - SStreamMeta* pMeta = *ppMeta; +void streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { + // stop all running tasking and reopen later + void* pIter = NULL; + while (1) { + pIter = taosHashIterate(pMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } - SStreamMeta* pNewMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); - pNewMeta->path = taosStrdup(pMeta->path); - pNewMeta->vgId = pMeta->vgId; - pNewMeta->walScanCounter = 0; - pNewMeta->ahandle = pMeta->ahandle; - pNewMeta->expandFunc = pMeta->expandFunc; + SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->schedTimer) { + taosTmrStop(pTask->schedTimer); + pTask->schedTimer = NULL; + } - *ppMeta = pNewMeta; + if (pTask->launchTaskTimer) { + taosTmrStop(pTask->launchTaskTimer); + pTask->launchTaskTimer = NULL; + } - streamMetaClose(pMeta); + tFreeStreamTask(pTask); + } - // tdbAbort(pMeta->db, pMeta->txn); - // tdbTbClose(pMeta->pTaskDb); - // tdbTbClose(pMeta->pCheckpointDb); - // tdbClose(pMeta->db); + // close stream backend + streamBackendCleanup(pMeta->streamBackend); + taosRemoveRef(streamBackendId, pMeta->streamBackendRid); + pMeta->streamBackendRid = -1; + pMeta->streamBackend = NULL; - // void* pIter = NULL; - // while (1) { - // pIter = taosHashIterate(pMeta->pTasks, pIter); - // if (pIter == NULL) { - // break; - // } + pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); + pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); - // SStreamTask* pTask = *(SStreamTask**)pIter; - // if (pTask->schedTimer) { - // taosTmrStop(pTask->schedTimer); - // pTask->schedTimer = NULL; - // } + taosHashClear(pMeta->pTasks); - // if (pTask->launchTaskTimer) { - // taosTmrStop(pTask->launchTaskTimer); - // pTask->launchTaskTimer = NULL; - // } + taosArrayClear(pMeta->pTaskList); - // tFreeStreamTask(pTask); - // } + taosHashClear(pMeta->pTaskBackendUnique); - // taosHashClear(pMeta->pTasks); - // taosRemoveRef(streamBackendId, pMeta->streamBackendRid); + taosArrayClear(pMeta->checkpointSaved); + + taosArrayClear(pMeta->checkpointInUse); + + // if (streamLoadTasks(pMeta,int64_t ver)) + + return; } void streamMetaClose(SStreamMeta* pMeta) { tdbAbort(pMeta->db, pMeta->txn);