From d4add073cc676d9ab52de5d24422d7e88a2f9704 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 3 Nov 2023 16:42:41 +0800 Subject: [PATCH] refact: improve code with syncSnapSendRsp --- source/libs/sync/inc/syncReplication.h | 4 ++ source/libs/sync/src/syncSnapshot.c | 70 ++++++++++---------------- 2 files changed, 31 insertions(+), 43 deletions(-) diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 04456b2454..ecd2b5163e 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -56,6 +56,10 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode); int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); +int32_t syncSnapSendMsg(SSyncSnapshotSender* pSender, int32_t seq, void* pBlock, int32_t len, int32_t typ); +int32_t syncSnapSendRsp(SSyncSnapshotReceiver* pReceiver, SyncSnapshotSend* pMsg, void* pBlock, int32_t len, + int32_t typ, int32_t code); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 95952c960e..e1471ad3d9 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -23,8 +23,6 @@ #include "syncReplication.h" #include "syncUtil.h" -int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t len, int32_t typ); - static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { taosThreadMutexLock(&pBuf->mutex); for (int64_t i = pBuf->start; i < pBuf->end; ++i) { @@ -153,7 +151,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pSender->lastSendTime = taosGetTimestampMs(); pSender->finish = false; - // Get full snapshot info + // Get snapshot info SSyncNode *pSyncNode = pSender->pSyncNode; SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT}; if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) { @@ -161,11 +159,10 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { goto _out; } - int dataLen = 0; void *pData = snapInfo.data; - int32_t type = 0; + int32_t type = (pData) ? snapInfo.type : 0; + int32_t dataLen = 0; if (pData) { - type = snapInfo.type; SSyncTLV *datHead = pData; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) { sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ); @@ -688,24 +685,23 @@ _START_RECEIVER: snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender -_SEND_REPLY: - // build msg - ; // make complier happy +_SEND_REPLY:; SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY}; int32_t dataLen = 0; - if (pMsg->dataLen > 0) { + if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) { void *data = taosMemoryCalloc(1, pMsg->dataLen); if (data == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; code = terrno; goto _out; } - memcpy(data, pMsg->data, pMsg->dataLen); snapInfo.data = data; data = NULL; - pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo); + memcpy(snapInfo.data, pMsg->data, pMsg->dataLen); + // exchange snap info + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo); SSyncTLV *datHead = snapInfo.data; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ); @@ -715,29 +711,16 @@ _SEND_REPLY: dataLen = sizeof(SSyncTLV) + datHead->len; } - SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSendRsp(&rpcMsg, dataLen, pSyncNode->vgId) != 0) { - sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr()); + // send response + void *pData = snapInfo.data; + int32_t type = (pData) ? snapInfo.type : 0; + + if (syncSnapSendRsp(pReceiver, pMsg, pData, dataLen, type, code) != 0) { code = terrno; goto _out; } - SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; - pRspMsg->srcId = pSyncNode->myRaftId; - pRspMsg->destId = pMsg->srcId; - pRspMsg->term = raftStoreGetTerm(pSyncNode); - pRspMsg->lastIndex = pMsg->lastIndex; - pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->startTime = pMsg->startTime; - pRspMsg->ack = pMsg->seq; // receiver maybe already closed - pRspMsg->code = code; - pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode); - - if (snapInfo.data) { - pRspMsg->payloadType = snapInfo.type; - memcpy(pRspMsg->data, snapInfo.data, dataLen); - - // save snapshot info + if (pData) { SSnapshotParam *pParam = &pReceiver->snapshotParam; void *data = taosMemoryRealloc(pParam->data, dataLen); if (data == NULL) { @@ -748,15 +731,10 @@ _SEND_REPLY: goto _out; } pParam->data = data; + data = NULL; memcpy(pParam->data, snapInfo.data, dataLen); } - // send msg - if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { - sRError(pReceiver, "failed to send resp since %s", terrstr()); - code = terrno; - } - _out: if (snapInfo.data) { taosMemoryFree(snapInfo.data); @@ -820,11 +798,12 @@ _SEND_REPLY: return code; } -static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, int32_t code) { +int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen, + int32_t type, int32_t code) { SSyncNode *pSyncNode = pReceiver->pSyncNode; // build msg SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) { + if (syncBuildSnapshotSendRsp(&rpcMsg, blockLen, pSyncNode->vgId)) { sRError(pReceiver, "failed to build snapshot receiver resp since %s", terrstr()); return -1; } @@ -832,13 +811,18 @@ static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSen SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; pRspMsg->srcId = pSyncNode->myRaftId; pRspMsg->destId = pMsg->srcId; - pRspMsg->term = raftStoreGetTerm(pSyncNode); + pRspMsg->term = pMsg->term; pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pMsg->startTime; pRspMsg->ack = pMsg->seq; pRspMsg->code = code; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; + pRspMsg->payloadType = type; + + if (pBlock != NULL && blockLen > 0) { + memcpy(pRspMsg->data, pBlock, blockLen); + } // send msg if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { @@ -872,7 +856,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot ppMsg[0] = NULL; pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end); } else if (pMsg->seq < pRcvBuf->start) { - syncSnapSendRsp(pReceiver, pMsg, code); + syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code); goto _out; } @@ -892,7 +876,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot } } pRcvBuf->start = seq + 1; - syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], code); + syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code); pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]); pRcvBuf->entries[seq % pRcvBuf->size] = NULL; if (code) goto _out; @@ -915,7 +899,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) { terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr()); - return syncSnapSendRsp(pReceiver, pMsg, terrno); + return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, terrno); } return syncSnapBufferRecv(pReceiver, ppMsg);