Merge pull request #27939 from taosdata/fix/TD-32105-3.0
fix: quick exit when there is snapshot task in progress
This commit is contained in:
commit
486e6679ca
|
@ -319,7 +319,7 @@ int32_t tsdbSnapRead(STsdbSnapReader* pReader, uint8_t** ppData);
|
|||
// STsdbSnapWriter ========================================
|
||||
int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, void* pRanges, STsdbSnapWriter** ppWriter);
|
||||
int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, SSnapDataHdr* pHdr);
|
||||
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter);
|
||||
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter, bool rollback);
|
||||
int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback);
|
||||
// STsdbSnapRAWReader ========================================
|
||||
int32_t tsdbSnapRAWReaderOpen(STsdb* pTsdb, int64_t ever, int8_t type, STsdbSnapRAWReader** ppReader);
|
||||
|
@ -373,7 +373,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
|
|||
// SRSmaSnapWriter ========================================
|
||||
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void** ppRanges, SRSmaSnapWriter** ppWriter);
|
||||
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter);
|
||||
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter, bool rollback);
|
||||
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -163,11 +163,11 @@ _exit:
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) {
|
||||
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter, bool rollback) {
|
||||
int32_t code = 0;
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pWriter->pDataWriter[i]) {
|
||||
code = tsdbSnapWriterPrepareClose(pWriter->pDataWriter[i]);
|
||||
code = tsdbSnapWriterPrepareClose(pWriter->pDataWriter[i], rollback);
|
||||
if (code) {
|
||||
smaError("vgId:%d, failed to prepare close tsdbSnapWriter since %s. i: %d", SMA_VID(pWriter->pSma),
|
||||
tstrerror(code), i);
|
||||
|
|
|
@ -924,6 +924,31 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteFileSetAbort(STsdbSnapWriter* writer) {
|
||||
if (!writer->ctx->fsetWriteBegin) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
// close write
|
||||
code = tsdbSnapWriteFileSetCloseWriter(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbSnapWriteFileSetCloseIter(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbSnapWriteFileSetCloseReader(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
writer->ctx->fsetWriteBegin = false;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TSDB_ERROR_LOG(TD_VID(writer->tsdb->pVnode), lino, code);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr* hdr) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
@ -1075,15 +1100,23 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer) {
|
||||
int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* writer, bool rollback) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
code = tsdbSnapWriteFileSetEnd(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
if (!rollback) {
|
||||
code = tsdbSnapWriteFileSetEnd(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
code = tsdbSnapWriteFileSetAbort(writer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFSEditBegin(writer->tsdb->pFS, writer->fopArr, TSDB_FEDIT_COMMIT);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
|
|
@ -661,7 +661,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
|||
|
||||
// prepare
|
||||
if (pWriter->pTsdbSnapWriter) {
|
||||
(void)tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter);
|
||||
(void)tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter, rollback);
|
||||
}
|
||||
|
||||
if (pWriter->pTsdbSnapRAWWriter) {
|
||||
|
@ -669,7 +669,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
|||
}
|
||||
|
||||
if (pWriter->pRsmaSnapWriter) {
|
||||
(void)rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter);
|
||||
(void)rsmaSnapWriterPrepareClose(pWriter->pRsmaSnapWriter, rollback);
|
||||
}
|
||||
|
||||
// commit json
|
||||
|
|
|
@ -146,6 +146,10 @@ void syncStop(int64_t rid) {
|
|||
void syncPreStop(int64_t rid) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode != NULL) {
|
||||
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
|
||||
sInfo("vgId:%d, stop snapshot receiver", pSyncNode->vgId);
|
||||
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
|
||||
}
|
||||
syncNodePreClose(pSyncNode);
|
||||
syncNodeRelease(pSyncNode);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue