feat: toggle tsdb snap replication mode by snap info handshake ahead of time

This commit is contained in:
Benguang Zhao 2023-12-05 11:36:58 +08:00
parent ef34176e37
commit 52672657c1
3 changed files with 183 additions and 4 deletions

View File

@ -715,6 +715,20 @@ int32_t tSerializeTsdbFSetPartList(void *buf, int32_t bufLen, STsdbFS
int32_t tDeserializeTsdbFSetPartList(void *buf, int32_t bufLen, STsdbFSetPartList *pList);
int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList *pList, TFileSetRangeArray **ppRanges);
// snap rep format
typedef enum ETsdbRepFmt {
TSDB_SNAP_REP_FMT_DEFAULT = 0,
TSDB_SNAP_REP_FMT_RAW,
TSDB_SNAP_REP_FMT_HYBRID,
} ETsdbRepFmt;
typedef struct STsdbRepOpts {
ETsdbRepFmt format;
} STsdbRepOpts;
int32_t tSerializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo);
int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo);
// snap read
struct STsdbReadSnap {
SMemTable *pMem;

View File

@ -443,6 +443,126 @@ static int32_t tsdbPartitionInfoSerialize(STsdbPartitionInfo* pInfo, uint8_t* bu
return offset;
}
// tsdb replication opts
static int32_t tTsdbRepOptsDataLenCalc(STsdbRepOpts* pInfo) {
int32_t hdrLen = sizeof(int32_t);
int32_t datLen = 0;
int8_t msgVer = 0;
int64_t reserved64 = 0;
int16_t format = 0;
hdrLen += sizeof(msgVer);
datLen += hdrLen;
datLen += sizeof(format);
datLen += sizeof(reserved64);
datLen += sizeof(*pInfo);
return datLen;
}
int32_t tSerializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
int64_t reserved64 = 0;
int8_t msgVer = TSDB_SNAP_MSG_VER;
if (tStartEncode(&encoder) < 0) goto _err;
if (tEncodeI8(&encoder, msgVer) < 0) goto _err;
int16_t format = pOpts->format;
if (tEncodeI16(&encoder, format) < 0) goto _err;
if (tEncodeI64(&encoder, reserved64) < 0) goto _err;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
_err:
tEncoderClear(&encoder);
return -1;
}
int32_t tDeserializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
int64_t reserved64 = 0;
int8_t msgVer = 0;
if (tStartDecode(&decoder) < 0) goto _err;
if (tDecodeI8(&decoder, &msgVer) < 0) goto _err;
if (msgVer != TSDB_SNAP_MSG_VER) goto _err;
int16_t format = 0;
if (tDecodeI16(&decoder, &format) < 0) goto _err;
pOpts->format = format;
if (tDecodeI64(&decoder, &reserved64) < 0) goto _err;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
_err:
tDecoderClear(&decoder);
return -1;
}
static int32_t tsdbRepOptsEstSize(STsdbRepOpts* pOpts) {
int32_t dataLen = 0;
dataLen += sizeof(SSyncTLV);
dataLen += tTsdbRepOptsDataLenCalc(pOpts);
return dataLen;
}
static int32_t tsdbRepOptsSerialize(STsdbRepOpts* pOpts, void* buf, int32_t bufLen) {
SSyncTLV* pSubHead = buf;
int32_t offset = 0;
int32_t tlen = 0;
if ((tlen = tSerializeTsdbRepOpts(pSubHead->val, bufLen, pOpts)) < 0) {
return -1;
}
pSubHead->typ = SNAP_DATA_RAW;
pSubHead->len = tlen;
offset += sizeof(*pSubHead) + tlen;
return offset;
}
// snap info
static int32_t tsdbSnapPrepDealWithSnapInfo(SVnode* pVnode, SSnapshot* pSnap, STsdbRepOpts* pInfo) {
if (!pSnap->data) return 0;
int32_t code = -1;
SSyncTLV* pHead = (void*)pSnap->data;
int32_t offset = 0;
while (offset + sizeof(*pHead) < pHead->len) {
SSyncTLV* pField = (void*)(pHead->val + offset);
offset += sizeof(*pField) + pField->len;
void* buf = pField->val;
int32_t bufLen = pField->len;
switch (pField->typ) {
case SNAP_DATA_TSDB:
case SNAP_DATA_RSMA1:
case SNAP_DATA_RSMA2: {
} break;
case SNAP_DATA_RAW: {
if (tDeserializeTsdbRepOpts(buf, bufLen, pInfo) < 0) {
terrno = TSDB_CODE_INVALID_DATA_FMT;
tsdbError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
goto _out;
}
} break;
default:
tsdbError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), pField->typ);
goto _out;
}
}
code = 0;
_out:
return code;
}
int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
ASSERT(pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY);
STsdbPartitionInfo partitionInfo = {0};
@ -453,10 +573,22 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
goto _out;
}
// deal with snap info for reply
STsdbRepOpts opts = {.format = TSDB_SNAP_REP_FMT_RAW};
if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
STsdbRepOpts leaderOpts = {0};
if (tsdbSnapPrepDealWithSnapInfo(pVnode, pSnap, &leaderOpts) < 0) {
tsdbError("vgId:%d, failed to deal with snap info for reply since %s", TD_VID(pVnode), terrstr());
goto _out;
}
opts.format = TMIN(opts.format, leaderOpts.format);
}
// info data realloc
const int32_t headLen = sizeof(SSyncTLV);
int32_t bufLen = headLen;
bufLen += tsdbPartitionInfoEstSize(pInfo);
bufLen += tsdbRepOptsEstSize(&opts);
if (syncSnapInfoDataRealloc(pSnap, bufLen) != 0) {
tsdbError("vgId:%d, failed to realloc memory for data of snap info. bytes:%d", TD_VID(pVnode), bufLen);
goto _out;
@ -474,6 +606,13 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) {
offset += tlen;
ASSERT(offset <= bufLen);
if ((tlen = tsdbRepOptsSerialize(&opts, buf + offset, bufLen - offset)) < 0) {
tsdbError("vgId:%d, failed to serialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
goto _out;
}
offset += tlen;
ASSERT(offset <= bufLen);
// set header of info data
SSyncTLV* pHead = pSnap->data;
pHead->typ = pSnap->type;

View File

@ -92,14 +92,16 @@ static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotP
int32_t code = -1;
if (pParam->data) {
// decode
SSyncTLV *datHead = (void *)pParam->data;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
terrno = TSDB_CODE_INVALID_DATA_FMT;
goto _out;
}
STsdbRepOpts tsdbOpts = {0};
TFileSetRangeArray **ppRanges = NULL;
int32_t offset = 0;
int32_t offset = 0;
while (offset + sizeof(SSyncTLV) < datHead->len) {
SSyncTLV *subField = (void *)(datHead->val + offset);
@ -121,13 +123,30 @@ static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotP
goto _out;
}
} break;
case SNAP_DATA_RAW: {
if (tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts) < 0) {
vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
goto _out;
}
} break;
default:
vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
goto _out;
}
}
}
// toggle snap replication mode
vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
if (pReader->sver == 0 && tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) {
pReader->tsdbDone = true;
} else {
pReader->tsdbRAWDone = true;
}
ASSERT(pReader->tsdbDone != pReader->tsdbRAWDone);
vInfo("vgId:%d, vnode snap writer enabled replication mode: %s", TD_VID(pVnode),
(pReader->tsdbDone ? "raw" : "normal"));
}
code = 0;
_out:
return code;
@ -277,8 +296,6 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
}
// TSDB ==============
pReader->tsdbDone = true;
if (!pReader->tsdbDone) {
// open if not
if (pReader->pTsdbReader == NULL) {
@ -534,6 +551,7 @@ static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotP
goto _out;
}
STsdbRepOpts tsdbOpts = {0};
TFileSetRangeArray **ppRanges = NULL;
int32_t offset = 0;
@ -557,11 +575,19 @@ static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotP
goto _out;
}
} break;
case SNAP_DATA_RAW: {
if (tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts) < 0) {
vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr());
goto _out;
}
} break;
default:
vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ);
goto _out;
}
}
vInfo("vgId:%d, vnode snap writer supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format);
}
code = 0;