add checkpoint

This commit is contained in:
yihaoDeng 2023-06-28 07:02:20 +00:00
parent f6d5d0c8ca
commit 871f66565d
4 changed files with 78 additions and 7 deletions

View File

@ -307,6 +307,22 @@ int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter
int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback);
int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nData);
// SStreamTaskWriter ======================================
int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader);
int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader);
int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData);
int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter);
int32_t streamTaskSnapWriterClose(SStreamTaskWriter* ppWriter, int8_t rollback);
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader);
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader);
int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData);
int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateWriter** ppWriter);
int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback);
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData);
// SStreamTaskReader ======================================
// SStreamStateWriter =====================================
// SStreamStateReader =====================================

View File

@ -58,7 +58,7 @@ _err:
return code;
}
int32_t streamStatSnapReaderClose(SStreamStateReader* pReader) {
int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) {
int32_t code = 0;
streamSnapReaderClose(pReader->pReaderImpl);
taosMemoryFree(pReader);

View File

@ -184,9 +184,8 @@ _err:
return 0;
}
int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback) {
int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
int32_t code = 0;
SStreamTaskWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
if (rollback) {
@ -199,7 +198,6 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter** ppWriter, int8_t rollback)
}
taosMemoryFree(pWriter);
*ppWriter = NULL;
// restore from metastore
// if (tqMetaRestoreHandle(pTq) < 0) {

View File

@ -221,8 +221,41 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
// STREAM ============
if (!pReader->streamTaskDone) {
if (pReader->pStreamTaskReader == NULL) {
code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader);
if (code) goto _err;
}
code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData);
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->streamTaskDone = 1;
code = streamTaskSnapReaderClose(pReader->pStreamTaskReader);
if (code) goto _err;
}
}
}
if (!pReader->streamStateDone) {
if (pReader->pStreamStateReader == NULL) {
code =
streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader);
if (code) goto _err;
}
code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->streamStateDone = 1;
code = streamStateSnapReaderClose(pReader->pStreamStateReader);
if (code) goto _err;
}
}
}
// RSMA ==============
@ -366,6 +399,16 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (code) goto _exit;
}
if (pWriter->pStreamTaskWriter) {
code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback);
if (code) goto _exit;
}
if (pWriter->pStreamStateWriter) {
code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback);
if (code) goto _exit;
}
if (pWriter->pRsmaSnapWriter) {
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
if (code) goto _exit;
@ -469,9 +512,23 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
} break;
case SNAP_DATA_TQ_OFFSET: {
} break;
case SNAP_DATA_STREAM_TASK: {
case SNAP_DATA_STREAM_TASK:
case SNAP_DATA_STREAM_TASK_CHECKPOINT: {
if (pWriter->pStreamTaskWriter == NULL) {
code = streamTaskSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamTaskWriter);
if (code) goto _err;
}
code = streamTaskSnapWrite(pWriter->pStreamTaskWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_STREAM_STATE: {
if (pWriter->pStreamStateWriter == NULL) {
code = streamStateSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pStreamStateWriter);
if (code) goto _err;
}
code = streamStateSnapWrite(pWriter->pStreamStateWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_RSMA1:
case SNAP_DATA_RSMA2: