enh: terminate snap replication on timeout
This commit is contained in:
parent
c3f9cae36b
commit
70e261f662
|
@ -46,7 +46,7 @@ extern "C" {
|
||||||
#define SYNC_HEARTBEAT_SLOW_MS 1500
|
#define SYNC_HEARTBEAT_SLOW_MS 1500
|
||||||
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
|
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
|
||||||
#define SYNC_SNAP_RESEND_MS 1000 * 60
|
// Time in ms since a snapshot sender's last send before the sync timer
// routine resends. Parenthesized so the expansion stays one operand in any
// surrounding expression (e.g. division or modulo).
#define SYNC_SNAP_RESEND_MS (1000 * 60)
|
||||||
#define SYNC_SNAP_TIMEOUT_MS 1000 * 180
|
// Time in ms since a snapshot sender's last send after which the sync timer
// routine stops the sender instead of resending. Parenthesized so the
// expansion stays one operand in any surrounding expression.
#define SYNC_SNAP_TIMEOUT_MS (1000 * 600)
|
||||||
|
|
||||||
#define SYNC_VND_COMMIT_MIN_MS 3000
|
#define SYNC_VND_COMMIT_MIN_MS 3000
|
||||||
|
|
||||||
|
|
|
@ -327,7 +327,6 @@ _OUT:;
|
||||||
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
||||||
SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
|
SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
taosThreadMutexLock(&pSndBuf->mutex);
|
taosThreadMutexLock(&pSndBuf->mutex);
|
||||||
|
|
||||||
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
|
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
|
||||||
|
@ -366,12 +365,11 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
pBlk->sendTimeMs = nowMs;
|
pBlk->sendTimeMs = nowMs;
|
||||||
pSender->lastSendTime = nowMs;
|
|
||||||
}
|
}
|
||||||
code = 0;
|
code = 0;
|
||||||
_out:;
|
_out:;
|
||||||
taosThreadMutexUnlock(&pSndBuf->mutex);
|
taosThreadMutexUnlock(&pSndBuf->mutex);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// return 0, start ok
|
// return 0, start ok
|
||||||
|
|
|
@ -77,9 +77,19 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
|
||||||
for (int i = 0; i < ths->peersNum; ++i) {
|
for (int i = 0; i < ths->peersNum; ++i) {
|
||||||
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i]));
|
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i]));
|
||||||
if (pSender != NULL) {
|
if (pSender != NULL) {
|
||||||
if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start &&
|
if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start) {
|
||||||
timeNow - pSender->lastSendTime > SYNC_SNAP_RESEND_MS) {
|
int64_t elapsedMs = timeNow - pSender->lastSendTime;
|
||||||
snapshotReSend(pSender);
|
if (elapsedMs < SYNC_SNAP_RESEND_MS) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (elapsedMs > SYNC_SNAP_TIMEOUT_MS) {
|
||||||
|
sSError(pSender, "snap replication timeout, terminate.");
|
||||||
|
snapshotSenderStop(pSender, false);
|
||||||
|
} else {
|
||||||
|
sSWarn(pSender, "snap replication resend.");
|
||||||
|
snapshotReSend(pSender);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue