diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index dc3aa418b4..95982abfbe 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -715,6 +715,20 @@ int32_t tSerializeTsdbFSetPartList(void *buf, int32_t bufLen, STsdbFS int32_t tDeserializeTsdbFSetPartList(void *buf, int32_t bufLen, STsdbFSetPartList *pList); int32_t tsdbFSetPartListToRangeDiff(STsdbFSetPartList *pList, TFileSetRangeArray **ppRanges); +// snap rep format +typedef enum ETsdbRepFmt { + TSDB_SNAP_REP_FMT_DEFAULT = 0, + TSDB_SNAP_REP_FMT_RAW, + TSDB_SNAP_REP_FMT_HYBRID, +} ETsdbRepFmt; + +typedef struct STsdbRepOpts { + ETsdbRepFmt format; +} STsdbRepOpts; + +int32_t tSerializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo); +int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo); + // snap read struct STsdbReadSnap { SMemTable *pMem; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c b/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c index 65ee1a7db3..9dae9bdd36 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapInfo.c @@ -443,6 +443,126 @@ static int32_t tsdbPartitionInfoSerialize(STsdbPartitionInfo* pInfo, uint8_t* bu return offset; } +// tsdb replication opts +static int32_t tTsdbRepOptsDataLenCalc(STsdbRepOpts* pInfo) { + int32_t hdrLen = sizeof(int32_t); + int32_t datLen = 0; + + int8_t msgVer = 0; + int64_t reserved64 = 0; + int16_t format = 0; + hdrLen += sizeof(msgVer); + datLen += hdrLen; + datLen += sizeof(format); + datLen += sizeof(reserved64); + datLen += sizeof(*pInfo); + return datLen; +} + +int32_t tSerializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + int64_t reserved64 = 0; + int8_t msgVer = TSDB_SNAP_MSG_VER; + + if (tStartEncode(&encoder) < 0) goto _err; + if (tEncodeI8(&encoder, msgVer) < 0) goto _err; + int16_t format = pOpts->format; + if (tEncodeI16(&encoder, format) < 0) goto _err; + if (tEncodeI64(&encoder, reserved64) < 0) goto _err; + + tEndEncode(&encoder); + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; + +_err: + tEncoderClear(&encoder); + return -1; +} + +int32_t tDeserializeTsdbRepOpts(void* buf, int32_t bufLen, STsdbRepOpts* pOpts) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + int64_t reserved64 = 0; + int8_t msgVer = 0; + + if (tStartDecode(&decoder) < 0) goto _err; + if (tDecodeI8(&decoder, &msgVer) < 0) goto _err; + if (msgVer != TSDB_SNAP_MSG_VER) goto _err; + int16_t format = 0; + if (tDecodeI16(&decoder, &format) < 0) goto _err; + pOpts->format = format; + if (tDecodeI64(&decoder, &reserved64) < 0) goto _err; + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; + +_err: + tDecoderClear(&decoder); + return -1; +} + +static int32_t tsdbRepOptsEstSize(STsdbRepOpts* pOpts) { + int32_t dataLen = 0; + dataLen += sizeof(SSyncTLV); + dataLen += tTsdbRepOptsDataLenCalc(pOpts); + return dataLen; +} + +static int32_t tsdbRepOptsSerialize(STsdbRepOpts* pOpts, void* buf, int32_t bufLen) { + SSyncTLV* pSubHead = buf; + int32_t offset = 0; + int32_t tlen = 0; + if ((tlen = tSerializeTsdbRepOpts(pSubHead->val, bufLen, pOpts)) < 0) { + return -1; + } + pSubHead->typ = SNAP_DATA_RAW; + pSubHead->len = tlen; + offset += sizeof(*pSubHead) + tlen; + return offset; +} + +// snap info +static int32_t tsdbSnapPrepDealWithSnapInfo(SVnode* pVnode, SSnapshot* pSnap, STsdbRepOpts* pInfo) { + if (!pSnap->data) return 0; + int32_t code = -1; + + SSyncTLV* pHead = (void*)pSnap->data; + int32_t offset = 0; + + while (offset + sizeof(*pHead) < pHead->len) { + SSyncTLV* pField = (void*)(pHead->val + offset); + offset += sizeof(*pField) + pField->len; + void* buf = pField->val; + int32_t bufLen = pField->len; + + switch (pField->typ) { + case SNAP_DATA_TSDB: + case SNAP_DATA_RSMA1: + case SNAP_DATA_RSMA2: { + } break; + case SNAP_DATA_RAW: { + if (tDeserializeTsdbRepOpts(buf, bufLen, pInfo) < 0) { + terrno = TSDB_CODE_INVALID_DATA_FMT; + tsdbError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + } break; + default: + tsdbError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), pField->typ); + goto _out; + } + } + + code = 0; +_out: + return code; +} + int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { ASSERT(pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY); STsdbPartitionInfo partitionInfo = {0}; @@ -453,10 +573,22 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { goto _out; } + // deal with snap info for reply + STsdbRepOpts opts = {.format = TSDB_SNAP_REP_FMT_RAW}; + if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { + STsdbRepOpts leaderOpts = {0}; + if (tsdbSnapPrepDealWithSnapInfo(pVnode, pSnap, &leaderOpts) < 0) { + tsdbError("vgId:%d, failed to deal with snap info for reply since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + opts.format = TMIN(opts.format, leaderOpts.format); + } + // info data realloc const int32_t headLen = sizeof(SSyncTLV); int32_t bufLen = headLen; bufLen += tsdbPartitionInfoEstSize(pInfo); + bufLen += tsdbRepOptsEstSize(&opts); if (syncSnapInfoDataRealloc(pSnap, bufLen) != 0) { tsdbError("vgId:%d, failed to realloc memory for data of snap info. bytes:%d", TD_VID(pVnode), bufLen); goto _out; @@ -474,6 +606,13 @@ int32_t tsdbSnapPrepDescription(SVnode* pVnode, SSnapshot* pSnap) { offset += tlen; ASSERT(offset <= bufLen); + if ((tlen = tsdbRepOptsSerialize(&opts, buf + offset, bufLen - offset)) < 0) { + tsdbError("vgId:%d, failed to serialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + offset += tlen; + ASSERT(offset <= bufLen); + // set header of info data SSyncTLV* pHead = pSnap->data; pHead->typ = pSnap->type; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 2a8484bcd2..438fa35713 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -92,14 +92,16 @@ static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotP int32_t code = -1; if (pParam->data) { + // decode SSyncTLV *datHead = (void *)pParam->data; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { terrno = TSDB_CODE_INVALID_DATA_FMT; goto _out; } + STsdbRepOpts tsdbOpts = {0}; TFileSetRangeArray **ppRanges = NULL; - int32_t offset = 0; + int32_t offset = 0; while (offset + sizeof(SSyncTLV) < datHead->len) { SSyncTLV *subField = (void *)(datHead->val + offset); @@ -121,13 +123,30 @@ static int32_t vnodeSnapReaderDealWithSnapInfo(SVSnapReader *pReader, SSnapshotP goto _out; } } break; + case SNAP_DATA_RAW: { + if (tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts) < 0) { + vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + } break; default: vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); goto _out; } } - } + // toggle snap replication mode + vInfo("vgId:%d, vnode snap reader supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format); + if (pReader->sver == 0 && tsdbOpts.format == TSDB_SNAP_REP_FMT_RAW) { + pReader->tsdbDone = true; + } else { + pReader->tsdbRAWDone = true; + } + + ASSERT(pReader->tsdbDone != pReader->tsdbRAWDone); + vInfo("vgId:%d, vnode snap writer enabled replication mode: %s", TD_VID(pVnode), + (pReader->tsdbDone ? "raw" : "normal")); + } code = 0; _out: return code; @@ -277,8 +296,6 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } // TSDB ============== - pReader->tsdbDone = true; - if (!pReader->tsdbDone) { // open if not if (pReader->pTsdbReader == NULL) { @@ -534,6 +551,7 @@ static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotP goto _out; } + STsdbRepOpts tsdbOpts = {0}; TFileSetRangeArray **ppRanges = NULL; int32_t offset = 0; @@ -557,11 +575,19 @@ static int32_t vnodeSnapWriterDealWithSnapInfo(SVSnapWriter *pWriter, SSnapshotP goto _out; } } break; + case SNAP_DATA_RAW: { + if (tDeserializeTsdbRepOpts(buf, bufLen, &tsdbOpts) < 0) { + vError("vgId:%d, failed to deserialize tsdb rep opts since %s", TD_VID(pVnode), terrstr()); + goto _out; + } + } break; default: vError("vgId:%d, unexpected subfield type of snap info. typ:%d", TD_VID(pVnode), subField->typ); goto _out; } } + + vInfo("vgId:%d, vnode snap writer supported tsdb rep of format:%d", TD_VID(pVnode), tsdbOpts.format); } code = 0;