refact: wrap funcs to handle snapshot info for vnodeSnapReaderOpen and vnodeSnapWriterOpen
This commit is contained in:
parent
eb8c4d3e8c
commit
59e8a2104c
|
@ -83,6 +83,42 @@ static TSnapRangeArray **vnodeSnapReaderGetTsdbRanges(SVSnapReader *pReader, int
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeSnapReaderDoSnapInfo(SVSnapReader *pReader, SSnapshotParam *pParam) {
|
||||||
|
SVnode *pVnode = pReader->pVnode;
|
||||||
|
int32_t code = -1;
|
||||||
|
|
||||||
|
if (pParam->data) {
|
||||||
|
SSyncTLV *datHead = (void *)pParam->data;
|
||||||
|
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
||||||
|
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
|
||||||
|
TSnapRangeArray **ppRanges = NULL;
|
||||||
|
int32_t offset = 0;
|
||||||
|
|
||||||
|
while (offset + sizeof(SSyncTLV) < datHead->len) {
|
||||||
|
SSyncTLV *subField = (void *)(datHead->val + offset);
|
||||||
|
offset += sizeof(SSyncTLV) + subField->len;
|
||||||
|
void *buf = subField->val;
|
||||||
|
int32_t bufLen = subField->len;
|
||||||
|
ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ);
|
||||||
|
if (ppRanges == NULL) {
|
||||||
|
vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) {
|
||||||
|
vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
_out:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) {
|
int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int64_t sver = pParam->start;
|
int64_t sver = pParam->start;
|
||||||
|
@ -99,31 +135,8 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader
|
||||||
pReader->ever = ever;
|
pReader->ever = ever;
|
||||||
|
|
||||||
// snapshot info
|
// snapshot info
|
||||||
if (pParam->data) {
|
if (vnodeSnapReaderDoSnapInfo(pReader, pParam) < 0) {
|
||||||
SSyncTLV *datHead = (void *)pParam->data;
|
goto _err;
|
||||||
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
|
||||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
TSnapRangeArray **ppRanges = NULL;
|
|
||||||
int32_t offset = 0;
|
|
||||||
|
|
||||||
while (offset + sizeof(SSyncTLV) < datHead->len) {
|
|
||||||
SSyncTLV *subField = (void *)(datHead->val + offset);
|
|
||||||
offset += sizeof(SSyncTLV) + subField->len;
|
|
||||||
void *buf = subField->val;
|
|
||||||
int32_t bufLen = subField->len;
|
|
||||||
ppRanges = vnodeSnapReaderGetTsdbRanges(pReader, subField->typ);
|
|
||||||
if (ppRanges == NULL) {
|
|
||||||
vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) {
|
|
||||||
vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
|
vInfo("vgId:%d, vnode snapshot reader opened, sver:%" PRId64 " ever:%" PRId64, TD_VID(pVnode), sver, ever);
|
||||||
|
@ -470,6 +483,42 @@ TSnapRangeArray **vnodeSnapWriterGetTsdbRanges(SVSnapWriter *pWriter, int32_t ts
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeSnapWriterDoSnapInfo(SVSnapWriter *pWriter, SSnapshotParam *pParam) {
|
||||||
|
SVnode *pVnode = pWriter->pVnode;
|
||||||
|
int32_t code = -1;
|
||||||
|
|
||||||
|
if (pParam->data) {
|
||||||
|
SSyncTLV *datHead = (void *)pParam->data;
|
||||||
|
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
||||||
|
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
|
||||||
|
TSnapRangeArray **ppRanges = NULL;
|
||||||
|
int32_t offset = 0;
|
||||||
|
|
||||||
|
while (offset + sizeof(SSyncTLV) < datHead->len) {
|
||||||
|
SSyncTLV *subField = (void *)(datHead->val + offset);
|
||||||
|
offset += sizeof(SSyncTLV) + subField->len;
|
||||||
|
void *buf = subField->val;
|
||||||
|
int32_t bufLen = subField->len;
|
||||||
|
ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ);
|
||||||
|
if (ppRanges == NULL) {
|
||||||
|
vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) {
|
||||||
|
vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
|
||||||
|
goto _out;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
_out:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
|
int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SVSnapWriter *pWriter = NULL;
|
SVSnapWriter *pWriter = NULL;
|
||||||
|
@ -494,31 +543,8 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter
|
||||||
pWriter->commitID = ++pVnode->state.commitID;
|
pWriter->commitID = ++pVnode->state.commitID;
|
||||||
|
|
||||||
// snapshot info
|
// snapshot info
|
||||||
if (pParam->data) {
|
if (vnodeSnapWriterDoSnapInfo(pWriter, pParam) < 0) {
|
||||||
SSyncTLV *datHead = (void *)pParam->data;
|
goto _err;
|
||||||
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
|
||||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
TSnapRangeArray **ppRanges = NULL;
|
|
||||||
int32_t offset = 0;
|
|
||||||
|
|
||||||
while (offset + sizeof(SSyncTLV) < datHead->len) {
|
|
||||||
SSyncTLV *subField = (void *)(datHead->val + offset);
|
|
||||||
offset += sizeof(SSyncTLV) + subField->len;
|
|
||||||
void *buf = subField->val;
|
|
||||||
int32_t bufLen = subField->len;
|
|
||||||
ppRanges = vnodeSnapWriterGetTsdbRanges(pWriter, subField->typ);
|
|
||||||
if (ppRanges == NULL) {
|
|
||||||
vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), subField->typ);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) {
|
|
||||||
vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
|
vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode),
|
||||||
|
|
Loading…
Reference in New Issue