From 871f66565d0a90addc38ca36ed2ba4436a6a3cbf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 28 Jun 2023 07:02:20 +0000 Subject: [PATCH] add checkpoint --- source/dnode/vnode/src/inc/vnodeInt.h | 16 +++++ source/dnode/vnode/src/tq/tqStreamStateSnap.c | 2 +- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 8 +-- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 59 ++++++++++++++++++- 4 files changed, 78 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 4f89889b0b..5b3ee1a588 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -307,6 +307,22 @@ int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback); int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nData); // SStreamTaskWriter ====================================== + +int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader); +int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader); +int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData); + +int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter); +int32_t streamTaskSnapWriterClose(SStreamTaskWriter* ppWriter, int8_t rollback); +int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData); + +int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader); +int32_t streamStateSnapReaderClose(SStreamStateReader* pReader); +int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData); + +int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter); +int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback); +int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData); // SStreamTaskReader ====================================== // SStreamStateWriter ===================================== // SStreamStateReader ===================================== diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 8f70f03406..e579231d14 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -58,7 +58,7 @@ _err: return code; } -int32_t streamStatSnapReaderClose(SStreamStateReader* pReader) { +int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) { int32_t code = 0; streamSnapReaderClose(pReader->pReaderImpl); taosMemoryFree(pReader); diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 591b21128b..3232a5152f 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -184,10 +184,9 @@ _err: return 0; } -int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback) { - int32_t code = 0; - SStreamTaskWriter* pWriter = *ppWriter; - STQ* pTq = pWriter->pTq; +int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { + int32_t code = 0; + STQ* pTq = pWriter->pTq; if (rollback) { tdbAbort(pWriter->pTq->pStreamMeta->db, pWriter->txn); @@ -199,7 +198,6 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback) } taosMemoryFree(pWriter); - *ppWriter = NULL; // restore from metastore // if (tqMetaRestoreHandle(pTq) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 052e4ab2c1..c6638e2959 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -221,8 +221,41 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) // STREAM ============ if (!pReader->streamTaskDone) { + if (pReader->pStreamTaskReader == NULL) { + code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader); + if (code) goto _err; + } + code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData); + if (code) { + goto _err; + } else { + if (*ppData) { + goto _exit; + } else { + pReader->streamTaskDone = 1; + code = streamTaskSnapReaderClose(pReader->pStreamTaskReader); + if (code) goto _err; + } + } } if (!pReader->streamStateDone) { + if (pReader->pStreamStateReader == NULL) { + code = + streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader); + if (code) goto _err; + } + code = streamStateSnapRead(pReader->pStreamStateReader, ppData); + if (code) { + goto _err; + } else { + if (*ppData) { + goto _exit; + } else { + pReader->streamStateDone = 1; + code = streamStateSnapReaderClose(pReader->pStreamStateReader); + if (code) goto _err; + } + } } // RSMA ============== @@ -366,6 +399,16 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (code) goto _exit; } + if (pWriter->pStreamTaskWriter) { + code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback); + if (code) goto _exit; + } + + if (pWriter->pStreamStateWriter) { + code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback); + if (code) goto _exit; + } + if (pWriter->pRsmaSnapWriter) { code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback); if (code) goto _exit; @@ -469,9 +512,23 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { } break; case SNAP_DATA_TQ_OFFSET: { } break; - case SNAP_DATA_STREAM_TASK: { + case SNAP_DATA_STREAM_TASK: + case SNAP_DATA_STREAM_TASK_CHECKPOINT: { + if (pWriter->pStreamTaskWriter == NULL) { + code = streamTaskSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamTaskWriter); + if (code) goto _err; + } + code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData); + if (code) goto _err; } break; case SNAP_DATA_STREAM_STATE: { + if (pWriter->pStreamStateWriter == NULL) { + code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter); + if (code) goto _err; + } + code = streamStateSnapWrite(pWriter->pStreamStateWriter, pData, nData); + if (code) goto _err; + } break; case SNAP_DATA_RSMA1: case SNAP_DATA_RSMA2: