diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7b1f3716c4..e50ced2ebb 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -319,7 +319,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData); // STsdbSnapWriter ======================================== int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** ppWriter); int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr); -int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter); +int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter, bool rollback); int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback); // STsdbSnapRAWReader ======================================== int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader); @@ -373,7 +373,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData); // SRSmaSnapWriter ======================================== int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void** ppRanges, SRSmaSnapWriter** ppWriter); int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); -int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter); +int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter, bool rollback); int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback); typedef struct { diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index bf0b1f0b9d..881c8ac96d 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -163,11 +163,11 @@ _exit: TAOS_RETURN(code); } -int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) { +int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter, bool rollback) { int32_t code = 0; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pWriter->pDataWriter[i]) { - code = tsdbSnapWriterPrepareClose(pWriter->pDataWriter[i]); + code = tsdbSnapWriterPrepareClose(pWriter->pDataWriter[i], rollback); if (code) { smaError("vgId:%d, failed to prepare close tsdbSnapWriter since %s. i: %d", SMA_VID(pWriter->pSma), tstrerror(code), i); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 15930353bf..7d4bcb6914 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -924,6 +924,31 @@ _exit: return code; } +static int32_t tsdbSnapWriteFileSetAbort(STsdbSnapWriter* writer) { + if (!writer->ctx->fsetWriteBegin) return 0; + + int32_t code = 0; + int32_t lino = 0; + + // close write + code = tsdbSnapWriteFileSetCloseWriter(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSnapWriteFileSetCloseIter(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbSnapWriteFileSetCloseReader(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + writer->ctx->fsetWriteBegin = false; + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code); + } + return code; +} + static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) { int32_t code = 0; int32_t lino = 0; @@ -1075,15 +1100,23 @@ _exit: return code; } -int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer) { +int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer, bool rollback) { int32_t code = 0; int32_t lino = 0; - code = tsdbSnapWriteFileSetEnd(writer); - TSDB_CHECK_CODE(code, lino, _exit); + if (!rollback) { + code = tsdbSnapWriteFileSetEnd(writer); + TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT); - TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbSnapWriteFileSetAbort(writer); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT); + TSDB_CHECK_CODE(code, lino, _exit); + } _exit: if (code) { diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 202c6dd312..4b7cd38913 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -661,7 +661,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * // prepare if (pWriter->pTsdbSnapWriter) { - (void)tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter); + (void)tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter, rollback); } if (pWriter->pTsdbSnapRAWWriter) { @@ -669,7 +669,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * } if (pWriter->pRsmaSnapWriter) { - (void)rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter); + (void)rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter, rollback); } // commit json diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8081de60c9..4dce54fc1a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -146,6 +146,10 @@ void syncStop(int64_t rid) { void syncPreStop(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode != NULL) { + if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { + sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId); + snapshotReceiverStop(pSyncNode->pNewNodeReceiver); + } syncNodePreClose(pSyncNode); syncNodeRelease(pSyncNode); }