refact: improve code with syncSnapSendMsg

This commit is contained in:
Benguang Zhao 2023-11-01 20:21:23 +08:00
parent 7852b2cecf
commit 3221aef1a3
1 changed files with 51 additions and 82 deletions

View File

@ -23,6 +23,8 @@
#include "syncReplication.h"
#include "syncUtil.h"
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t len, int32_t typ);
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
taosThreadMutexLock(&pBuf->mutex);
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
@ -160,8 +162,11 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
}
int dataLen = 0;
if (snapInfo.data) {
SSyncTLV *datHead = snapInfo.data;
void *pData = snapInfo.data;
int32_t type = 0;
if (pData) {
type = snapInfo.type;
SSyncTLV *datHead = pData;
if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) {
sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ);
terrno = TSDB_CODE_INVALID_DATA_FMT;
@ -170,37 +175,12 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
dataLen = sizeof(SSyncTLV) + datHead->len;
}
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, dataLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
if (syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type) != 0) {
goto _out;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq;
if (dataLen > 0) {
pMsg->payloadType = snapInfo.type;
memcpy(pMsg->data, snapInfo.data, dataLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
goto _out;
}
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&pMsg->destId));
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId));
code = 0;
_out:
if (snapInfo.data) {
@ -232,6 +212,43 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
}
int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) {
int32_t code = -1;
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "failed to build snap replication msg since %s", terrstr());
goto _OUT;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = seq;
if (pBlock != NULL && blockLen > 0) {
memcpy(pMsg->data, pBlock, blockLen);
}
pMsg->payloadType = typ;
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "failed to send snap replication msg since %s", terrstr());
goto _OUT;
}
code = 0;
_OUT:
return code;
}
// when sender receive ack, call this function to send msg from seq
// seq = ack + 1, already updated
static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
@ -273,33 +290,10 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END);
int32_t blockLen = (pBlk != NULL) ? pBlk->blockLen : 0;
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
goto _OUT;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = raftStoreGetTerm(pSender->pSyncNode);
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq;
if (pBlk != NULL && pBlk->pBlock != NULL && pBlk->blockLen > 0) {
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
int32_t blockLen = (pBlk) ? pBlk->blockLen : 0;
void *pBlock = (pBlk) ? pBlk->pBlock : NULL;
if (syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0) != 0) {
goto _OUT;
}
@ -336,32 +330,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
continue;
}
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, pBlk->blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
goto _out;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pBlk->seq;
if (pBlk->pBlock != NULL && pBlk->blockLen > 0) {
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
if (syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0) != 0) {
goto _out;
}
pBlk->sendTimeMs = nowMs;