From 083dd148be6e7fe9569fbb9ae82f3d24375b1ef6 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 11 Sep 2023 19:05:40 +0800 Subject: [PATCH] feat: exchange difference of snapshot info for replication --- include/libs/sync/sync.h | 12 +++- source/dnode/mnode/impl/src/mndSync.c | 3 +- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 8 ++- source/dnode/vnode/src/vnd/vnodeSync.c | 5 +- source/libs/sync/inc/syncMessage.h | 7 ++- source/libs/sync/src/syncMessage.c | 4 +- source/libs/sync/src/syncSnapshot.c | 82 ++++++++++++++++++++++---- 8 files changed, 100 insertions(+), 23 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index f69afbd71b..53e6ec0d71 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -87,6 +87,12 @@ typedef enum { TAOS_SYNC_ROLE_ERROR = 2, } ESyncRole; +typedef enum { + TAOS_SYNC_SNAP_INFO_BRIEF = 0, + TAOS_SYNC_SNAP_INFO_FULL = 1, + TAOS_SYNC_SNAP_INFO_DIFF = 2, +} ESyncSnapInfoTyp; + typedef struct SNodeInfo { int64_t clusterId; int32_t nodeId; @@ -139,10 +145,12 @@ typedef struct SReConfigCbMeta { typedef struct SSnapshotParam { SyncIndex start; SyncIndex end; + void* data; // with SMsgHead } SSnapshotParam; typedef struct SSnapshot { - void* data; + ESyncSnapInfoTyp typ; + void* data; // with SMsgHead SyncIndex lastApplyIndex; SyncTerm lastApplyTerm; SyncIndex lastConfigIndex; @@ -171,7 +179,7 @@ typedef struct SSyncFSM { void (*FpBecomeLearnerCb)(const struct SSyncFSM* pFsm); int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); - void (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot); + int32_t (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader); void (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader); diff --git a/source/dnode/mnode/impl/src/mndSync.c 
b/source/dnode/mnode/impl/src/mndSync.c index 5759737a6a..7f6a0397ad 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -286,9 +286,10 @@ int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pRe return 0; } -static void mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { +static int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { SMnode *pMnode = pFsm->data; sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); + return 0; } void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index c40e2657f9..ba7bad67e4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -69,7 +69,7 @@ int32_t vnodeBegin(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode); void vnodeStop(SVnode *pVnode); int64_t vnodeGetSyncHandle(SVnode *pVnode); -void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); +int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t *numOfTables, int64_t *numOfNormalTables); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeGetTableList(void *pVnode, int8_t type, SArray *pList); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index db94f32459..9228269992 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -518,9 +518,13 @@ void vnodeStop(SVnode *pVnode) {} int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } -void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { - pSnapshot->data = NULL; +int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; pSnapshot->lastApplyTerm 
= pVnode->state.commitTerm; pSnapshot->lastConfigIndex = -1; + + if (pSnapshot->typ == TAOS_SYNC_SNAP_INFO_FULL) { + // TODO: get full info of snapshots + } + return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index a6c743c87d..b9f2d23c7b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -416,8 +416,8 @@ static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -static void vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { - vnodeGetSnapshot(pFsm->data, pSnapshot); +static int32_t vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { + return vnodeGetSnapshot(pFsm->data, pSnapshot); } static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { @@ -642,6 +642,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; + pFsm->FpGetSnapshot = NULL; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo; pFsm->FpRestoreFinishCb = vnodeRestoreFinish; pFsm->FpLeaderTransferCb = NULL; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index f8c96d8be2..c0d3663a8f 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -200,7 +200,7 @@ typedef struct SyncSnapshotSend { SSyncCfg lastConfig; int64_t startTime; int32_t seq; - int16_t reserved; + int16_t payloadType; uint32_t dataLen; char data[]; } SyncSnapshotSend; @@ -219,7 +219,8 @@ typedef struct SyncSnapshotRsp { int32_t ack; int32_t code; SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid - int16_t reserved; + int16_t payloadType; + char data[]; } SyncSnapshotRsp; typedef struct SyncLeaderTransfer { @@ -267,7 +268,7 @@ int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId); int32_t 
syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId, SFsmCbMeta* pMeta); int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); -int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId); +int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 72c8887803..00ca6d8f90 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -270,8 +270,8 @@ int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { return 0; } -int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId) { - int32_t bytes = sizeof(SyncSnapshotRsp); +int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { + int32_t bytes = sizeof(SyncSnapshotRsp) + dataLen; pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP; pMsg->contLen = bytes; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 763d4ec5d6..00dcd7e949 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -74,6 +74,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { + int32_t code = -1; pSender->start = true; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; @@ -95,11 +96,26 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pSender->lastSendTime = pSender->startTime; pSender->finish = false; + // Get full snapshot info + SSyncNode *pSyncNode = pSender->pSyncNode; + SSnapshot snapInfo = {.typ = TAOS_SYNC_SNAP_INFO_FULL}; + if 
(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) { + sSError(pSender, "snapshot get info failure since %s", terrstr()); + goto _out; + } + + int dataLen = 0; + if (snapInfo.data) { + SMsgHead *msgHead = snapInfo.data; + ASSERT(msgHead->vgId == pSyncNode->vgId); + dataLen = sizeof(SMsgHead) + msgHead->contLen; + } + // build begin msg SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) { + if (syncBuildSnapshotSend(&rpcMsg, dataLen, pSender->pSyncNode->vgId) != 0) { sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); - return -1; + goto _out; } SyncSnapshotSend *pMsg = rpcMsg.pCont; @@ -114,16 +130,27 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pMsg->startTime = pSender->startTime; pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; + if (dataLen > 0) { + pMsg->payloadType = snapInfo.typ; + memcpy(pMsg->data, snapInfo.data, dataLen); + } + // event log syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start"); // send msg if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); - return -1; + goto _out; } - return 0; + code = 0; +_out: + if (snapInfo.data) { + taosMemoryFree(snapInfo.data); + snapInfo.data = NULL; + } + return code; } void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { @@ -578,10 +605,29 @@ _SEND_REPLY: // build msg ; // make complier happy + code = -1; + SSnapshot snapInfo = {.typ = TAOS_SYNC_SNAP_INFO_DIFF}; + int32_t dataLen = 0; + if (pMsg->dataLen > 0) { + void *data = taosMemoryCalloc(1, pMsg->dataLen); + if (data == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _out; + } + memcpy(data, pMsg->data, pMsg->dataLen); /* copy the whole received payload; the local dataLen is still 0 here */ + snapInfo.data = data; + data = NULL; + if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) goto _out; /* do not reply with info the FSM failed to produce; _out frees snapInfo.data */ + + SMsgHead *msgHead = snapInfo.data; + ASSERT(msgHead->vgId == pSyncNode->vgId); + dataLen = 
sizeof(SMsgHead) + msgHead->contLen; /* reply payload starts with its SMsgHead, same length rule as in snapshotSenderStart */ + } + SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) { + if (syncBuildSnapshotSendRsp(&rpcMsg, dataLen, pSyncNode->vgId) != 0) { sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr()); - return -1; + goto _out; } SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; @@ -595,13 +641,24 @@ _SEND_REPLY: pRspMsg->code = code; pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode); + if (snapInfo.data) { + pRspMsg->payloadType = snapInfo.typ; + memcpy(pRspMsg->data, snapInfo.data, dataLen); + } + // send msg syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot"); if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr()); - return -1; + goto _out; } + code = 0; +_out: + if (snapInfo.data) { + taosMemoryFree(snapInfo.data); + snapInfo.data = NULL; + } return code; } @@ -635,7 +692,7 @@ _SEND_REPLY: // build msg SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) { + if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId) != 0) { sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr()); return -1; } @@ -685,7 +742,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend // build msg SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId)) { + if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) { sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr()); return -1; } @@ -732,7 +789,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs // build msg SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) { + if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId) != 0) { sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr()); return -1; } @@ -869,6 +926,11 @@ static 
int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend // update sender pSender->snapshot = snapshot; + if (pMsg->payloadType == TAOS_SYNC_SNAP_INFO_DIFF) { + SMsgHead *msgHead = (void *)pMsg->data; + ASSERT(msgHead->vgId == pSyncNode->vgId); + pSender->snapshotParam.data = pMsg->data; + } // start reader int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader); if (code != 0) {