From c89c69f951351dcf73dff363563293e74a5237fe Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 21 Sep 2023 18:38:10 +0800 Subject: [PATCH] enh: use waitTime to prevent from starting snapshot too frequently --- source/libs/sync/inc/syncSnapshot.h | 2 +- source/libs/sync/src/syncAppendEntries.c | 2 +- source/libs/sync/src/syncElection.c | 2 +- source/libs/sync/src/syncSnapshot.c | 27 +++++++++++++++--------- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 2a19945c5a..95382132b5 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -43,7 +43,7 @@ typedef struct SSyncSnapshotSender { int64_t sendingMS; SyncTerm term; int64_t startTime; - int64_t endTime; + int64_t waitTime; int64_t lastSendTime; bool finish; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 8ae1dd2a54..4776b2bb1b 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -157,7 +157,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (ths->fsmState == SYNC_FSM_STATE_INCOMPLETE) { pReply->fsmState = ths->fsmState; - sError("vgId:%d, not allow to accept sync log msg due to incomplete fsm state", ths->vgId); + sError("vgId:%d, not to accept sync log msg due to incomplete fsm state", ths->vgId); syncEntryDestroy(pEntry); goto _SEND_RESPONSE; } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index b4e2049a64..c57e7e273f 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -72,7 +72,7 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { int32_t syncNodeElect(SSyncNode* pSyncNode) { if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) { - sNError(pSyncNode, "ignore leader hb timeout due to incomplete fsm state"); + sNError(pSyncNode, "skip leader election due to incomplete fsm state"); return -1; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 2948dbf3d2..f9bde9517e 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -45,7 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->replicaIndex = replicaIndex; pSender->term = raftStoreGetTerm(pSyncNode); pSender->startTime = -1; - pSender->endTime = -1; + pSender->waitTime = -1; pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->finish = false; @@ -167,7 +167,7 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { if (stopped) return; pSender->finish = finish; - pSender->endTime = taosGetTimestampMs(); + pSender->waitTime = -1; // close reader if (pSender->pReader != NULL) { @@ -319,8 +319,12 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 0; } - if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) { - sSDebug(pSender, "snapshot sender start too frequently, ignore"); + int64_t timeNow = taosGetTimestampMs(); + if (pSender->waitTime <= 0) { + pSender->waitTime = timeNow + SNAPSHOT_WAIT_MS; + } + if (timeNow < pSender->waitTime) { + sSDebug(pSender, "snapshot sender waitTime not expired yet, ignore"); return 0; } @@ -674,8 +678,6 @@ _SEND_REPLY: pRspMsg->code = code; pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode); - ASSERT(pRspMsg->startTime); - if (snapInfo.data) { pRspMsg->payloadType = snapInfo.type; memcpy(pRspMsg->data, snapInfo.data, dataLen); @@ -684,6 +686,8 @@ _SEND_REPLY: SSnapshotParam *pParam = &pReceiver->snapshotParam; void *data = taosMemoryRealloc(pParam->data, dataLen); if (data == NULL) { + sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId, + strerror(errno), dataLen); terrno = TSDB_CODE_OUT_OF_MEMORY; code = terrno; goto _out; @@ -695,7 +699,7 @@ _SEND_REPLY: // send msg syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot"); if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { - sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr()); + sRError(pReceiver, "failed to send resp since %s", terrstr()); code = terrno; } @@ -916,13 +920,16 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER) { if (pMsg->term == raftStoreGetTerm(pSyncNode)) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { - syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot"); + sInfo("vgId:%d, receive pre-snapshot msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")", + pSyncNode->vgId, pMsg->term, pMsg->startTime); code = syncNodeOnSnapshotPrep(pSyncNode, pMsg); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { - syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin"); + sInfo("vgId:%d, receive begin msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")", + pSyncNode->vgId, pMsg->term, pMsg->startTime); code = syncNodeOnSnapshotBegin(pSyncNode, pMsg); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { - syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end"); + sInfo("vgId:%d, receive end msg of snapshot replication. signature: (%" PRId64 ", %" PRId64 ")", + pSyncNode->vgId, pMsg->term, pMsg->startTime); code = syncNodeOnSnapshotEnd(pSyncNode, pMsg); if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) { sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());