feat: exchange difference of snapshot info for replication

This commit is contained in:
Benguang Zhao 2023-09-11 19:05:40 +08:00
parent b23bcee690
commit 083dd148be
8 changed files with 100 additions and 23 deletions

View File

@ -87,6 +87,12 @@ typedef enum {
TAOS_SYNC_ROLE_ERROR = 2, TAOS_SYNC_ROLE_ERROR = 2,
} ESyncRole; } ESyncRole;
typedef enum {
TAOS_SYNC_SNAP_INFO_BRIEF = 0,
TAOS_SYNC_SNAP_INFO_FULL = 1,
TAOS_SYNC_SNAP_INFO_DIFF = 2,
} ESyncSnapInfoTyp;
typedef struct SNodeInfo { typedef struct SNodeInfo {
int64_t clusterId; int64_t clusterId;
int32_t nodeId; int32_t nodeId;
@ -139,10 +145,12 @@ typedef struct SReConfigCbMeta {
typedef struct SSnapshotParam { typedef struct SSnapshotParam {
SyncIndex start; SyncIndex start;
SyncIndex end; SyncIndex end;
void* data; // with SMsgHead
} SSnapshotParam; } SSnapshotParam;
typedef struct SSnapshot { typedef struct SSnapshot {
void* data; ESyncSnapInfoTyp typ;
void* data; // with SMsgHead
SyncIndex lastApplyIndex; SyncIndex lastApplyIndex;
SyncTerm lastApplyTerm; SyncTerm lastApplyTerm;
SyncIndex lastConfigIndex; SyncIndex lastConfigIndex;
@ -171,7 +179,7 @@ typedef struct SSyncFSM {
void (*FpBecomeLearnerCb)(const struct SSyncFSM* pFsm); void (*FpBecomeLearnerCb)(const struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
void (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader); int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader);
void (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader); void (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader);

View File

@ -286,9 +286,10 @@ int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pRe
return 0; return 0;
} }
static void mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { static int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
return 0;
} }
void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {

View File

@ -69,7 +69,7 @@ int32_t vnodeBegin(SVnode *pVnode);
int32_t vnodeStart(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode); void vnodeStop(SVnode *pVnode);
int64_t vnodeGetSyncHandle(SVnode *pVnode); int64_t vnodeGetSyncHandle(SVnode *pVnode);
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t *numOfTables, int64_t *numOfNormalTables); void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t *numOfTables, int64_t *numOfNormalTables);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetTableList(void *pVnode, int8_t type, SArray *pList); int32_t vnodeGetTableList(void *pVnode, int8_t type, SArray *pList);

View File

@ -518,9 +518,13 @@ void vnodeStop(SVnode *pVnode) {}
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) {
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = pVnode->state.committed; pSnapshot->lastApplyIndex = pVnode->state.committed;
pSnapshot->lastApplyTerm = pVnode->state.commitTerm; pSnapshot->lastApplyTerm = pVnode->state.commitTerm;
pSnapshot->lastConfigIndex = -1; pSnapshot->lastConfigIndex = -1;
if (pSnapshot->typ == TAOS_SYNC_SNAP_INFO_FULL) {
// TODO: get full info of snapshots
}
return 0;
} }

View File

@ -416,8 +416,8 @@ static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code; return code;
} }
static void vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { static int32_t vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
vnodeGetSnapshot(pFsm->data, pSnapshot); return vnodeGetSnapshot(pFsm->data, pSnapshot);
} }
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
@ -642,6 +642,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex; pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = NULL;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
pFsm->FpRestoreFinishCb = vnodeRestoreFinish; pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
pFsm->FpLeaderTransferCb = NULL; pFsm->FpLeaderTransferCb = NULL;

View File

@ -200,7 +200,7 @@ typedef struct SyncSnapshotSend {
SSyncCfg lastConfig; SSyncCfg lastConfig;
int64_t startTime; int64_t startTime;
int32_t seq; int32_t seq;
int16_t reserved; int16_t payloadType;
uint32_t dataLen; uint32_t dataLen;
char data[]; char data[];
} SyncSnapshotSend; } SyncSnapshotSend;
@ -219,7 +219,8 @@ typedef struct SyncSnapshotRsp {
int32_t ack; int32_t ack;
int32_t code; int32_t code;
SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid
int16_t reserved; int16_t payloadType;
char data[];
} SyncSnapshotRsp; } SyncSnapshotRsp;
typedef struct SyncLeaderTransfer { typedef struct SyncLeaderTransfer {
@ -267,7 +268,7 @@ int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId, SFsmCbMeta* pMeta); int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId, SFsmCbMeta* pMeta);
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId);
int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId);

View File

@ -270,8 +270,8 @@ int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
return 0; return 0;
} }
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId) { int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
int32_t bytes = sizeof(SyncSnapshotRsp); int32_t bytes = sizeof(SyncSnapshotRsp) + dataLen;
pMsg->pCont = rpcMallocCont(bytes); pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP; pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
pMsg->contLen = bytes; pMsg->contLen = bytes;

View File

@ -74,6 +74,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
int32_t code = -1;
pSender->start = true; pSender->start = true;
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
@ -95,11 +96,26 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->lastSendTime = pSender->startTime; pSender->lastSendTime = pSender->startTime;
pSender->finish = false; pSender->finish = false;
// Get full snapshot info
SSyncNode *pSyncNode = pSender->pSyncNode;
SSnapshot snapInfo = {.typ = TAOS_SYNC_SNAP_INFO_FULL};
if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) {
sSError(pSender, "snapshot get info failure since %s", terrstr());
goto _out;
}
int dataLen = 0;
if (snapInfo.data) {
SMsgHead *msgHead = snapInfo.data;
ASSERT(msgHead->vgId == pSyncNode->vgId);
dataLen = sizeof(SMsgHead) + msgHead->contLen;
}
// build begin msg // build begin msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) { if (syncBuildSnapshotSend(&rpcMsg, dataLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
return -1; goto _out;
} }
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
@ -114,16 +130,27 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->startTime = pSender->startTime; pMsg->startTime = pSender->startTime;
pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
if (dataLen > 0) {
pMsg->payloadType = snapInfo.typ;
memcpy(pMsg->data, snapInfo.data, dataLen);
}
// event log // event log
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start"); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");
// send msg // send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
return -1; goto _out;
} }
return 0; code = 0;
_out:
if (snapInfo.data) {
taosMemoryFree(snapInfo.data);
snapInfo.data = NULL;
}
return code;
} }
void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
@ -578,10 +605,29 @@ _SEND_REPLY:
// build msg // build msg
; // make complier happy ; // make complier happy
code = -1;
SSnapshot snapInfo = {.typ = TAOS_SYNC_SNAP_INFO_DIFF};
int32_t dataLen = 0;
if (pMsg->dataLen > 0) {
void *data = taosMemoryCalloc(1, pMsg->dataLen);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _out;
}
memcpy(data, pMsg->data, dataLen);
snapInfo.data = data;
data = NULL;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo);
SMsgHead *msgHead = snapInfo.data;
ASSERT(msgHead->vgId == pSyncNode->vgId);
dataLen = msgHead->contLen;
}
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) { if (syncBuildSnapshotSendRsp(&rpcMsg, dataLen, pSyncNode->vgId) != 0) {
sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr()); sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
return -1; goto _out;
} }
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
@ -595,13 +641,24 @@ _SEND_REPLY:
pRspMsg->code = code; pRspMsg->code = code;
pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode); pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
if (snapInfo.data) {
pRspMsg->payloadType = snapInfo.typ;
memcpy(pRspMsg->data, snapInfo.data, dataLen);
}
// send msg // send msg
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot"); syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot");
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr()); sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
return -1; goto _out;
} }
code = 0;
_out:
if (snapInfo.data) {
taosMemoryFree(snapInfo.data);
snapInfo.data = NULL;
}
return code; return code;
} }
@ -635,7 +692,7 @@ _SEND_REPLY:
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) { if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId) != 0) {
sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr()); sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
return -1; return -1;
} }
@ -685,7 +742,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId)) { if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) {
sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr()); sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
return -1; return -1;
} }
@ -732,7 +789,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) { if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId) != 0) {
sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr()); sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr());
return -1; return -1;
} }
@ -869,6 +926,11 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
// update sender // update sender
pSender->snapshot = snapshot; pSender->snapshot = snapshot;
if (pMsg->payloadType == TAOS_SYNC_SNAP_INFO_DIFF) {
SMsgHead *msgHead = (void *)pMsg->data;
ASSERT(msgHead->vgId == pSyncNode->vgId);
pSender->snapshotParam.data = pMsg->data;
}
// start reader // start reader
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader); int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
if (code != 0) { if (code != 0) {