diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 4a2ae18765..b92bba831c 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -299,8 +299,8 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL) // no longer used - TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used + TD_DEF_MSG_TYPE(TDMT_SYNC_PREP_SNAPSHOT, "sync-prep-snapshot", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_PREP_SNAPSHOT_REPLY, "sync-prep-snapshot-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 53e6ec0d71..cc381bb54e 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -101,6 +101,12 @@ typedef struct SNodeInfo { ESyncRole nodeRole; } SNodeInfo; +typedef struct SSyncTLV { + int32_t typ; + int32_t len; + char val[]; +} SSyncTLV; + typedef struct SSyncCfg { int32_t totalReplicaNum; int32_t replicaNum; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 24b5b2566c..d5488da770 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -238,7 +238,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -246,7 +246,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0e17d2b75f..b4fe824466 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -848,14 +848,14 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ffd74dc3d1..4cea7c5e85 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -681,6 +681,33 @@ int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSn int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR); void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap); +// snap partition list +typedef TARRAY2(SVersionRange) SVerRangeList; +typedef struct STsdbSnapPartition STsdbSnapPartition; +typedef TARRAY2(STsdbSnapPartition *) STsdbSnapPartList; +// util +STsdbSnapPartList *tsdbSnapPartListCreate(); +void tsdbSnapPartListDestroy(STsdbSnapPartList **ppList); +int32_t tSerializeTsdbSnapPartList(void *buf, int32_t bufLen, STsdbSnapPartList *pList); +int32_t tDeserializeTsdbSnapPartList(void *buf, int32_t bufLen, STsdbSnapPartList *pList); +int32_t tsdbSnapPartListToRangeDiff(STsdbSnapPartList *pList, TSnapRangeArray **ppRanges); + +enum { + TSDB_SNAP_RANGE_TYP_HEAD = 0, + TSDB_SNAP_RANGE_TYP_DATA, + TSDB_SNAP_RANGE_TYP_SMA, + TSDB_SNAP_RANGE_TYP_TOMB, + TSDB_SNAP_RANGE_TYP_STT, + TSDB_SNAP_RANGE_TYP_MAX, +}; + +struct STsdbSnapPartition { + int64_t fid; + int8_t stat; + SVerRangeList verRanges[TSDB_SNAP_RANGE_TYP_MAX]; +}; + +// snap read struct STsdbReadSnap { SMemTable *pMem; SQueryNode *pNode; diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.h b/source/dnode/vnode/src/tsdb/tsdbFSet2.h index 756250157b..e6d78a8cfe 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.h @@ -81,6 +81,7 @@ struct SSttLvl { struct STFileSet { int32_t fid; + int8_t stat; STFileObj *farr[TSDB_FTYPE_MAX]; // file array TSttLvlArray lvlArr[1]; // level array }; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 3e18f01f04..9710eee6c4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1158,96 +1158,198 @@ _exit: return code; } -static int32_t tsdbTSnapRangeCmprFn(STSnapRange* fsr1, STSnapRange* fsr2) { - if (fsr1->fid < fsr2->fid) return -1; - if (fsr1->fid > fsr2->fid) return 1; +// snap part +static int32_t tsdbSnapPartCmprFn(STsdbSnapPartition* x, STsdbSnapPartition* y) { + if (x->fid < y->fid) return -1; + if (x->fid > y->fid) return 1; return 0; } -static int32_t tsdbTFileInsertSnapRange(STFile* f, TSnapRangeArray* snapR) { - STSnapRange* fsr = taosMemoryCalloc(1, sizeof(*fsr)); - if (fsr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - fsr->fid = f->fid; - fsr->sver = f->minVer; - fsr->ever = f->maxVer; - - int32_t code = TARRAY2_SORT_INSERT(snapR, fsr, tsdbTSnapRangeCmprFn); - if (code) { - taosMemoryFree(fsr); - fsr = NULL; - } - return code; +static int32_t tVersionRangeCmprFn(SVersionRange* x, SVersionRange* y) { + if (x->minVer < y->minVer) return -1; + if (x->minVer > y->minVer) return 1; + if (x->maxVer < y->maxVer) return -1; + if (x->maxVer > y->maxVer) return 1; + return 0; } -static int32_t tsdbTFSetInsertSnapRange(STFileSet* fset, TSnapRangeArray* snapR) { - STFile tf = {.fid = fset->fid, .minVer = VERSION_MAX, .maxVer = VERSION_MIN}; - for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { - if (fset->farr[ftype] == NULL) continue; - STFile* f = fset->farr[ftype]->f; - tsdbTFileUpdVerRange(&tf, (SVersionRange){.minVer = f->minVer, .maxVer = f->maxVer}); +STsdbSnapPartition* tsdbSnapPartitionCreate() { + STsdbSnapPartition* pSP = taosMemoryCalloc(1, sizeof(STsdbSnapPartition)); + if (pSP == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + for (int32_t i = 0; i < TSDB_SNAP_RANGE_TYP_MAX; i++) { + TARRAY2_INIT(&pSP->verRanges[i]); + } + return pSP; +} + +void tsdbSnapPartitionClear(STsdbSnapPartition** ppSP) { + if (ppSP == NULL || ppSP[0] == NULL) { + return; + } + for (int32_t i = 0; i < TSDB_SNAP_RANGE_TYP_MAX; i++) { + TARRAY2_DESTROY(&ppSP[0]->verRanges[i], NULL); + } + taosMemoryFree(ppSP[0]); + ppSP[0] = NULL; +} + +static int32_t tsdbFTypeToSRangeTyp(tsdb_ftype_t ftype) { + switch (ftype) { + case TSDB_FTYPE_HEAD: + return TSDB_SNAP_RANGE_TYP_HEAD; + case TSDB_FTYPE_DATA: + return TSDB_SNAP_RANGE_TYP_DATA; + case TSDB_FTYPE_SMA: + return TSDB_SNAP_RANGE_TYP_SMA; + case TSDB_FTYPE_TOMB: + return TSDB_SNAP_RANGE_TYP_TOMB; + case TSDB_FTYPE_STT: + return TSDB_SNAP_RANGE_TYP_STT; + } + return TSDB_SNAP_RANGE_TYP_MAX; +} + +static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP) { + STsdbSnapPartition* p = tsdbSnapPartitionCreate(); + if (p == NULL) { + goto _err; } + int32_t typ = 0; + for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { + if (fset->farr[ftype] == NULL) continue; + typ = tsdbFTypeToSRangeTyp(ftype); + ASSERT(typ < TSDB_SNAP_RANGE_TYP_MAX); + STFile* f = fset->farr[ftype]->f; + SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer}; + TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); + } + + typ = TSDB_SNAP_RANGE_TYP_STT; const SSttLvl* lvl; TARRAY2_FOREACH(fset->lvlArr, lvl) { STFileObj* fobj; TARRAY2_FOREACH(lvl->fobjArr, fobj) { - tsdbTFileUpdVerRange(&tf, (SVersionRange){.minVer = fobj->f->minVer, .maxVer = fobj->f->maxVer}); + STFile* f = fobj->f; + SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer}; + TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); } } + ppSP[0] = p; + return 0; - int32_t code = tsdbTFileInsertSnapRange(&tf, snapR); - if (code) return code; - return code; +_err: + tsdbSnapPartitionClear(&p); + return -1; } -static TSnapRangeArray* tsdbGetSnapRangeArray(STFileSystem* fs) { - int32_t code = 0; - TSnapRangeArray* snapR = taosMemoryCalloc(1, sizeof(*snapR)); - if (snapR == NULL) { +STsdbSnapPartList* tsdbSnapPartListCreate() { + STsdbSnapPartList* pList = taosMemoryCalloc(1, sizeof(STsdbSnapPartList)); + if (pList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - TARRAY2_INIT(snapR); + TARRAY2_INIT(pList); + return pList; +} +static STsdbSnapPartList* tsdbGetSnapPartList(STFileSystem* fs) { + STsdbSnapPartList* pList = tsdbSnapPartListCreate(); + if (pList == NULL) { + return NULL; + } + + int32_t code = 0; taosThreadRwlockRdlock(&fs->tsdb->rwLock); STFileSet* fset; TARRAY2_FOREACH(fs->fSetArr, fset) { - code = tsdbTFSetInsertSnapRange(fset, snapR); - if (code) break; + STsdbSnapPartition* pItem = NULL; + if (tsdbTFileSetToSnapPart(fset, &pItem) < 0) { + code = -1; + break; + } + ASSERT(pItem != NULL); + TARRAY2_SORT_INSERT(pList, pItem, tsdbSnapPartCmprFn); } taosThreadRwlockUnlock(&fs->tsdb->rwLock); if (code) { - TARRAY2_DESTROY(snapR, tsdbTSnapRangeClear); - taosMemoryFree(snapR); - snapR = NULL; + TARRAY2_DESTROY(pList, tsdbSnapPartitionClear); + taosMemoryFree(pList); + pList = NULL; } - return snapR; + return pList; } -int32_t tSerializeSnapRangeArray(void* buf, int32_t bufLen, TSnapRangeArray* pSnapR) { +int32_t tTsdbSnapPartListDataLenCalc(STsdbSnapPartList* pList) { + int32_t hdrLen = sizeof(int32_t); + int32_t datLen = 0; + + int8_t msgVer = 1; + int32_t len = TARRAY2_SIZE(pList); + hdrLen += sizeof(msgVer); + hdrLen += sizeof(len); + datLen += hdrLen; + + for (int32_t u = 0; u < len; u++) { + STsdbSnapPartition* p = TARRAY2_GET(pList, u); + int32_t typMax = TSDB_SNAP_RANGE_TYP_MAX; + int32_t uItem = 0; + uItem += sizeof(STsdbSnapPartition); + uItem += sizeof(typMax); + + for (int32_t i = 0; i < typMax; i++) { + int32_t iLen = TARRAY2_SIZE(&p->verRanges[i]); + int32_t jItem = 0; + jItem += sizeof(SVersionRange); + jItem += sizeof(int64_t); + uItem += sizeof(iLen) + jItem * iLen; + } + datLen += uItem; + } + return datLen; +} + +int32_t tSerializeTsdbSnapPartList(void* buf, int32_t bufLen, STsdbSnapPartList* pList) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); - int8_t msgVer = 1; - int32_t arrLen = TARRAY2_SIZE(pSnapR); int8_t reserved8 = 0; + int16_t reserved16 = 0; + int64_t reserved64 = 0; + + int8_t msgVer = 1; + int32_t len = TARRAY2_SIZE(pList); + if (tStartEncode(&encoder) < 0) goto _err; if (tEncodeI8(&encoder, msgVer) < 0) goto _err; - if (tEncodeI8(&encoder, reserved8) < 0) goto _err; - if (tEncodeI32(&encoder, arrLen) < 0) goto _err; + if (tEncodeI32(&encoder, len) < 0) goto _err; - int64_t reserved64 = 0; - for (int32_t i = 0; i < arrLen; i++) { - STSnapRange* u = TARRAY2_GET(pSnapR, i); - int64_t fid = u->fid; - if (tEncodeI64(&encoder, fid) < 0) goto _err; - if (tEncodeI64(&encoder, u->sver) < 0) goto _err; - if (tEncodeI64(&encoder, u->ever) < 0) goto _err; - if (tEncodeI64(&encoder, reserved64) < 0) goto _err; + for (int32_t u = 0; u < len; u++) { + STsdbSnapPartition* p = TARRAY2_GET(pList, u); + if (tEncodeI64(&encoder, p->fid) < 0) goto _err; + if (tEncodeI8(&encoder, p->stat) < 0) goto _err; + if (tEncodeI8(&encoder, reserved8) < 0) goto _err; + if (tEncodeI16(&encoder, reserved16) < 0) goto _err; + + int32_t typMax = TSDB_SNAP_RANGE_TYP_MAX; + if (tEncodeI32(&encoder, typMax) < 0) goto _err; + + for (int32_t i = 0; i < typMax; i++) { + SVerRangeList* iList = &p->verRanges[i]; + int32_t iLen = TARRAY2_SIZE(iList); + + if (tEncodeI32(&encoder, iLen) < 0) goto _err; + for (int32_t j = 0; j < iLen; j++) { + SVersionRange r = TARRAY2_GET(iList, j); + if (tEncodeI64(&encoder, r.minVer) < 0) goto _err; + if (tEncodeI64(&encoder, r.maxVer) < 0) goto _err; + if (tEncodeI64(&encoder, reserved64) < 0) goto _err; + } + } } tEndEncode(&encoder); @@ -1260,30 +1362,47 @@ _err: return -1; } -int32_t tDeserializeSnapRangeArray(void* buf, int32_t bufLen, TSnapRangeArray* pSnapR) { +int32_t tDeserializeTsdbSnapPartList(void* buf, int32_t bufLen, STsdbSnapPartList* pList) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); - int8_t msgVer = 0; - int32_t arrLen = 0; int8_t reserved8 = 0; + int16_t reserved16 = 0; + int64_t reserved64 = 0; + + STsdbSnapPartition* p = NULL; + + int8_t msgVer = 0; + int32_t len = 0; if (tStartDecode(&decoder) < 0) goto _err; if (tDecodeI8(&decoder, &msgVer) < 0) goto _err; - if (tDecodeI8(&decoder, &reserved8) < 0) goto _err; - if (tDecodeI32(&decoder, &arrLen) < 0) goto _err; + if (tDecodeI32(&decoder, &len) < 0) goto _err; - int64_t fid = 0; - int64_t reserved64 = 0; - STSnapRange* pRange = NULL; - for (int32_t i = 0; i < arrLen; i++) { - pRange = taosMemoryCalloc(1, sizeof(STSnapRange)); - if (tDecodeI64(&decoder, &fid) < 0) goto _err; - pRange->fid = fid; - if (tDecodeI64(&decoder, &pRange->sver) < 0) goto _err; - if (tDecodeI64(&decoder, &pRange->ever) < 0) goto _err; - if (tDecodeI64(&decoder, &reserved64) < 0) goto _err; - TARRAY2_APPEND(pSnapR, pRange); - pRange = NULL; + for (int32_t u = 0; u < len; u++) { + p = tsdbSnapPartitionCreate(); + if (p == NULL) goto _err; + if (tDecodeI64(&decoder, &p->fid) < 0) goto _err; + if (tDecodeI8(&decoder, &p->stat) < 0) goto _err; + if (tDecodeI8(&decoder, &reserved8) < 0) goto _err; + if (tDecodeI16(&decoder, &reserved16) < 0) goto _err; + + int32_t typMax = 0; + if (tDecodeI32(&decoder, &typMax) < 0) goto _err; + + for (int32_t i = 0; i < typMax; i++) { + SVerRangeList* iList = &p->verRanges[i]; + int32_t iLen = 0; + if (tDecodeI32(&decoder, &iLen) < 0) goto _err; + for (int32_t j = 0; j < iLen; j++) { + SVersionRange r = {0}; + if (tDecodeI64(&decoder, &r.minVer) < 0) goto _err; + if (tDecodeI64(&decoder, &r.maxVer) < 0) goto _err; + if (tDecodeI64(&decoder, &reserved64) < 0) goto _err; + TARRAY2_APPEND(iList, r); + } + } + TARRAY2_APPEND(pList, p); + p = NULL; } tEndDecode(&decoder); @@ -1291,14 +1410,52 @@ int32_t tDeserializeSnapRangeArray(void* buf, int32_t bufLen, TSnapRangeArray* p return 0; _err: - if (pRange) { - taosMemoryFree(pRange); - pRange = NULL; + if (p) { + tsdbSnapPartitionClear(&p); } tDecoderClear(&decoder); return -1; } +int32_t tsdbSnapPartListToRangeDiff(STsdbSnapPartList* pList, TSnapRangeArray** ppRanges) { + TSnapRangeArray* pDiff = taosMemoryCalloc(1, sizeof(TSnapRangeArray)); + if (pDiff == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + TARRAY2_INIT(pDiff); + + STsdbSnapPartition* part; + TARRAY2_FOREACH(pList, part) { + STSnapRange* r = taosMemoryCalloc(1, sizeof(STSnapRange)); + if (r == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + int64_t ever = -1; + int32_t typMax = TSDB_SNAP_RANGE_TYP_MAX; + for (int32_t i = 0; i < typMax; i++) { + SVerRangeList* iList = &part->verRanges[i]; + SVersionRange r = {0}; + TARRAY2_FOREACH(iList, r) { + if (r.maxVer < r.minVer) continue; + ever = TMAX(ever, r.maxVer); + } + } + r->sver = ever + 1; + r->ever = VERSION_MAX; + TARRAY2_APPEND(pDiff, r); + } + ppRanges[0] = pDiff; + return 0; + +_err: + if (pDiff) { + tsdbSnapRangeArrayDestroy(&pDiff); + } + return -1; +} + void tsdbSnapRangeArrayDestroy(TSnapRangeArray** ppSnap) { if (ppSnap && ppSnap[0]) { TARRAY2_DESTROY(ppSnap[0], tsdbTSnapRangeClear); @@ -1307,53 +1464,64 @@ void tsdbSnapRangeArrayDestroy(TSnapRangeArray** ppSnap) { } } -static int32_t tsdbSnapInfoDataLenCalc(TSnapRangeArray* pSnap) { - int32_t headerLen = 8; - int32_t itemLen = sizeof(STSnapRange) + 8; - int32_t size = TARRAY2_SIZE(pSnap); - return headerLen + itemLen * size; +void tsdbSnapPartListDestroy(STsdbSnapPartList** ppList) { + if (ppList == NULL || ppList[0] == NULL) return; + + TARRAY2_DESTROY(ppList[0], tsdbSnapPartitionClear); + taosMemoryFree(ppList[0]); + ppList[0] = NULL; } int32_t tsdbSnapGetInfo(STsdb* pTsdb, SSnapshot* pSnap) { - int32_t code = 0; - if (pSnap->typ == TAOS_SYNC_SNAP_INFO_BRIEF) { + if (pSnap->typ != TDMT_SYNC_PREP_SNAPSHOT && pSnap->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { return 0; } - code = -1; - TSnapRangeArray* snapR = tsdbGetSnapRangeArray(pTsdb->pFS); - if (snapR == NULL) { - goto _out; - } - if (pSnap->typ == TAOS_SYNC_SNAP_INFO_DIFF) { - for (int32_t i = 0; i < TARRAY2_SIZE(snapR); i++) { - STSnapRange* u = TARRAY2_GET(snapR, i); - u->sver = u->ever + 1; - u->ever = VERSION_MAX; - } + int code = -1; + STsdbSnapPartList* pList = tsdbGetSnapPartList(pTsdb->pFS); + if (pList == NULL) goto _out; + + if (pSnap->typ == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { } - int32_t bufLen = sizeof(SMsgHead) + tsdbSnapInfoDataLenCalc(snapR); - void* data = taosMemoryRealloc(pSnap->data, bufLen); + void* buf = NULL; + int32_t tlen = 0; + // estimate data length encode + int32_t bufLen = sizeof(SSyncTLV); // typ: TDMT_SYNC_PREP_SNAPSHOT or TDMT_SYNC_PREP_SNAPSOT_REPLY + bufLen += sizeof(SSyncTLV); // subtyp: SNAP_DATA_TSDB + bufLen += tTsdbSnapPartListDataLenCalc(pList); + + void* data = taosMemoryRealloc(pSnap->data, bufLen); if (data == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _out; } pSnap->data = data; - void* buf = ((char*)data) + sizeof(SMsgHead); - int32_t tlen = 0; - if ((tlen = tSerializeSnapRangeArray(buf, bufLen, snapR)) < 0) { + // header + SSyncTLV* datHead = (void*)pSnap->data; + datHead->typ = pSnap->typ; + datHead->len = 0; + + // tsdb + SSyncTLV* tsdbHead = (void*)datHead->val; + tsdbHead->typ = SNAP_DATA_TSDB; + + buf = tsdbHead->val; + tlen = 0; + if ((tlen = tSerializeTsdbSnapPartList(buf, bufLen, pList)) < 0) { tsdbError("vgId:%d, failed to serialize snap range since %s", TD_VID(pTsdb->pVnode), terrstr()); goto _out; } - SMsgHead* msgHead = pSnap->data; - msgHead->contLen = tlen; - msgHead->vgId = TD_VID(pTsdb->pVnode); + tsdbHead->len = tlen; + datHead->len += sizeof(SSyncTLV) + tsdbHead->len; + + // rsma code = 0; + _out: - if (snapR) { - tsdbSnapRangeArrayDestroy(&snapR); + if (pList) { + tsdbSnapPartListDestroy(&pList); } return code; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 418d8632c9..c56644ca2c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -45,6 +45,7 @@ struct SVSnapReader { SStreamStateReader *pStreamStateReader; // rsma int8_t rsmaDone; + TSnapRangeArray *pRsmaRanges; SRSmaSnapReader *pRsmaReader; }; @@ -64,19 +65,44 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader pReader->ever = ever; if (pParam->data) { - pReader->pRanges = taosMemoryCalloc(1, sizeof(*pReader->pRanges)); - if (pReader->pRanges == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + SSyncTLV *datHead = (void *)pParam->data; + if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { + terrno = TSDB_CODE_INVALID_DATA_FMT; goto _err; } - TARRAY2_INIT(pReader->pRanges); - SMsgHead *msgHead = pParam->data; - ASSERT(msgHead->vgId == TD_VID(pVnode)); - void *buf = (char *)pParam->data + sizeof(SMsgHead); - if (tDeserializeSnapRangeArray(buf, msgHead->contLen, pReader->pRanges) < 0) { - vError("vgId:%d, failed to deserialize snap range.", TD_VID(pVnode)); - goto _err; + int32_t offset = 0; + while (offset + sizeof(SSyncTLV) < datHead->len) { + SSyncTLV *sectHead = (void *)(datHead->val + offset); + offset += sizeof(SSyncTLV) + sectHead->len; + void *buf = sectHead->val; + int32_t bufLen = sectHead->len; + ASSERT(sectHead->typ == SNAP_DATA_TSDB || sectHead->typ == SNAP_DATA_RSMA1); + STsdbSnapPartList *pList = tsdbSnapPartListCreate(); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + if (tDeserializeTsdbSnapPartList(buf, bufLen, pList) < 0) { + terrno = TSDB_CODE_INVALID_DATA_FMT; + goto _err; + } + TSnapRangeArray **ppRanges = NULL; + if (sectHead->typ == SNAP_DATA_TSDB) { + ppRanges = &pReader->pRanges; + } else if (sectHead->typ == SNAP_DATA_RSMA1) { + ppRanges = &pReader->pRsmaRanges; + } + if (ppRanges == NULL) { + tsdbSnapPartListDestroy(&pList); + continue; + } + if (tsdbSnapPartListToRangeDiff(pList, ppRanges) < 0) { + vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); + tsdbSnapPartListDestroy(&pList); + goto _err; + } + tsdbSnapPartListDestroy(&pList); } } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 00ca6d8f90..9e035f60c2 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -216,42 +216,6 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) { return 0; } -#if 0 -int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId) { - int32_t bytes = sizeof(SyncPreSnapshot); - pMsg->pCont = rpcMallocCont(bytes); - pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT; - pMsg->contLen = bytes; - if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - SyncPreSnapshot* pPreSnapshot = pMsg->pCont; - pPreSnapshot->bytes = bytes; - pPreSnapshot->msgType = TDMT_SYNC_PRE_SNAPSHOT; - pPreSnapshot->vgId = vgId; - return 0; -} - -int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) { - int32_t bytes = sizeof(SyncPreSnapshotReply); - pMsg->pCont = rpcMallocCont(bytes); - pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY; - pMsg->contLen = bytes; - if (pMsg->pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - SyncPreSnapshotReply* pPreSnapshotReply = pMsg->pCont; - pPreSnapshotReply->bytes = bytes; - pPreSnapshotReply->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY; - pPreSnapshotReply->vgId = vgId; - return 0; -} -#endif - int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { int32_t bytes = sizeof(SyncSnapshotSend) + dataLen; pMsg->pCont = rpcMallocCont(bytes); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 73b6940628..681d256ec9 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -98,7 +98,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { // Get full snapshot info SSyncNode *pSyncNode = pSender->pSyncNode; - SSnapshot snapInfo = {.typ = TAOS_SYNC_SNAP_INFO_FULL}; + SSnapshot snapInfo = {.typ = TDMT_SYNC_PREP_SNAPSHOT}; if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) { sSError(pSender, "snapshot get info failure since %s", terrstr()); goto _out; @@ -106,12 +106,15 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { int dataLen = 0; if (snapInfo.data) { - SMsgHead *msgHead = snapInfo.data; - ASSERT(msgHead->vgId == pSyncNode->vgId); - dataLen = sizeof(SMsgHead) + msgHead->contLen; + SSyncTLV *datHead = snapInfo.data; + if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) { + sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ); + terrno = TSDB_CODE_INVALID_DATA_FMT; + goto _out; + } + dataLen = sizeof(SSyncTLV) + datHead->len; } - // build begin msg SRpcMsg rpcMsg = {0}; if (syncBuildSnapshotSend(&rpcMsg, dataLen, pSender->pSyncNode->vgId) != 0) { sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); @@ -605,7 +608,7 @@ _SEND_REPLY: // build msg ; // make complier happy - SSnapshot snapInfo = {.typ = TAOS_SYNC_SNAP_INFO_DIFF}; + SSnapshot snapInfo = {.typ = TDMT_SYNC_PREP_SNAPSHOT_REPLY}; int32_t dataLen = 0; if (pMsg->dataLen > 0) { void *data = taosMemoryCalloc(1, pMsg->dataLen); @@ -614,15 +617,18 @@ _SEND_REPLY: code = terrno; goto _out; } - dataLen = pMsg->dataLen; - memcpy(data, pMsg->data, dataLen); + memcpy(data, pMsg->data, pMsg->dataLen); snapInfo.data = data; data = NULL; pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo); - SMsgHead *msgHead = snapInfo.data; - ASSERT(msgHead->vgId == pSyncNode->vgId); - dataLen = msgHead->contLen; + SSyncTLV *datHead = snapInfo.data; + if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { + sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ); + code = TSDB_CODE_INVALID_DATA_FMT; + goto _out; + } + dataLen = sizeof(SSyncTLV) + datHead->len; } SRpcMsg rpcMsg = {0}; @@ -927,12 +933,17 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend // update sender pSender->snapshot = snapshot; - if (pMsg->payloadType == TAOS_SYNC_SNAP_INFO_DIFF) { - SMsgHead *msgHead = (void *)pMsg->data; - ASSERT(msgHead->vgId == pSyncNode->vgId); + // start reader + if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { + SSyncTLV *datHead = (void *)pMsg->data; + if (datHead->typ != pMsg->payloadType) { + sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ); + terrno = TSDB_CODE_INVALID_DATA_FMT; + return -1; + } pSender->snapshotParam.data = pMsg->data; } - // start reader + int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader); if (code != 0) { sSError(pSender, "prepare snapshot failed since %s", terrstr());