From a824db4fd5755f117f57d117c024e3c036fd6a8f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 27 Jun 2023 07:46:30 +0000 Subject: [PATCH] add checkpoint --- source/dnode/vnode/CMakeLists.txt | 3 ++ source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tqStreamStateSnap.c | 34 +++++++++---------- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 34 +++++++++---------- 4 files changed, 38 insertions(+), 34 deletions(-) diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 9f828d00f5..ab2e83606c 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -70,6 +70,9 @@ target_sources( "src/tq/tqRestore.c" "src/tq/tqSnapshot.c" "src/tq/tqOffsetSnapshot.c" + "src/tq/tqStreamStateSnap.c" + "src/tq/tqStreamTaskSnap.c" + ) IF (TD_VNODE_PLUGINS) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d6e9f64827..22a7fdbef5 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -471,6 +471,7 @@ enum { SNAP_DATA_TQ_OFFSET = 8, SNAP_DATA_STREAM_TASK = 9, SNAP_DATA_STREAM_STATE = 10, + SNAP_DATA_STREAM_STATE_BACKEND = 11, }; struct SSnapDataHdr { diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index e150f59aec..6508eca3f5 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -19,19 +19,19 @@ #include "tq.h" // STqSnapReader ======================================== -struct STqSnapReader { +struct SStreamStateReader { STQ* pTq; int64_t sver; int64_t ever; TBC* pCur; }; -int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) { - int32_t code = 0; - STqSnapReader* pReader = NULL; +int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) { + int32_t code = 0; + SStreamStateReader* pReader = NULL; // alloc - pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader)); + pReader = (SStreamStateReader*)taosMemoryCalloc(1, sizeof(SStreamStateReader)); if (pReader == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -64,7 +64,7 @@ _err: return code; } -int32_t tqSnapReaderClose(STqSnapReader** ppReader) { +int32_t streamStatSnapReaderClose(SStreamStateReader** ppReader) { int32_t code = 0; tdbTbcClose((*ppReader)->pCur); @@ -74,7 +74,7 @@ int32_t tqSnapReaderClose(STqSnapReader** ppReader) { return code; } -int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { +int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { int32_t code = 0; const void* pKey = NULL; const void* pVal = NULL; @@ -124,19 +124,19 @@ _err: } // STqSnapWriter ======================================== -struct STqSnapWriter { +struct SStreamStateWriter { STQ* pTq; int64_t sver; int64_t ever; TXN* txn; }; -int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) { - int32_t code = 0; - STqSnapWriter* pWriter; +int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter) { + int32_t code = 0; + SStreamStateWriter* pWriter; // alloc - pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); + pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -160,10 +160,10 @@ _err: return code; } -int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { - int32_t code = 0; - STqSnapWriter* pWriter = *ppWriter; - STQ* pTq = pWriter->pTq; +int32_t streamStateSnapWriterClose(SStreamStateWriter** ppWriter, int8_t rollback) { + int32_t code = 0; + SStreamStateWriter* pWriter = *ppWriter; + STQ* pTq = pWriter->pTq; if (rollback) { tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); @@ -189,7 +189,7 @@ _err: return code; } -int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { +int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; STQ* pTq = pWriter->pTq; SDecoder decoder = {0}; diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index ab7093a701..8b2d59a70d 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -18,19 +18,19 @@ #include "tq.h" // STqSnapReader ======================================== -struct STqSnapReader { +struct SStreamTaskReader { STQ* pTq; int64_t sver; int64_t ever; TBC* pCur; }; -int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) { - int32_t code = 0; - STqSnapReader* pReader = NULL; +int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) { + int32_t code = 0; + SStreamTaskReader* pReader = NULL; // alloc - pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader)); + pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader)); if (pReader == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -63,7 +63,7 @@ _err: return code; } -int32_t tqSnapReaderClose(STqSnapReader** ppReader) { +int32_t streamTaskSnapReaderClose(SStreamTaskReader** ppReader) { int32_t code = 0; tdbTbcClose((*ppReader)->pCur); @@ -73,7 +73,7 @@ int32_t tqSnapReaderClose(STqSnapReader** ppReader) { return code; } -int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { +int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) { int32_t code = 0; const void* pKey = NULL; const void* pVal = NULL; @@ -123,19 +123,19 @@ _err: } // STqSnapWriter ======================================== -struct STqSnapWriter { +struct SStreamTaskWriter { STQ* pTq; int64_t sver; int64_t ever; TXN* txn; }; -int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) { - int32_t code = 0; - STqSnapWriter* pWriter; +int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) { + int32_t code = 0; + SStreamTaskWriter* pWriter; // alloc - pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); + pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -159,10 +159,10 @@ _err: return code; } -int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { - int32_t code = 0; - STqSnapWriter* pWriter = *ppWriter; - STQ* pTq = pWriter->pTq; +int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback) { + int32_t code = 0; + SStreamTaskWriter* pWriter = *ppWriter; + STQ* pTq = pWriter->pTq; if (rollback) { tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); @@ -188,7 +188,7 @@ _err: return code; } -int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { +int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; STQ* pTq = pWriter->pTq; SDecoder decoder = {0};