enh: extract snapshot info for both of snap reader and writer

This commit is contained in:
Benguang Zhao 2023-09-15 19:58:48 +08:00
parent 410ced8320
commit e1c03118ab
5 changed files with 108 additions and 35 deletions

View File

@ -151,12 +151,12 @@ typedef struct SReConfigCbMeta {
typedef struct SSnapshotParam {
SyncIndex start;
SyncIndex end;
void* data; // with SMsgHead
SSyncTLV* data;
} SSnapshotParam;
typedef struct SSnapshot {
ESyncSnapInfoTyp typ;
void* data; // with SMsgHead
int32_t typ;
SSyncTLV* data;
SyncIndex lastApplyIndex;
SyncTerm lastApplyTerm;
SyncIndex lastConfigIndex;

View File

@ -263,7 +263,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader
void vnodeSnapReaderClose(SVSnapReader *pReader);
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData);
// SVSnapWriter
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter);
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter);
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);

View File

@ -49,6 +49,26 @@ struct SVSnapReader {
SRSmaSnapReader *pRsmaReader;
};
static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TSnapRangeArray **ppRanges) {
int32_t code = -1;
STsdbSnapPartList *pList = tsdbSnapPartListCreate();
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _out;
}
if (tDeserializeTsdbSnapPartList(buf, bufLen, pList) < 0) {
terrno = TSDB_CODE_INVALID_DATA_FMT;
goto _out;
}
if (tsdbSnapPartListToRangeDiff(pList, ppRanges) < 0) {
goto _out;
}
code = 0;
_out:
tsdbSnapPartListDestroy(&pList);
return code;
}
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) {
int32_t code = 0;
int64_t sver = pParam->start;
@ -64,6 +84,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader
pReader->sver = sver;
pReader->ever = ever;
// snapshot info
if (pParam->data) {
SSyncTLV *datHead = (void *)pParam->data;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
@ -72,37 +93,29 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader
}
int32_t offset = 0;
TSnapRangeArray **ppRanges = NULL;
while (offset + sizeof(SSyncTLV) < datHead->len) {
SSyncTLV *sectHead = (void *)(datHead->val + offset);
offset += sizeof(SSyncTLV) + sectHead->len;
void *buf = sectHead->val;
int32_t bufLen = sectHead->len;
ASSERT(sectHead->typ == SNAP_DATA_TSDB || sectHead->typ == SNAP_DATA_RSMA1);
STsdbSnapPartList *pList = tsdbSnapPartListCreate();
if (pList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
SSyncTLV *subField = (void *)(datHead->val + offset);
offset += sizeof(SSyncTLV) + subField->len;
void *buf = subField->val;
int32_t bufLen = subField->len;
switch (subField->typ) {
case SNAP_DATA_TSDB:
ppRanges = &pReader->pRanges;
break;
case SNAP_DATA_RSMA1:
ppRanges = &pReader->pRsmaRanges;
break;
default:
vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode),
subField->typ);
goto _err;
}
if (tDeserializeTsdbSnapPartList(buf, bufLen, pList) < 0) {
terrno = TSDB_CODE_INVALID_DATA_FMT;
goto _err;
}
TSnapRangeArray **ppRanges = NULL;
if (sectHead->typ == SNAP_DATA_TSDB) {
ppRanges = &pReader->pRanges;
} else if (sectHead->typ == SNAP_DATA_RSMA1) {
ppRanges = &pReader->pRsmaRanges;
}
if (ppRanges == NULL) {
tsdbSnapPartListDestroy(&pList);
continue;
}
if (tsdbSnapPartListToRangeDiff(pList, ppRanges) < 0) {
if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) {
vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
tsdbSnapPartListDestroy(&pList);
goto _err;
}
tsdbSnapPartListDestroy(&pList);
}
}
@ -414,6 +427,7 @@ struct SVSnapWriter {
// meta
SMetaSnapWriter *pMetaSnapWriter;
// tsdb
TSnapRangeArray *pRanges;
STsdbSnapWriter *pTsdbSnapWriter;
// tq
STqSnapWriter *pTqSnapWriter;
@ -423,12 +437,15 @@ struct SVSnapWriter {
SStreamTaskWriter *pStreamTaskWriter;
SStreamStateWriter *pStreamStateWriter;
// rsma
TSnapRangeArray *pRsmaRanges;
SRSmaSnapWriter *pRsmaSnapWriter;
};
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) {
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
int32_t code = 0;
SVSnapWriter *pWriter = NULL;
int64_t sver = pParam->start;
int64_t ever = pParam->end;
// commit memory data
vnodeAsyncCommit(pVnode);
@ -447,6 +464,41 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
// inc commit ID
pWriter->commitID = ++pVnode->state.commitID;
// snapshot info
if (pParam->data) {
SSyncTLV *datHead = (void *)pParam->data;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
terrno = TSDB_CODE_INVALID_DATA_FMT;
goto _err;
}
int32_t offset = 0;
TSnapRangeArray **ppRanges = NULL;
while (offset + sizeof(SSyncTLV) < datHead->len) {
SSyncTLV *subField = (void *)(datHead->val + offset);
offset += sizeof(SSyncTLV) + subField->len;
void *buf = subField->val;
int32_t bufLen = subField->len;
switch (subField->typ) {
case SNAP_DATA_TSDB:
ppRanges = &pWriter->pRanges;
break;
case SNAP_DATA_RSMA1:
ppRanges = &pWriter->pRsmaRanges;
break;
default:
vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode),
subField->typ);
goto _err;
}
if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) {
vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
goto _err;
}
}
}
vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
sver, ever, pWriter->commitID);
*ppWriter = pWriter;

View File

@ -491,8 +491,7 @@ static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **p
}
static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam;
SVnode *pVnode = pFsm->data;
do {
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
@ -505,7 +504,7 @@ static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void
}
} while (true);
int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter);
int32_t code = vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter);
return code;
}

View File

@ -368,6 +368,17 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
pReceiver->pWriter = NULL;
}
// free data of snapshot info
if (pReceiver->snapshotParam.data) {
taosMemoryFree(pReceiver->snapshotParam.data);
pReceiver->snapshotParam.data = NULL;
}
if (pReceiver->snapshot.data) {
taosMemoryFree(pReceiver->snapshot.data);
pReceiver->snapshot.data = NULL;
}
// free receiver
taosMemoryFree(pReceiver);
}
@ -652,6 +663,17 @@ _SEND_REPLY:
if (snapInfo.data) {
pRspMsg->payloadType = snapInfo.typ;
memcpy(pRspMsg->data, snapInfo.data, dataLen);
// save snapshot info
SSnapshotParam *pParam = &pReceiver->snapshotParam;
void *data = taosMemoryRealloc(pParam->data, dataLen);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _out;
}
pParam->data = data;
memcpy(pParam->data, snapInfo.data, dataLen);
}
// send msg
@ -941,7 +963,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
terrno = TSDB_CODE_INVALID_DATA_FMT;
return -1;
}
pSender->snapshotParam.data = pMsg->data;
pSender->snapshotParam.data = (void *)pMsg->data;
}
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);