add checkpoint

This commit is contained in:
yihaoDeng 2023-06-27 07:46:30 +00:00
parent 2c1fc501ff
commit a824db4fd5
4 changed files with 38 additions and 34 deletions

View File

@ -70,6 +70,9 @@ target_sources(
"src/tq/tqRestore.c"
"src/tq/tqSnapshot.c"
"src/tq/tqOffsetSnapshot.c"
"src/tq/tqStreamStateSnap.c"
"src/tq/tqStreamTaskSnap.c"
)
IF (TD_VNODE_PLUGINS)

View File

@ -471,6 +471,7 @@ enum {
SNAP_DATA_TQ_OFFSET = 8,
SNAP_DATA_STREAM_TASK = 9,
SNAP_DATA_STREAM_STATE = 10,
SNAP_DATA_STREAM_STATE_BACKEND = 11,
};
struct SSnapDataHdr {

View File

@ -19,19 +19,19 @@
#include "tq.h"
// STqSnapReader ========================================
struct STqSnapReader {
struct SStreamStateReader {
STQ* pTq;
int64_t sver;
int64_t ever;
TBC* pCur;
};
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) {
int32_t code = 0;
STqSnapReader* pReader = NULL;
int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) {
int32_t code = 0;
SStreamStateReader* pReader = NULL;
// alloc
pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
pReader = (SStreamStateReader*)taosMemoryCalloc(1, sizeof(SStreamStateReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@ -64,7 +64,7 @@ _err:
return code;
}
int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
int32_t streamStatSnapReaderClose(SStreamStateReader** ppReader) {
int32_t code = 0;
tdbTbcClose((*ppReader)->pCur);
@ -74,7 +74,7 @@ int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
return code;
}
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) {
int32_t code = 0;
const void* pKey = NULL;
const void* pVal = NULL;
@ -124,19 +124,19 @@ _err:
}
// STqSnapWriter ========================================
struct STqSnapWriter {
struct SStreamStateWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
TXN* txn;
};
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
int32_t code = 0;
STqSnapWriter* pWriter;
int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter) {
int32_t code = 0;
SStreamStateWriter* pWriter;
// alloc
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@ -160,10 +160,10 @@ _err:
return code;
}
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
STqSnapWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
int32_t streamStateSnapWriterClose(SStreamStateWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
SStreamStateWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
if (rollback) {
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
@ -189,7 +189,7 @@ _err:
return code;
}
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
SDecoder decoder = {0};

View File

@ -18,19 +18,19 @@
#include "tq.h"
// STqSnapReader ========================================
struct STqSnapReader {
struct SStreamTaskReader {
STQ* pTq;
int64_t sver;
int64_t ever;
TBC* pCur;
};
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) {
int32_t code = 0;
STqSnapReader* pReader = NULL;
int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader) {
int32_t code = 0;
SStreamTaskReader* pReader = NULL;
// alloc
pReader = (STqSnapReader*)taosMemoryCalloc(1, sizeof(STqSnapReader));
pReader = (SStreamTaskReader*)taosMemoryCalloc(1, sizeof(SStreamTaskReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@ -63,7 +63,7 @@ _err:
return code;
}
int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
int32_t streamTaskSnapReaderClose(SStreamTaskReader** ppReader) {
int32_t code = 0;
tdbTbcClose((*ppReader)->pCur);
@ -73,7 +73,7 @@ int32_t tqSnapReaderClose(STqSnapReader** ppReader) {
return code;
}
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
int32_t code = 0;
const void* pKey = NULL;
const void* pVal = NULL;
@ -123,19 +123,19 @@ _err:
}
// STqSnapWriter ========================================
struct STqSnapWriter {
struct SStreamTaskWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
TXN* txn;
};
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) {
int32_t code = 0;
STqSnapWriter* pWriter;
int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) {
int32_t code = 0;
SStreamTaskWriter* pWriter;
// alloc
pWriter = (STqSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
pWriter = (SStreamTaskWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@ -159,10 +159,10 @@ _err:
return code;
}
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
STqSnapWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
SStreamTaskWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
if (rollback) {
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
@ -188,7 +188,7 @@ _err:
return code;
}
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
SDecoder decoder = {0};