From 74185b8b9c393c79bc8a26a78268fcca9a20b0ac Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Sat, 7 Oct 2023 16:07:21 +0800 Subject: [PATCH] feat: exchange snapshot info for rsma tsdbs --- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/sma/smaSnapshot.c | 4 +- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 106 ++++++++++++--------- 3 files changed, 66 insertions(+), 46 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 6682ff0133..5015202865 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -357,7 +357,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader); int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData); // SRSmaSnapWriter ======================================== -int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void* pRanges, SRSmaSnapWriter** ppWriter); +int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void** ppRanges, SRSmaSnapWriter** ppWriter); int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback); diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index ca46b4728f..1e921b23d1 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -128,7 +128,7 @@ struct SRSmaSnapWriter { STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2]; }; -int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void* pRanges, SRSmaSnapWriter** ppWriter) { +int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void** ppRanges, SRSmaSnapWriter** ppWriter) { int32_t code = 0; int32_t lino = 0; SVnode* pVnode = pSma->pVnode; @@ -147,7 +147,7 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void* pRanges // rsma1/rsma2 for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { - code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, pRanges, &pWriter->pDataWriter[i]); + code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, ((void**)ppRanges)[i], &pWriter->pDataWriter[i]); TSDB_CHECK_CODE(code, lino, _exit); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 0874e5e0d8..f5a5249d1f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -45,7 +45,7 @@ struct SVSnapReader { SStreamStateReader *pStreamStateReader; // rsma int8_t rsmaDone; - TSnapRangeArray *pRsmaRanges; + TSnapRangeArray *pRsmaRanges[TSDB_RETENTION_L2]; SRSmaSnapReader *pRsmaReader; }; @@ -69,6 +69,20 @@ _out: return code; } +static TSnapRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) { + _Static_assert(sizeof(pReader->pRsmaRanges) / sizeof(pReader->pRsmaRanges[0]) == 2, "Unexpected array size"); + switch (tsdbTyp) { + case SNAP_DATA_TSDB: + return &pReader->pRanges; + case SNAP_DATA_RSMA1: + return &pReader->pRsmaRanges[0]; + case SNAP_DATA_RSMA2: + return &pReader->pRsmaRanges[1]; + default: + return NULL; + } +} + int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) { int32_t code = 0; int64_t sver = pParam->start; @@ -92,25 +106,18 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader goto _err; } - int32_t offset = 0; TSnapRangeArray **ppRanges = NULL; + int32_t offset = 0; while (offset + sizeof(SSyncTLV) < datHead->len) { SSyncTLV *subField = (void *)(datHead->val + offset); offset += sizeof(SSyncTLV) + subField->len; void *buf = subField->val; int32_t bufLen = subField->len; - switch (subField->typ) { - case SNAP_DATA_TSDB: - ppRanges = &pReader->pRanges; - break; - case SNAP_DATA_RSMA1: - ppRanges = &pReader->pRsmaRanges; - break; - default: - vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), - subField->typ); - goto _err; + ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ); + if (ppRanges == NULL) { + vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ); + goto _err; } if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) { vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); @@ -129,8 +136,19 @@ _err: return code; } +static void vnodeSnapReaderDestroyTsdbRanges(SVSnapReader *pReader) { + int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2}; + for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) { + TSnapRangeArray **ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, tsdbTyps[j]); + if (ppRanges == NULL) continue; + tsdbSnapRangeArrayDestroy(ppRanges); + } +} + void vnodeSnapReaderClose(SVSnapReader *pReader) { vInfo("vgId:%d, close vnode snapshot reader", TD_VID(pReader->pVnode)); + vnodeSnapReaderDestroyTsdbRanges(pReader); + if (pReader->pRsmaReader) { rsmaSnapReaderClose(&pReader->pRsmaReader); } @@ -155,14 +173,6 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) { tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader); } - if (pReader->pRanges) { - tsdbSnapRangeArrayDestroy(&pReader->pRanges); - } - - if (pReader->pRsmaRanges) { - tsdbSnapRangeArrayDestroy(&pReader->pRsmaRanges); - } - taosMemoryFree(pReader); } @@ -442,10 +452,24 @@ struct SVSnapWriter { SStreamTaskWriter *pStreamTaskWriter; SStreamStateWriter *pStreamStateWriter; // rsma - TSnapRangeArray *pRsmaRanges; + TSnapRangeArray *pRsmaRanges[TSDB_RETENTION_L2]; SRSmaSnapWriter *pRsmaSnapWriter; }; +TSnapRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) { + _Static_assert(sizeof(pWriter->pRsmaRanges) / sizeof(pWriter->pRsmaRanges[0]) == 2, "Unexpected array size"); + switch (tsdbTyp) { + case SNAP_DATA_TSDB: + return &pWriter->pRanges; + case SNAP_DATA_RSMA1: + return &pWriter->pRsmaRanges[0]; + case SNAP_DATA_RSMA2: + return &pWriter->pRsmaRanges[1]; + default: + return NULL; + } +} + int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) { int32_t code = 0; SVSnapWriter *pWriter = NULL; @@ -477,25 +501,18 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter goto _err; } - int32_t offset = 0; TSnapRangeArray **ppRanges = NULL; + int32_t offset = 0; while (offset + sizeof(SSyncTLV) < datHead->len) { SSyncTLV *subField = (void *)(datHead->val + offset); offset += sizeof(SSyncTLV) + subField->len; void *buf = subField->val; int32_t bufLen = subField->len; - switch (subField->typ) { - case SNAP_DATA_TSDB: - ppRanges = &pWriter->pRanges; - break; - case SNAP_DATA_RSMA1: - ppRanges = &pWriter->pRsmaRanges; - break; - default: - vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), - subField->typ); - goto _err; + ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ); + if (ppRanges == NULL) { + vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ); + goto _err; } if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) { vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); @@ -515,10 +532,21 @@ _err: return code; } +static void vnodeSnapWriterDestroyTsdbRanges(SVSnapWriter *pWriter) { + int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2}; + for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) { + TSnapRangeArray **ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, tsdbTyps[j]); + if (ppRanges == NULL) continue; + tsdbSnapRangeArrayDestroy(ppRanges); + } +} + int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot) { int32_t code = 0; SVnode *pVnode = pWriter->pVnode; + vnodeSnapWriterDestroyTsdbRanges(pWriter); + // prepare if (pWriter->pTsdbSnapWriter) { tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter); @@ -550,10 +578,6 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (code) goto _exit; } - if (pWriter->pRanges) { - tsdbSnapRangeArrayDestroy(&pWriter->pRanges); - } - if (pWriter->pTsdbSnapWriter) { code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback); if (code) goto _exit; @@ -588,10 +612,6 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * if (code) goto _exit; } - if (pWriter->pRsmaRanges) { - tsdbSnapRangeArrayDestroy(&pWriter->pRsmaRanges); - } - if (pWriter->pRsmaSnapWriter) { code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback); if (code) goto _exit; @@ -739,7 +759,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { case SNAP_DATA_QTASK: { // rsma1/rsma2/qtask for rsma if (pWriter->pRsmaSnapWriter == NULL) { - code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, pWriter->pRsmaRanges, + code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, (void **)pWriter->pRsmaRanges, &pWriter->pRsmaSnapWriter); if (code) goto _err; }