enh: use waitTime to prevent from starting snapshot too frequently

This commit is contained in:
Benguang Zhao 2023-09-21 18:38:10 +08:00
parent be6411ebbe
commit c89c69f951
4 changed files with 20 additions and 13 deletions

View File

@ -43,7 +43,7 @@ typedef struct SSyncSnapshotSender {
int64_t sendingMS;
SyncTerm term;
int64_t startTime;
int64_t endTime;
int64_t waitTime;
int64_t lastSendTime;
bool finish;

View File

@ -157,7 +157,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (ths->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
pReply->fsmState = ths->fsmState;
sError("vgId:%d, not allow to accept sync log msg due to incomplete fsm state", ths->vgId);
sError("vgId:%d, not to accept sync log msg due to incomplete fsm state", ths->vgId);
syncEntryDestroy(pEntry);
goto _SEND_RESPONSE;
}

View File

@ -72,7 +72,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
int32_t syncNodeElect(SSyncNode* pSyncNode) {
if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
sNError(pSyncNode, "ignore leader hb timeout due to incomplete fsm state");
sNError(pSyncNode, "skip leader election due to incomplete fsm state");
return -1;
}

View File

@ -45,7 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->replicaIndex = replicaIndex;
pSender->term = raftStoreGetTerm(pSyncNode);
pSender->startTime = -1;
pSender->endTime = -1;
pSender->waitTime = -1;
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
pSender->finish = false;
@ -167,7 +167,7 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
if (stopped) return;
pSender->finish = finish;
pSender->endTime = taosGetTimestampMs();
pSender->waitTime = -1;
// close reader
if (pSender->pReader != NULL) {
@ -319,8 +319,12 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
return 0;
}
if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
sSDebug(pSender, "snapshot sender start too frequently, ignore");
int64_t timeNow = taosGetTimestampMs();
if (pSender->waitTime <= 0) {
pSender->waitTime = timeNow + SNAPSHOT_WAIT_MS;
}
if (timeNow < pSender->waitTime) {
sSDebug(pSender, "snapshot sender waitTime not expired yet, ignore");
return 0;
}
@ -674,8 +678,6 @@ _SEND_REPLY:
pRspMsg->code = code;
pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
ASSERT(pRspMsg->startTime);
if (snapInfo.data) {
pRspMsg->payloadType = snapInfo.type;
memcpy(pRspMsg->data, snapInfo.data, dataLen);
@ -684,6 +686,8 @@ _SEND_REPLY:
SSnapshotParam *pParam = &pReceiver->snapshotParam;
void *data = taosMemoryRealloc(pParam->data, dataLen);
if (data == NULL) {
sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId,
strerror(errno), dataLen);
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = terrno;
goto _out;
@ -695,7 +699,7 @@ _SEND_REPLY:
// send msg
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot");
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
sRError(pReceiver, "failed to send resp since %s", terrstr());
code = terrno;
}
@ -916,13 +920,16 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER) {
if (pMsg->term == raftStoreGetTerm(pSyncNode)) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
sInfo("vgId:%d, receive pre-snapshot msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")",
pSyncNode->vgId, pMsg->term, pMsg->startTime);
code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
sInfo("vgId:%d, receive begin msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")",
pSyncNode->vgId, pMsg->term, pMsg->startTime);
code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end");
sInfo("vgId:%d, receive end msg of snapshot replication. signature: (%" PRId64 ", %" PRId64 ")",
pSyncNode->vgId, pMsg->term, pMsg->startTime);
code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());