From 70e261f66217e2d1ff8f1c6edd5374db7ed921b6 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 27 Oct 2023 19:21:54 +0800 Subject: [PATCH] enh: terminate snap replication on timeout --- include/libs/sync/sync.h | 2 +- source/libs/sync/src/syncSnapshot.c | 4 +--- source/libs/sync/src/syncTimeout.c | 16 +++++++++++++--- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index e3b5f3611c..a19b249bc7 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -46,7 +46,7 @@ extern "C" { #define SYNC_HEARTBEAT_SLOW_MS 1500 #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_SNAP_RESEND_MS 1000 * 60 -#define SYNC_SNAP_TIMEOUT_MS 1000 * 180 +#define SYNC_SNAP_TIMEOUT_MS 1000 * 600 #define SYNC_VND_COMMIT_MIN_MS 3000 diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 56735d479e..0b94d377f1 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -327,7 +327,6 @@ _OUT:; int32_t snapshotReSend(SSyncSnapshotSender *pSender) { SSyncSnapBuffer *pSndBuf = pSender->pSndBuf; int32_t code = -1; - taosThreadMutexLock(&pSndBuf->mutex); for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) { @@ -366,12 +365,11 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { goto _out; } pBlk->sendTimeMs = nowMs; - pSender->lastSendTime = nowMs; } code = 0; _out:; taosThreadMutexUnlock(&pSndBuf->mutex); - return 0; + return code; } // return 0, start ok diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 5837308e59..a57dfbee53 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -77,9 +77,19 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { for (int i = 0; i < ths->peersNum; ++i) { SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i])); if (pSender != NULL) { - if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start && - timeNow - pSender->lastSendTime > SYNC_SNAP_RESEND_MS) { - snapshotReSend(pSender); + if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start) { + int64_t elapsedMs = timeNow - pSender->lastSendTime; + if (elapsedMs < SYNC_SNAP_RESEND_MS) { + continue; + } + + if (elapsedMs > SYNC_SNAP_TIMEOUT_MS) { + sSError(pSender, "snap replication timeout, terminate."); + snapshotSenderStop(pSender, false); + } else { + sSWarn(pSender, "snap replication resend."); + snapshotReSend(pSender); + } } } }