refact: improve code with syncSnapSendRsp

This commit is contained in:
Benguang Zhao 2023-11-03 16:42:41 +08:00
parent de9c9d4f20
commit d4add073cc
2 changed files with 31 additions and 43 deletions

View File

@ -56,6 +56,10 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode);
int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg);
int32_t syncSnapSendMsg(SSyncSnapshotSender* pSender, int32_t seq, void* pBlock, int32_t len, int32_t typ);
int32_t syncSnapSendRsp(SSyncSnapshotReceiver* pReceiver, SyncSnapshotSend* pMsg, void* pBlock, int32_t len,
int32_t typ, int32_t code);
#ifdef __cplusplus
}
#endif

View File

@ -23,8 +23,6 @@
#include "syncReplication.h"
#include "syncUtil.h"
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t len, int32_t typ);
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
taosThreadMutexLock(&pBuf->mutex);
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
@ -153,7 +151,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->lastSendTime = taosGetTimestampMs();
pSender->finish = false;
// Get full snapshot info
// Get snapshot info
SSyncNode *pSyncNode = pSender->pSyncNode;
SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT};
if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) {
@ -161,11 +159,10 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
goto _out;
}
int dataLen = 0;
void *pData = snapInfo.data;
int32_t type = 0;
int32_t type = (pData) ? snapInfo.type : 0;
int32_t dataLen = 0;
if (pData) {
type = snapInfo.type;
SSyncTLV *datHead = pData;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
@ -688,24 +685,23 @@ _START_RECEIVER:
snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender
_SEND_REPLY:
// build msg
; // make complier happy
_SEND_REPLY:;
SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
int32_t dataLen = 0;
if (pMsg->dataLen > 0) {
if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) {
void *data = taosMemoryCalloc(1, pMsg->dataLen);
if (data == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _out;
}
memcpy(data, pMsg->data, pMsg->dataLen);
snapInfo.data = data;
data = NULL;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo);
memcpy(snapInfo.data, pMsg->data, pMsg->dataLen);
// exchange snap info
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo);
SSyncTLV *datHead = snapInfo.data;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
@ -715,29 +711,16 @@ _SEND_REPLY:
dataLen = sizeof(SSyncTLV) + datHead->len;
}
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSendRsp(&rpcMsg, dataLen, pSyncNode->vgId) != 0) {
sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
// send response
void *pData = snapInfo.data;
int32_t type = (pData) ? snapInfo.type : 0;
if (syncSnapSendRsp(pReceiver, pMsg, pData, dataLen, type, code) != 0) {
code = terrno;
goto _out;
}
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = raftStoreGetTerm(pSyncNode);
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pMsg->startTime;
pRspMsg->ack = pMsg->seq; // receiver maybe already closed
pRspMsg->code = code;
pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
if (snapInfo.data) {
pRspMsg->payloadType = snapInfo.type;
memcpy(pRspMsg->data, snapInfo.data, dataLen);
// save snapshot info
if (pData) {
SSnapshotParam *pParam = &pReceiver->snapshotParam;
void *data = taosMemoryRealloc(pParam->data, dataLen);
if (data == NULL) {
@ -748,15 +731,10 @@ _SEND_REPLY:
goto _out;
}
pParam->data = data;
data = NULL;
memcpy(pParam->data, snapInfo.data, dataLen);
}
// send msg
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "failed to send resp since %s", terrstr());
code = terrno;
}
_out:
if (snapInfo.data) {
taosMemoryFree(snapInfo.data);
@ -820,11 +798,12 @@ _SEND_REPLY:
return code;
}
static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, int32_t code) {
int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen,
int32_t type, int32_t code) {
SSyncNode *pSyncNode = pReceiver->pSyncNode;
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) {
if (syncBuildSnapshotSendRsp(&rpcMsg, blockLen, pSyncNode->vgId)) {
sRError(pReceiver, "failed to build snapshot receiver resp since %s", terrstr());
return -1;
}
@ -832,13 +811,18 @@ static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSen
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId;
pRspMsg->destId = pMsg->srcId;
pRspMsg->term = raftStoreGetTerm(pSyncNode);
pRspMsg->term = pMsg->term;
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pMsg->startTime;
pRspMsg->ack = pMsg->seq;
pRspMsg->code = code;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
pRspMsg->payloadType = type;
if (pBlock != NULL && blockLen > 0) {
memcpy(pRspMsg->data, pBlock, blockLen);
}
// send msg
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
@ -872,7 +856,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
ppMsg[0] = NULL;
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
} else if (pMsg->seq < pRcvBuf->start) {
syncSnapSendRsp(pReceiver, pMsg, code);
syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code);
goto _out;
}
@ -892,7 +876,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
}
}
pRcvBuf->start = seq + 1;
syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], code);
syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code);
pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
if (code) goto _out;
@ -915,7 +899,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr());
return syncSnapSendRsp(pReceiver, pMsg, terrno);
return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, terrno);
}
return syncSnapBufferRecv(pReceiver, ppMsg);