From 3221aef1a350c0ae40d277ba7a475ad584e8d0a0 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 1 Nov 2023 20:21:23 +0800 Subject: [PATCH] refact: improve code with syncSnapSendMsg --- source/libs/sync/src/syncSnapshot.c | 133 +++++++++++----------------- 1 file changed, 51 insertions(+), 82 deletions(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index dd5449611b..dbd3ad54ed 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -23,6 +23,8 @@ #include "syncReplication.h" #include "syncUtil.h" +int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t len, int32_t typ); + static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { taosThreadMutexLock(&pBuf->mutex); for (int64_t i = pBuf->start; i < pBuf->end; ++i) { @@ -160,8 +162,11 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { } int dataLen = 0; - if (snapInfo.data) { - SSyncTLV *datHead = snapInfo.data; + void *pData = snapInfo.data; + int32_t type = 0; + if (pData) { + type = snapInfo.type; + SSyncTLV *datHead = pData; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) { sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ); terrno = TSDB_CODE_INVALID_DATA_FMT; @@ -170,37 +175,12 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { dataLen = sizeof(SSyncTLV) + datHead->len; } - SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSend(&rpcMsg, dataLen, pSender->pSyncNode->vgId) != 0) { - sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); + if (syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type) != 0) { goto _out; } - SyncSnapshotSend *pMsg = rpcMsg.pCont; - pMsg->srcId = pSender->pSyncNode->myRaftId; - pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = pSender->term; - pMsg->beginIndex = pSender->snapshotParam.start; - pMsg->lastIndex = pSender->snapshot.lastApplyIndex; - pMsg->lastTerm = pSender->snapshot.lastApplyTerm; - pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; - pMsg->lastConfig = pSender->lastConfig; - pMsg->startTime = pSender->startTime; - pMsg->seq = pSender->seq; - - if (dataLen > 0) { - pMsg->payloadType = snapInfo.type; - memcpy(pMsg->data, snapInfo.data, dataLen); - } - - // send msg - if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { - sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); - goto _out; - } - - sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&pMsg->destId)); - + SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; + sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId)); code = 0; _out: if (snapInfo.data) { @@ -232,6 +212,43 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish); } +int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) { + int32_t code = -1; + SRpcMsg rpcMsg = {0}; + + if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) { + sSError(pSender, "failed to build snap replication msg since %s", terrstr()); + goto _OUT; + } + + SyncSnapshotSend *pMsg = rpcMsg.pCont; + pMsg->srcId = pSender->pSyncNode->myRaftId; + pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; + pMsg->term = pSender->term; + pMsg->beginIndex = pSender->snapshotParam.start; + pMsg->lastIndex = pSender->snapshot.lastApplyIndex; + pMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; + pMsg->lastConfig = pSender->lastConfig; + pMsg->startTime = pSender->startTime; + pMsg->seq = seq; + + if (pBlock != NULL && blockLen > 0) { + memcpy(pMsg->data, pBlock, blockLen); + } + pMsg->payloadType = typ; + + // send msg + if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { + sSError(pSender, "failed to send snap replication msg since %s", terrstr()); + goto _OUT; + } + + code = 0; +_OUT: + return code; +} + // when sender receive ack, call this function to send msg from seq // seq = ack + 1, already updated static int32_t snapshotSend(SSyncSnapshotSender *pSender) { @@ -273,33 +290,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END); - int32_t blockLen = (pBlk != NULL) ? pBlk->blockLen : 0; - // build msg - SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) { - sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr()); - goto _OUT; - } - - SyncSnapshotSend *pMsg = rpcMsg.pCont; - pMsg->srcId = pSender->pSyncNode->myRaftId; - pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = raftStoreGetTerm(pSender->pSyncNode); - pMsg->beginIndex = pSender->snapshotParam.start; - pMsg->lastIndex = pSender->snapshot.lastApplyIndex; - pMsg->lastTerm = pSender->snapshot.lastApplyTerm; - pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; - pMsg->lastConfig = pSender->lastConfig; - pMsg->startTime = pSender->startTime; - pMsg->seq = pSender->seq; - - if (pBlk != NULL && pBlk->pBlock != NULL && pBlk->blockLen > 0) { - memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen); - } - // send msg - if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { - sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); + int32_t blockLen = (pBlk) ? pBlk->blockLen : 0; + void *pBlock = (pBlk) ? pBlk->pBlock : NULL; + if (syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0) != 0) { goto _OUT; } @@ -336,32 +330,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) { continue; } - // build msg - SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSend(&rpcMsg, pBlk->blockLen, pSender->pSyncNode->vgId) != 0) { - sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); - goto _out; - } - - SyncSnapshotSend *pMsg = rpcMsg.pCont; - pMsg->srcId = pSender->pSyncNode->myRaftId; - pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = pSender->term; - pMsg->beginIndex = pSender->snapshotParam.start; - pMsg->lastIndex = pSender->snapshot.lastApplyIndex; - pMsg->lastTerm = pSender->snapshot.lastApplyTerm; - pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; - pMsg->lastConfig = pSender->lastConfig; - pMsg->startTime = pSender->startTime; - pMsg->seq = pBlk->seq; - - if (pBlk->pBlock != NULL && pBlk->blockLen > 0) { - memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen); - } - - // send msg - if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { - sSError(pSender, "snapshot sender resend msg failed since %s", terrstr()); + if (syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0) != 0) { goto _out; } pBlk->sendTimeMs = nowMs;