feat: use TLV format to encode data of snapshot info
This commit is contained in:
parent
f99795d027
commit
410ced8320
|
@ -299,8 +299,8 @@ enum { // WARN: new msg should be appended to segment tail
|
|||
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL) // no longer used
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) // no longer used
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PREP_SNAPSHOT, "sync-prep-snapshot", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PREP_SNAPSHOT_REPLY, "sync-prep-snapshot-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_FORCE_FOLLOWER, "sync-force-become-follower", NULL, NULL)
|
||||
|
||||
|
|
|
@ -101,6 +101,12 @@ typedef struct SNodeInfo {
|
|||
ESyncRole nodeRole;
|
||||
} SNodeInfo;
|
||||
|
||||
typedef struct SSyncTLV {
|
||||
int32_t typ;
|
||||
int32_t len;
|
||||
char val[];
|
||||
} SSyncTLV;
|
||||
|
||||
typedef struct SSyncCfg {
|
||||
int32_t totalReplicaNum;
|
||||
int32_t replicaNum;
|
||||
|
|
|
@ -238,7 +238,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
|
@ -246,7 +246,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
|
|
@ -848,14 +848,14 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_BATCH, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_FORCE_FOLLOWER, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, vmPutMsgToSyncRdQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
|
|
|
@ -681,6 +681,33 @@ int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSn
|
|||
int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
|
||||
void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap);
|
||||
|
||||
// snap partition list
|
||||
typedef TARRAY2(SVersionRange) SVerRangeList;
|
||||
typedef struct STsdbSnapPartition STsdbSnapPartition;
|
||||
typedef TARRAY2(STsdbSnapPartition *) STsdbSnapPartList;
|
||||
// util
|
||||
STsdbSnapPartList *tsdbSnapPartListCreate();
|
||||
void tsdbSnapPartListDestroy(STsdbSnapPartList **ppList);
|
||||
int32_t tSerializeTsdbSnapPartList(void *buf, int32_t bufLen, STsdbSnapPartList *pList);
|
||||
int32_t tDeserializeTsdbSnapPartList(void *buf, int32_t bufLen, STsdbSnapPartList *pList);
|
||||
int32_t tsdbSnapPartListToRangeDiff(STsdbSnapPartList *pList, TSnapRangeArray **ppRanges);
|
||||
|
||||
enum {
|
||||
TSDB_SNAP_RANGE_TYP_HEAD = 0,
|
||||
TSDB_SNAP_RANGE_TYP_DATA,
|
||||
TSDB_SNAP_RANGE_TYP_SMA,
|
||||
TSDB_SNAP_RANGE_TYP_TOMB,
|
||||
TSDB_SNAP_RANGE_TYP_STT,
|
||||
TSDB_SNAP_RANGE_TYP_MAX,
|
||||
};
|
||||
|
||||
struct STsdbSnapPartition {
|
||||
int64_t fid;
|
||||
int8_t stat;
|
||||
SVerRangeList verRanges[TSDB_SNAP_RANGE_TYP_MAX];
|
||||
};
|
||||
|
||||
// snap read
|
||||
struct STsdbReadSnap {
|
||||
SMemTable *pMem;
|
||||
SQueryNode *pNode;
|
||||
|
|
|
@ -81,6 +81,7 @@ struct SSttLvl {
|
|||
|
||||
struct STFileSet {
|
||||
int32_t fid;
|
||||
int8_t stat;
|
||||
STFileObj *farr[TSDB_FTYPE_MAX]; // file array
|
||||
TSttLvlArray lvlArr[1]; // level array
|
||||
};
|
||||
|
|
|
@ -1158,96 +1158,198 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbTSnapRangeCmprFn(STSnapRange* fsr1, STSnapRange* fsr2) {
|
||||
if (fsr1->fid < fsr2->fid) return -1;
|
||||
if (fsr1->fid > fsr2->fid) return 1;
|
||||
// snap part
|
||||
static int32_t tsdbSnapPartCmprFn(STsdbSnapPartition* x, STsdbSnapPartition* y) {
|
||||
if (x->fid < y->fid) return -1;
|
||||
if (x->fid > y->fid) return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbTFileInsertSnapRange(STFile* f, TSnapRangeArray* snapR) {
|
||||
STSnapRange* fsr = taosMemoryCalloc(1, sizeof(*fsr));
|
||||
if (fsr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
fsr->fid = f->fid;
|
||||
fsr->sver = f->minVer;
|
||||
fsr->ever = f->maxVer;
|
||||
|
||||
int32_t code = TARRAY2_SORT_INSERT(snapR, fsr, tsdbTSnapRangeCmprFn);
|
||||
if (code) {
|
||||
taosMemoryFree(fsr);
|
||||
fsr = NULL;
|
||||
}
|
||||
return code;
|
||||
static int32_t tVersionRangeCmprFn(SVersionRange* x, SVersionRange* y) {
|
||||
if (x->minVer < y->minVer) return -1;
|
||||
if (x->minVer > y->minVer) return 1;
|
||||
if (x->maxVer < y->maxVer) return -1;
|
||||
if (x->maxVer > y->maxVer) return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbTFSetInsertSnapRange(STFileSet* fset, TSnapRangeArray* snapR) {
|
||||
STFile tf = {.fid = fset->fid, .minVer = VERSION_MAX, .maxVer = VERSION_MIN};
|
||||
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||
if (fset->farr[ftype] == NULL) continue;
|
||||
STFile* f = fset->farr[ftype]->f;
|
||||
tsdbTFileUpdVerRange(&tf, (SVersionRange){.minVer = f->minVer, .maxVer = f->maxVer});
|
||||
STsdbSnapPartition* tsdbSnapPartitionCreate() {
|
||||
STsdbSnapPartition* pSP = taosMemoryCalloc(1, sizeof(STsdbSnapPartition));
|
||||
if (pSP == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < TSDB_SNAP_RANGE_TYP_MAX; i++) {
|
||||
TARRAY2_INIT(&pSP->verRanges[i]);
|
||||
}
|
||||
return pSP;
|
||||
}
|
||||
|
||||
void tsdbSnapPartitionClear(STsdbSnapPartition** ppSP) {
|
||||
if (ppSP == NULL || ppSP[0] == NULL) {
|
||||
return;
|
||||
}
|
||||
for (int32_t i = 0; i < TSDB_SNAP_RANGE_TYP_MAX; i++) {
|
||||
TARRAY2_DESTROY(&ppSP[0]->verRanges[i], NULL);
|
||||
}
|
||||
taosMemoryFree(ppSP[0]);
|
||||
ppSP[0] = NULL;
|
||||
}
|
||||
|
||||
static int32_t tsdbFTypeToSRangeTyp(tsdb_ftype_t ftype) {
|
||||
switch (ftype) {
|
||||
case TSDB_FTYPE_HEAD:
|
||||
return TSDB_SNAP_RANGE_TYP_HEAD;
|
||||
case TSDB_FTYPE_DATA:
|
||||
return TSDB_SNAP_RANGE_TYP_DATA;
|
||||
case TSDB_FTYPE_SMA:
|
||||
return TSDB_SNAP_RANGE_TYP_SMA;
|
||||
case TSDB_FTYPE_TOMB:
|
||||
return TSDB_SNAP_RANGE_TYP_TOMB;
|
||||
case TSDB_FTYPE_STT:
|
||||
return TSDB_SNAP_RANGE_TYP_STT;
|
||||
}
|
||||
return TSDB_SNAP_RANGE_TYP_MAX;
|
||||
}
|
||||
|
||||
static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP) {
|
||||
STsdbSnapPartition* p = tsdbSnapPartitionCreate();
|
||||
if (p == NULL) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
int32_t typ = 0;
|
||||
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
|
||||
if (fset->farr[ftype] == NULL) continue;
|
||||
typ = tsdbFTypeToSRangeTyp(ftype);
|
||||
ASSERT(typ < TSDB_SNAP_RANGE_TYP_MAX);
|
||||
STFile* f = fset->farr[ftype]->f;
|
||||
SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer};
|
||||
TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
|
||||
}
|
||||
|
||||
typ = TSDB_SNAP_RANGE_TYP_STT;
|
||||
const SSttLvl* lvl;
|
||||
TARRAY2_FOREACH(fset->lvlArr, lvl) {
|
||||
STFileObj* fobj;
|
||||
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
||||
tsdbTFileUpdVerRange(&tf, (SVersionRange){.minVer = fobj->f->minVer, .maxVer = fobj->f->maxVer});
|
||||
STFile* f = fobj->f;
|
||||
SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer};
|
||||
TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
|
||||
}
|
||||
}
|
||||
ppSP[0] = p;
|
||||
return 0;
|
||||
|
||||
int32_t code = tsdbTFileInsertSnapRange(&tf, snapR);
|
||||
if (code) return code;
|
||||
return code;
|
||||
_err:
|
||||
tsdbSnapPartitionClear(&p);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static TSnapRangeArray* tsdbGetSnapRangeArray(STFileSystem* fs) {
|
||||
int32_t code = 0;
|
||||
TSnapRangeArray* snapR = taosMemoryCalloc(1, sizeof(*snapR));
|
||||
if (snapR == NULL) {
|
||||
STsdbSnapPartList* tsdbSnapPartListCreate() {
|
||||
STsdbSnapPartList* pList = taosMemoryCalloc(1, sizeof(STsdbSnapPartList));
|
||||
if (pList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
TARRAY2_INIT(snapR);
|
||||
TARRAY2_INIT(pList);
|
||||
return pList;
|
||||
}
|
||||
|
||||
static STsdbSnapPartList* tsdbGetSnapPartList(STFileSystem* fs) {
|
||||
STsdbSnapPartList* pList = tsdbSnapPartListCreate();
|
||||
if (pList == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
|
||||
STFileSet* fset;
|
||||
TARRAY2_FOREACH(fs->fSetArr, fset) {
|
||||
code = tsdbTFSetInsertSnapRange(fset, snapR);
|
||||
if (code) break;
|
||||
STsdbSnapPartition* pItem = NULL;
|
||||
if (tsdbTFileSetToSnapPart(fset, &pItem) < 0) {
|
||||
code = -1;
|
||||
break;
|
||||
}
|
||||
ASSERT(pItem != NULL);
|
||||
TARRAY2_SORT_INSERT(pList, pItem, tsdbSnapPartCmprFn);
|
||||
}
|
||||
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
|
||||
|
||||
if (code) {
|
||||
TARRAY2_DESTROY(snapR, tsdbTSnapRangeClear);
|
||||
taosMemoryFree(snapR);
|
||||
snapR = NULL;
|
||||
TARRAY2_DESTROY(pList, tsdbSnapPartitionClear);
|
||||
taosMemoryFree(pList);
|
||||
pList = NULL;
|
||||
}
|
||||
return snapR;
|
||||
return pList;
|
||||
}
|
||||
|
||||
int32_t tSerializeSnapRangeArray(void* buf, int32_t bufLen, TSnapRangeArray* pSnapR) {
|
||||
int32_t tTsdbSnapPartListDataLenCalc(STsdbSnapPartList* pList) {
|
||||
int32_t hdrLen = sizeof(int32_t);
|
||||
int32_t datLen = 0;
|
||||
|
||||
int8_t msgVer = 1;
|
||||
int32_t len = TARRAY2_SIZE(pList);
|
||||
hdrLen += sizeof(msgVer);
|
||||
hdrLen += sizeof(len);
|
||||
datLen += hdrLen;
|
||||
|
||||
for (int32_t u = 0; u < len; u++) {
|
||||
STsdbSnapPartition* p = TARRAY2_GET(pList, u);
|
||||
int32_t typMax = TSDB_SNAP_RANGE_TYP_MAX;
|
||||
int32_t uItem = 0;
|
||||
uItem += sizeof(STsdbSnapPartition);
|
||||
uItem += sizeof(typMax);
|
||||
|
||||
for (int32_t i = 0; i < typMax; i++) {
|
||||
int32_t iLen = TARRAY2_SIZE(&p->verRanges[i]);
|
||||
int32_t jItem = 0;
|
||||
jItem += sizeof(SVersionRange);
|
||||
jItem += sizeof(int64_t);
|
||||
uItem += sizeof(iLen) + jItem * iLen;
|
||||
}
|
||||
datLen += uItem;
|
||||
}
|
||||
return datLen;
|
||||
}
|
||||
|
||||
int32_t tSerializeTsdbSnapPartList(void* buf, int32_t bufLen, STsdbSnapPartList* pList) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
int8_t msgVer = 1;
|
||||
int32_t arrLen = TARRAY2_SIZE(pSnapR);
|
||||
int8_t reserved8 = 0;
|
||||
int16_t reserved16 = 0;
|
||||
int64_t reserved64 = 0;
|
||||
|
||||
int8_t msgVer = 1;
|
||||
int32_t len = TARRAY2_SIZE(pList);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) goto _err;
|
||||
if (tEncodeI8(&encoder, msgVer) < 0) goto _err;
|
||||
if (tEncodeI8(&encoder, reserved8) < 0) goto _err;
|
||||
if (tEncodeI32(&encoder, arrLen) < 0) goto _err;
|
||||
if (tEncodeI32(&encoder, len) < 0) goto _err;
|
||||
|
||||
int64_t reserved64 = 0;
|
||||
for (int32_t i = 0; i < arrLen; i++) {
|
||||
STSnapRange* u = TARRAY2_GET(pSnapR, i);
|
||||
int64_t fid = u->fid;
|
||||
if (tEncodeI64(&encoder, fid) < 0) goto _err;
|
||||
if (tEncodeI64(&encoder, u->sver) < 0) goto _err;
|
||||
if (tEncodeI64(&encoder, u->ever) < 0) goto _err;
|
||||
if (tEncodeI64(&encoder, reserved64) < 0) goto _err;
|
||||
for (int32_t u = 0; u < len; u++) {
|
||||
STsdbSnapPartition* p = TARRAY2_GET(pList, u);
|
||||
if (tEncodeI64(&encoder, p->fid) < 0) goto _err;
|
||||
if (tEncodeI8(&encoder, p->stat) < 0) goto _err;
|
||||
if (tEncodeI8(&encoder, reserved8) < 0) goto _err;
|
||||
if (tEncodeI16(&encoder, reserved16) < 0) goto _err;
|
||||
|
||||
int32_t typMax = TSDB_SNAP_RANGE_TYP_MAX;
|
||||
if (tEncodeI32(&encoder, typMax) < 0) goto _err;
|
||||
|
||||
for (int32_t i = 0; i < typMax; i++) {
|
||||
SVerRangeList* iList = &p->verRanges[i];
|
||||
int32_t iLen = TARRAY2_SIZE(iList);
|
||||
|
||||
if (tEncodeI32(&encoder, iLen) < 0) goto _err;
|
||||
for (int32_t j = 0; j < iLen; j++) {
|
||||
SVersionRange r = TARRAY2_GET(iList, j);
|
||||
if (tEncodeI64(&encoder, r.minVer) < 0) goto _err;
|
||||
if (tEncodeI64(&encoder, r.maxVer) < 0) goto _err;
|
||||
if (tEncodeI64(&encoder, reserved64) < 0) goto _err;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
@ -1260,30 +1362,47 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSnapRangeArray(void* buf, int32_t bufLen, TSnapRangeArray* pSnapR) {
|
||||
int32_t tDeserializeTsdbSnapPartList(void* buf, int32_t bufLen, STsdbSnapPartList* pList) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
int8_t msgVer = 0;
|
||||
int32_t arrLen = 0;
|
||||
int8_t reserved8 = 0;
|
||||
int16_t reserved16 = 0;
|
||||
int64_t reserved64 = 0;
|
||||
|
||||
STsdbSnapPartition* p = NULL;
|
||||
|
||||
int8_t msgVer = 0;
|
||||
int32_t len = 0;
|
||||
if (tStartDecode(&decoder) < 0) goto _err;
|
||||
if (tDecodeI8(&decoder, &msgVer) < 0) goto _err;
|
||||
if (tDecodeI8(&decoder, &reserved8) < 0) goto _err;
|
||||
if (tDecodeI32(&decoder, &arrLen) < 0) goto _err;
|
||||
if (tDecodeI32(&decoder, &len) < 0) goto _err;
|
||||
|
||||
int64_t fid = 0;
|
||||
int64_t reserved64 = 0;
|
||||
STSnapRange* pRange = NULL;
|
||||
for (int32_t i = 0; i < arrLen; i++) {
|
||||
pRange = taosMemoryCalloc(1, sizeof(STSnapRange));
|
||||
if (tDecodeI64(&decoder, &fid) < 0) goto _err;
|
||||
pRange->fid = fid;
|
||||
if (tDecodeI64(&decoder, &pRange->sver) < 0) goto _err;
|
||||
if (tDecodeI64(&decoder, &pRange->ever) < 0) goto _err;
|
||||
if (tDecodeI64(&decoder, &reserved64) < 0) goto _err;
|
||||
TARRAY2_APPEND(pSnapR, pRange);
|
||||
pRange = NULL;
|
||||
for (int32_t u = 0; u < len; u++) {
|
||||
p = tsdbSnapPartitionCreate();
|
||||
if (p == NULL) goto _err;
|
||||
if (tDecodeI64(&decoder, &p->fid) < 0) goto _err;
|
||||
if (tDecodeI8(&decoder, &p->stat) < 0) goto _err;
|
||||
if (tDecodeI8(&decoder, &reserved8) < 0) goto _err;
|
||||
if (tDecodeI16(&decoder, &reserved16) < 0) goto _err;
|
||||
|
||||
int32_t typMax = 0;
|
||||
if (tDecodeI32(&decoder, &typMax) < 0) goto _err;
|
||||
|
||||
for (int32_t i = 0; i < typMax; i++) {
|
||||
SVerRangeList* iList = &p->verRanges[i];
|
||||
int32_t iLen = 0;
|
||||
if (tDecodeI32(&decoder, &iLen) < 0) goto _err;
|
||||
for (int32_t j = 0; j < iLen; j++) {
|
||||
SVersionRange r = {0};
|
||||
if (tDecodeI64(&decoder, &r.minVer) < 0) goto _err;
|
||||
if (tDecodeI64(&decoder, &r.maxVer) < 0) goto _err;
|
||||
if (tDecodeI64(&decoder, &reserved64) < 0) goto _err;
|
||||
TARRAY2_APPEND(iList, r);
|
||||
}
|
||||
}
|
||||
TARRAY2_APPEND(pList, p);
|
||||
p = NULL;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
@ -1291,14 +1410,52 @@ int32_t tDeserializeSnapRangeArray(void* buf, int32_t bufLen, TSnapRangeArray* p
|
|||
return 0;
|
||||
|
||||
_err:
|
||||
if (pRange) {
|
||||
taosMemoryFree(pRange);
|
||||
pRange = NULL;
|
||||
if (p) {
|
||||
tsdbSnapPartitionClear(&p);
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t tsdbSnapPartListToRangeDiff(STsdbSnapPartList* pList, TSnapRangeArray** ppRanges) {
|
||||
TSnapRangeArray* pDiff = taosMemoryCalloc(1, sizeof(TSnapRangeArray));
|
||||
if (pDiff == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
TARRAY2_INIT(pDiff);
|
||||
|
||||
STsdbSnapPartition* part;
|
||||
TARRAY2_FOREACH(pList, part) {
|
||||
STSnapRange* r = taosMemoryCalloc(1, sizeof(STSnapRange));
|
||||
if (r == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
int64_t ever = -1;
|
||||
int32_t typMax = TSDB_SNAP_RANGE_TYP_MAX;
|
||||
for (int32_t i = 0; i < typMax; i++) {
|
||||
SVerRangeList* iList = &part->verRanges[i];
|
||||
SVersionRange r = {0};
|
||||
TARRAY2_FOREACH(iList, r) {
|
||||
if (r.maxVer < r.minVer) continue;
|
||||
ever = TMAX(ever, r.maxVer);
|
||||
}
|
||||
}
|
||||
r->sver = ever + 1;
|
||||
r->ever = VERSION_MAX;
|
||||
TARRAY2_APPEND(pDiff, r);
|
||||
}
|
||||
ppRanges[0] = pDiff;
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
if (pDiff) {
|
||||
tsdbSnapRangeArrayDestroy(&pDiff);
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
void tsdbSnapRangeArrayDestroy(TSnapRangeArray** ppSnap) {
|
||||
if (ppSnap && ppSnap[0]) {
|
||||
TARRAY2_DESTROY(ppSnap[0], tsdbTSnapRangeClear);
|
||||
|
@ -1307,53 +1464,64 @@ void tsdbSnapRangeArrayDestroy(TSnapRangeArray** ppSnap) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t tsdbSnapInfoDataLenCalc(TSnapRangeArray* pSnap) {
|
||||
int32_t headerLen = 8;
|
||||
int32_t itemLen = sizeof(STSnapRange) + 8;
|
||||
int32_t size = TARRAY2_SIZE(pSnap);
|
||||
return headerLen + itemLen * size;
|
||||
void tsdbSnapPartListDestroy(STsdbSnapPartList** ppList) {
|
||||
if (ppList == NULL || ppList[0] == NULL) return;
|
||||
|
||||
TARRAY2_DESTROY(ppList[0], tsdbSnapPartitionClear);
|
||||
taosMemoryFree(ppList[0]);
|
||||
ppList[0] = NULL;
|
||||
}
|
||||
|
||||
int32_t tsdbSnapGetInfo(STsdb* pTsdb, SSnapshot* pSnap) {
|
||||
int32_t code = 0;
|
||||
if (pSnap->typ == TAOS_SYNC_SNAP_INFO_BRIEF) {
|
||||
if (pSnap->typ != TDMT_SYNC_PREP_SNAPSHOT && pSnap->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
||||
return 0;
|
||||
}
|
||||
code = -1;
|
||||
TSnapRangeArray* snapR = tsdbGetSnapRangeArray(pTsdb->pFS);
|
||||
if (snapR == NULL) {
|
||||
goto _out;
|
||||
}
|
||||
if (pSnap->typ == TAOS_SYNC_SNAP_INFO_DIFF) {
|
||||
for (int32_t i = 0; i < TARRAY2_SIZE(snapR); i++) {
|
||||
STSnapRange* u = TARRAY2_GET(snapR, i);
|
||||
u->sver = u->ever + 1;
|
||||
u->ever = VERSION_MAX;
|
||||
}
|
||||
int code = -1;
|
||||
STsdbSnapPartList* pList = tsdbGetSnapPartList(pTsdb->pFS);
|
||||
if (pList == NULL) goto _out;
|
||||
|
||||
if (pSnap->typ == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
||||
}
|
||||
|
||||
int32_t bufLen = sizeof(SMsgHead) + tsdbSnapInfoDataLenCalc(snapR);
|
||||
void* data = taosMemoryRealloc(pSnap->data, bufLen);
|
||||
void* buf = NULL;
|
||||
int32_t tlen = 0;
|
||||
// estimate data length encode
|
||||
int32_t bufLen = sizeof(SSyncTLV); // typ: TDMT_SYNC_PREP_SNAPSHOT or TDMT_SYNC_PREP_SNAPSOT_REPLY
|
||||
bufLen += sizeof(SSyncTLV); // subtyp: SNAP_DATA_TSDB
|
||||
bufLen += tTsdbSnapPartListDataLenCalc(pList);
|
||||
|
||||
void* data = taosMemoryRealloc(pSnap->data, bufLen);
|
||||
if (data == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _out;
|
||||
}
|
||||
pSnap->data = data;
|
||||
void* buf = ((char*)data) + sizeof(SMsgHead);
|
||||
int32_t tlen = 0;
|
||||
|
||||
if ((tlen = tSerializeSnapRangeArray(buf, bufLen, snapR)) < 0) {
|
||||
// header
|
||||
SSyncTLV* datHead = (void*)pSnap->data;
|
||||
datHead->typ = pSnap->typ;
|
||||
datHead->len = 0;
|
||||
|
||||
// tsdb
|
||||
SSyncTLV* tsdbHead = (void*)datHead->val;
|
||||
tsdbHead->typ = SNAP_DATA_TSDB;
|
||||
|
||||
buf = tsdbHead->val;
|
||||
tlen = 0;
|
||||
if ((tlen = tSerializeTsdbSnapPartList(buf, bufLen, pList)) < 0) {
|
||||
tsdbError("vgId:%d, failed to serialize snap range since %s", TD_VID(pTsdb->pVnode), terrstr());
|
||||
goto _out;
|
||||
}
|
||||
SMsgHead* msgHead = pSnap->data;
|
||||
msgHead->contLen = tlen;
|
||||
msgHead->vgId = TD_VID(pTsdb->pVnode);
|
||||
tsdbHead->len = tlen;
|
||||
datHead->len += sizeof(SSyncTLV) + tsdbHead->len;
|
||||
|
||||
// rsma
|
||||
|
||||
code = 0;
|
||||
|
||||
_out:
|
||||
if (snapR) {
|
||||
tsdbSnapRangeArrayDestroy(&snapR);
|
||||
if (pList) {
|
||||
tsdbSnapPartListDestroy(&pList);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -45,6 +45,7 @@ struct SVSnapReader {
|
|||
SStreamStateReader *pStreamStateReader;
|
||||
// rsma
|
||||
int8_t rsmaDone;
|
||||
TSnapRangeArray *pRsmaRanges;
|
||||
SRSmaSnapReader *pRsmaReader;
|
||||
};
|
||||
|
||||
|
@ -64,19 +65,44 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader
|
|||
pReader->ever = ever;
|
||||
|
||||
if (pParam->data) {
|
||||
pReader->pRanges = taosMemoryCalloc(1, sizeof(*pReader->pRanges));
|
||||
if (pReader->pRanges == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
SSyncTLV *datHead = (void *)pParam->data;
|
||||
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||
goto _err;
|
||||
}
|
||||
TARRAY2_INIT(pReader->pRanges);
|
||||
SMsgHead *msgHead = pParam->data;
|
||||
ASSERT(msgHead->vgId == TD_VID(pVnode));
|
||||
void *buf = (char *)pParam->data + sizeof(SMsgHead);
|
||||
|
||||
if (tDeserializeSnapRangeArray(buf, msgHead->contLen, pReader->pRanges) < 0) {
|
||||
vError("vgId:%d, failed to deserialize snap range.", TD_VID(pVnode));
|
||||
goto _err;
|
||||
int32_t offset = 0;
|
||||
while (offset + sizeof(SSyncTLV) < datHead->len) {
|
||||
SSyncTLV *sectHead = (void *)(datHead->val + offset);
|
||||
offset += sizeof(SSyncTLV) + sectHead->len;
|
||||
void *buf = sectHead->val;
|
||||
int32_t bufLen = sectHead->len;
|
||||
ASSERT(sectHead->typ == SNAP_DATA_TSDB || sectHead->typ == SNAP_DATA_RSMA1);
|
||||
STsdbSnapPartList *pList = tsdbSnapPartListCreate();
|
||||
if (pList == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
if (tDeserializeTsdbSnapPartList(buf, bufLen, pList) < 0) {
|
||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||
goto _err;
|
||||
}
|
||||
TSnapRangeArray **ppRanges = NULL;
|
||||
if (sectHead->typ == SNAP_DATA_TSDB) {
|
||||
ppRanges = &pReader->pRanges;
|
||||
} else if (sectHead->typ == SNAP_DATA_RSMA1) {
|
||||
ppRanges = &pReader->pRsmaRanges;
|
||||
}
|
||||
if (ppRanges == NULL) {
|
||||
tsdbSnapPartListDestroy(&pList);
|
||||
continue;
|
||||
}
|
||||
if (tsdbSnapPartListToRangeDiff(pList, ppRanges) < 0) {
|
||||
vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr());
|
||||
tsdbSnapPartListDestroy(&pList);
|
||||
goto _err;
|
||||
}
|
||||
tsdbSnapPartListDestroy(&pList);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -216,42 +216,6 @@ int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId) {
|
||||
int32_t bytes = sizeof(SyncPreSnapshot);
|
||||
pMsg->pCont = rpcMallocCont(bytes);
|
||||
pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT;
|
||||
pMsg->contLen = bytes;
|
||||
if (pMsg->pCont == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SyncPreSnapshot* pPreSnapshot = pMsg->pCont;
|
||||
pPreSnapshot->bytes = bytes;
|
||||
pPreSnapshot->msgType = TDMT_SYNC_PRE_SNAPSHOT;
|
||||
pPreSnapshot->vgId = vgId;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) {
|
||||
int32_t bytes = sizeof(SyncPreSnapshotReply);
|
||||
pMsg->pCont = rpcMallocCont(bytes);
|
||||
pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
|
||||
pMsg->contLen = bytes;
|
||||
if (pMsg->pCont == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SyncPreSnapshotReply* pPreSnapshotReply = pMsg->pCont;
|
||||
pPreSnapshotReply->bytes = bytes;
|
||||
pPreSnapshotReply->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
|
||||
pPreSnapshotReply->vgId = vgId;
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
|
||||
int32_t bytes = sizeof(SyncSnapshotSend) + dataLen;
|
||||
pMsg->pCont = rpcMallocCont(bytes);
|
||||
|
|
|
@ -98,7 +98,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
|
||||
// Get full snapshot info
|
||||
SSyncNode *pSyncNode = pSender->pSyncNode;
|
||||
SSnapshot snapInfo = {.typ = TAOS_SYNC_SNAP_INFO_FULL};
|
||||
SSnapshot snapInfo = {.typ = TDMT_SYNC_PREP_SNAPSHOT};
|
||||
if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) {
|
||||
sSError(pSender, "snapshot get info failure since %s", terrstr());
|
||||
goto _out;
|
||||
|
@ -106,12 +106,15 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
|||
|
||||
int dataLen = 0;
|
||||
if (snapInfo.data) {
|
||||
SMsgHead *msgHead = snapInfo.data;
|
||||
ASSERT(msgHead->vgId == pSyncNode->vgId);
|
||||
dataLen = sizeof(SMsgHead) + msgHead->contLen;
|
||||
SSyncTLV *datHead = snapInfo.data;
|
||||
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
|
||||
sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
|
||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||
goto _out;
|
||||
}
|
||||
dataLen = sizeof(SSyncTLV) + datHead->len;
|
||||
}
|
||||
|
||||
// build begin msg
|
||||
SRpcMsg rpcMsg = {0};
|
||||
if (syncBuildSnapshotSend(&rpcMsg, dataLen, pSender->pSyncNode->vgId) != 0) {
|
||||
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
|
||||
|
@ -605,7 +608,7 @@ _SEND_REPLY:
|
|||
// build msg
|
||||
; // make complier happy
|
||||
|
||||
SSnapshot snapInfo = {.typ = TAOS_SYNC_SNAP_INFO_DIFF};
|
||||
SSnapshot snapInfo = {.typ = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
|
||||
int32_t dataLen = 0;
|
||||
if (pMsg->dataLen > 0) {
|
||||
void *data = taosMemoryCalloc(1, pMsg->dataLen);
|
||||
|
@ -614,15 +617,18 @@ _SEND_REPLY:
|
|||
code = terrno;
|
||||
goto _out;
|
||||
}
|
||||
dataLen = pMsg->dataLen;
|
||||
memcpy(data, pMsg->data, dataLen);
|
||||
memcpy(data, pMsg->data, pMsg->dataLen);
|
||||
snapInfo.data = data;
|
||||
data = NULL;
|
||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo);
|
||||
|
||||
SMsgHead *msgHead = snapInfo.data;
|
||||
ASSERT(msgHead->vgId == pSyncNode->vgId);
|
||||
dataLen = msgHead->contLen;
|
||||
SSyncTLV *datHead = snapInfo.data;
|
||||
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
||||
sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
|
||||
code = TSDB_CODE_INVALID_DATA_FMT;
|
||||
goto _out;
|
||||
}
|
||||
dataLen = sizeof(SSyncTLV) + datHead->len;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
|
@ -927,12 +933,17 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
|
|||
// update sender
|
||||
pSender->snapshot = snapshot;
|
||||
|
||||
if (pMsg->payloadType == TAOS_SYNC_SNAP_INFO_DIFF) {
|
||||
SMsgHead *msgHead = (void *)pMsg->data;
|
||||
ASSERT(msgHead->vgId == pSyncNode->vgId);
|
||||
// start reader
|
||||
if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
|
||||
SSyncTLV *datHead = (void *)pMsg->data;
|
||||
if (datHead->typ != pMsg->payloadType) {
|
||||
sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ);
|
||||
terrno = TSDB_CODE_INVALID_DATA_FMT;
|
||||
return -1;
|
||||
}
|
||||
pSender->snapshotParam.data = pMsg->data;
|
||||
}
|
||||
// start reader
|
||||
|
||||
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
|
||||
if (code != 0) {
|
||||
sSError(pSender, "prepare snapshot failed since %s", terrstr());
|
||||
|
|
Loading…
Reference in New Issue