feat: exchange snapshot info for rsma tsdbs
This commit is contained in:
parent
d5d713b1b4
commit
74185b8b9c
|
@ -357,7 +357,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead
|
|||
int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader);
|
||||
int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
|
||||
// SRSmaSnapWriter ========================================
|
||||
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void* pRanges, SRSmaSnapWriter** ppWriter);
|
||||
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void** ppRanges, SRSmaSnapWriter** ppWriter);
|
||||
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
|
||||
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
|
||||
|
||||
|
|
|
@ -128,7 +128,7 @@ struct SRSmaSnapWriter {
|
|||
STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2];
|
||||
};
|
||||
|
||||
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void* pRanges, SRSmaSnapWriter** ppWriter) {
|
||||
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void** ppRanges, SRSmaSnapWriter** ppWriter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SVnode* pVnode = pSma->pVnode;
|
||||
|
@ -147,7 +147,7 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, void* pRanges
|
|||
// rsma1/rsma2
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pSma->pRSmaTsdb[i]) {
|
||||
code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, pRanges, &pWriter->pDataWriter[i]);
|
||||
code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, ((void**)ppRanges)[i], &pWriter->pDataWriter[i]);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ struct SVSnapReader {
|
|||
SStreamStateReader *pStreamStateReader;
|
||||
// rsma
|
||||
int8_t rsmaDone;
|
||||
TSnapRangeArray *pRsmaRanges;
|
||||
TSnapRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
|
||||
SRSmaSnapReader *pRsmaReader;
|
||||
};
|
||||
|
||||
|
@ -69,6 +69,20 @@ _out:
|
|||
return code;
|
||||
}
|
||||
|
||||
static TSnapRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int32_t tsdbTyp) {
|
||||
_Static_assert(sizeof(pReader->pRsmaRanges) / sizeof(pReader->pRsmaRanges[0]) == 2, "Unexpected array size");
|
||||
switch (tsdbTyp) {
|
||||
case SNAP_DATA_TSDB:
|
||||
return &pReader->pRanges;
|
||||
case SNAP_DATA_RSMA1:
|
||||
return &pReader->pRsmaRanges[0];
|
||||
case SNAP_DATA_RSMA2:
|
||||
return &pReader->pRsmaRanges[1];
|
||||
default:
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) {
|
||||
int32_t code = 0;
|
||||
int64_t sver = pParam->start;
|
||||
|
@ -92,25 +106,18 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader
|
|||
goto _err;
|
||||
}
|
||||
|
||||
int32_t offset = 0;
|
||||
TSnapRangeArray **ppRanges = NULL;
|
||||
int32_t offset = 0;
|
||||
|
||||
while (offset + sizeof(SSyncTLV) < datHead->len) {
|
||||
SSyncTLV *subField = (void *)(datHead->val + offset);
|
||||
offset += sizeof(SSyncTLV) + subField->len;
|
||||
void *buf = subField->val;
|
||||
int32_t bufLen = subField->len;
|
||||
switch (subField->typ) {
|
||||
case SNAP_DATA_TSDB:
|
||||
ppRanges = &pReader->pRanges;
|
||||
break;
|
||||
case SNAP_DATA_RSMA1:
|
||||
ppRanges = &pReader->pRsmaRanges;
|
||||
break;
|
||||
default:
|
||||
vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode),
|
||||
subField->typ);
|
||||
goto _err;
|
||||
ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ);
|
||||
if (ppRanges == NULL) {
|
||||
vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
|
||||
goto _err;
|
||||
}
|
||||
if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) {
|
||||
vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
|
||||
|
@ -129,8 +136,19 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static void vnodeSnapReaderDestroyTsdbRanges(SVSnapReader *pReader) {
|
||||
int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};
|
||||
for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
|
||||
TSnapRangeArray **ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, tsdbTyps[j]);
|
||||
if (ppRanges == NULL) continue;
|
||||
tsdbSnapRangeArrayDestroy(ppRanges);
|
||||
}
|
||||
}
|
||||
|
||||
void vnodeSnapReaderClose(SVSnapReader *pReader) {
|
||||
vInfo("vgId:%d, close vnode snapshot reader", TD_VID(pReader->pVnode));
|
||||
vnodeSnapReaderDestroyTsdbRanges(pReader);
|
||||
|
||||
if (pReader->pRsmaReader) {
|
||||
rsmaSnapReaderClose(&pReader->pRsmaReader);
|
||||
}
|
||||
|
@ -155,14 +173,6 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
|
|||
tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader);
|
||||
}
|
||||
|
||||
if (pReader->pRanges) {
|
||||
tsdbSnapRangeArrayDestroy(&pReader->pRanges);
|
||||
}
|
||||
|
||||
if (pReader->pRsmaRanges) {
|
||||
tsdbSnapRangeArrayDestroy(&pReader->pRsmaRanges);
|
||||
}
|
||||
|
||||
taosMemoryFree(pReader);
|
||||
}
|
||||
|
||||
|
@ -442,10 +452,24 @@ struct SVSnapWriter {
|
|||
SStreamTaskWriter *pStreamTaskWriter;
|
||||
SStreamStateWriter *pStreamStateWriter;
|
||||
// rsma
|
||||
TSnapRangeArray *pRsmaRanges;
|
||||
TSnapRangeArray *pRsmaRanges[TSDB_RETENTION_L2];
|
||||
SRSmaSnapWriter *pRsmaSnapWriter;
|
||||
};
|
||||
|
||||
TSnapRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t tsdbTyp) {
|
||||
_Static_assert(sizeof(pWriter->pRsmaRanges) / sizeof(pWriter->pRsmaRanges[0]) == 2, "Unexpected array size");
|
||||
switch (tsdbTyp) {
|
||||
case SNAP_DATA_TSDB:
|
||||
return &pWriter->pRanges;
|
||||
case SNAP_DATA_RSMA1:
|
||||
return &pWriter->pRsmaRanges[0];
|
||||
case SNAP_DATA_RSMA2:
|
||||
return &pWriter->pRsmaRanges[1];
|
||||
default:
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
|
||||
int32_t code = 0;
|
||||
SVSnapWriter *pWriter = NULL;
|
||||
|
@ -477,25 +501,18 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter
|
|||
goto _err;
|
||||
}
|
||||
|
||||
int32_t offset = 0;
|
||||
TSnapRangeArray **ppRanges = NULL;
|
||||
int32_t offset = 0;
|
||||
|
||||
while (offset + sizeof(SSyncTLV) < datHead->len) {
|
||||
SSyncTLV *subField = (void *)(datHead->val + offset);
|
||||
offset += sizeof(SSyncTLV) + subField->len;
|
||||
void *buf = subField->val;
|
||||
int32_t bufLen = subField->len;
|
||||
switch (subField->typ) {
|
||||
case SNAP_DATA_TSDB:
|
||||
ppRanges = &pWriter->pRanges;
|
||||
break;
|
||||
case SNAP_DATA_RSMA1:
|
||||
ppRanges = &pWriter->pRsmaRanges;
|
||||
break;
|
||||
default:
|
||||
vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode),
|
||||
subField->typ);
|
||||
goto _err;
|
||||
ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ);
|
||||
if (ppRanges == NULL) {
|
||||
vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
|
||||
goto _err;
|
||||
}
|
||||
if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) {
|
||||
vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
|
||||
|
@ -515,10 +532,21 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
static void vnodeSnapWriterDestroyTsdbRanges(SVSnapWriter *pWriter) {
|
||||
int32_t tsdbTyps[TSDB_RETENTION_MAX] = {SNAP_DATA_TSDB, SNAP_DATA_RSMA1, SNAP_DATA_RSMA2};
|
||||
for (int32_t j = 0; j < TSDB_RETENTION_MAX; ++j) {
|
||||
TSnapRangeArray **ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, tsdbTyps[j]);
|
||||
if (ppRanges == NULL) continue;
|
||||
tsdbSnapRangeArrayDestroy(ppRanges);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot) {
|
||||
int32_t code = 0;
|
||||
SVnode *pVnode = pWriter->pVnode;
|
||||
|
||||
vnodeSnapWriterDestroyTsdbRanges(pWriter);
|
||||
|
||||
// prepare
|
||||
if (pWriter->pTsdbSnapWriter) {
|
||||
tsdbSnapWriterPrepareClose(pWriter->pTsdbSnapWriter);
|
||||
|
@ -550,10 +578,6 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
|||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
if (pWriter->pRanges) {
|
||||
tsdbSnapRangeArrayDestroy(&pWriter->pRanges);
|
||||
}
|
||||
|
||||
if (pWriter->pTsdbSnapWriter) {
|
||||
code = tsdbSnapWriterClose(&pWriter->pTsdbSnapWriter, rollback);
|
||||
if (code) goto _exit;
|
||||
|
@ -588,10 +612,6 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
|||
if (code) goto _exit;
|
||||
}
|
||||
|
||||
if (pWriter->pRsmaRanges) {
|
||||
tsdbSnapRangeArrayDestroy(&pWriter->pRsmaRanges);
|
||||
}
|
||||
|
||||
if (pWriter->pRsmaSnapWriter) {
|
||||
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
|
||||
if (code) goto _exit;
|
||||
|
@ -739,7 +759,7 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
|
|||
case SNAP_DATA_QTASK: {
|
||||
// rsma1/rsma2/qtask for rsma
|
||||
if (pWriter->pRsmaSnapWriter == NULL) {
|
||||
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, pWriter->pRsmaRanges,
|
||||
code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, (void **)pWriter->pRsmaRanges,
|
||||
&pWriter->pRsmaSnapWriter);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue