feat: resend snap replication of data on timeout

This commit is contained in:
Benguang Zhao 2023-10-27 15:21:55 +08:00
parent fe553352b8
commit 4163a3be7c
6 changed files with 126 additions and 159 deletions

View File

@ -584,6 +584,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
// commit json
if (!rollback) {
ASSERT(pVnode->config.vgId == pWriter->info.config.vgId);
pWriter->info.state.committed = pWriter->ever;
pVnode->config = pWriter->info.config;
pVnode->state = (SVState){.committed = pWriter->info.state.committed,

View File

@ -516,7 +516,10 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool
pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
if (code != 0) {
vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(),
code);
}
return code;
}

View File

@ -22,9 +22,9 @@ extern "C" {
#include "syncInt.h"
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3
#define SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT -1
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_PREP -1
#define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF
@ -57,8 +57,6 @@ typedef struct SSyncSnapshotSender {
int32_t seq;
int32_t ack;
void *pReader;
void *pCurrentBlock;
int32_t blockLen;
SSnapshotParam snapshotParam;
SSnapshot snapshot;
SSyncCfg lastConfig;

View File

@ -818,7 +818,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
if (!taosCheckExistFile(pSyncNode->configPath)) {
// create a new raft config file
sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
pSyncNode->vgId = pSyncInfo->vgId;
pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;

View File

@ -31,9 +31,9 @@ static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
}
pBuf->entries[i % pBuf->size] = NULL;
}
pBuf->start = 1;
pBuf->end = 1;
pBuf->cursor = 0;
pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
pBuf->end = pBuf->start;
pBuf->cursor = pBuf->start - 1;
taosThreadMutexUnlock(&pBuf->mutex);
}
@ -76,8 +76,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
pSender->pReader = NULL;
pSender->pCurrentBlock = NULL;
pSender->blockLen = 0;
pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex;
@ -113,12 +111,6 @@ void syncSnapBlockDestroy(void *ptr) {
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
if (pSender == NULL) return;
// free current block
if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock);
pSender->pCurrentBlock = NULL;
}
// close reader
if (pSender->pReader != NULL) {
pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
@ -141,11 +133,9 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
if (started) return 0;
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->seq = SYNC_SNAPSHOT_SEQ_PREP;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
pSender->pReader = NULL;
pSender->pCurrentBlock = NULL;
pSender->blockLen = 0;
pSender->snapshotParam.start = SYNC_INDEX_INVALID;
pSender->snapshotParam.end = SYNC_INDEX_INVALID;
pSender->snapshot.data = NULL;
@ -196,7 +186,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
pMsg->seq = pSender->seq;
if (dataLen > 0) {
pMsg->payloadType = snapInfo.type;
@ -236,13 +226,6 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
pSender->pReader = NULL;
}
// free current block
if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock);
pSender->pCurrentBlock = NULL;
pSender->blockLen = 0;
}
syncSnapBufferReset(pSender->pSndBuf);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
@ -255,39 +238,45 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
int32_t code = -1;
SyncSnapBlock *pBlk = NULL;
if (pSender->seq != SYNC_SNAPSHOT_SEQ_END) {
if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) {
pSender->seq++;
pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
if (pBlk == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OUT;
}
if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) {
pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
if (pBlk == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OUT;
}
// read data
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock,
&pBlk->blockLen);
if (ret != 0) {
sSError(pSender, "snapshot sender read failed since %s", terrstr());
goto _OUT;
}
pBlk->seq = pSender->seq;
pBlk->seq = pSender->seq;
if (pSender->blockLen > 0) {
// has read data
sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
} else {
// read finish, update seq to end
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
code = 0;
goto _OUT;
// read data
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
&pBlk->pBlock, &pBlk->blockLen);
if (ret != 0) {
sSError(pSender, "snapshot sender read failed since %s", terrstr());
goto _OUT;
}
if (pBlk->blockLen > 0) {
// has read data
sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pBlk->blockLen, pBlk->seq);
} else {
// read finish, update seq to end
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
sSInfo(pSender, "snapshot sender read to the end");
code = 0;
goto _OUT;
}
}
}
ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END);
int32_t blockLen = (pBlk != NULL) ? pBlk->blockLen : 0;
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
goto _OUT;
}
@ -304,7 +293,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq;
if (pBlk != NULL && pBlk->pBlock != NULL) {
if (pBlk != NULL && pBlk->pBlock != NULL && pBlk->blockLen > 0) {
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
}
@ -317,13 +306,12 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// put in buffer
int64_t nowMs = taosGetTimestampMs();
if (pBlk) {
ASSERT(pBlk->seq != SYNC_SNAPSHOT_SEQ_END);
ASSERT(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END);
pBlk->sendTimeMs = nowMs;
pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk;
pBlk = NULL;
pSender->pSndBuf->end = pSender->seq + 1;
pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end);
}
pSender->lastSendTime = nowMs;
code = 0;
@ -337,36 +325,52 @@ _OUT:;
// send snapshot data from cache
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
return -1;
SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
int32_t code = -1;
taosThreadMutexLock(&pSndBuf->mutex);
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
ASSERT(pBlk && !pBlk->acked);
int64_t nowMs = taosGetTimestampMs();
if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
continue;
}
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, pBlk->blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
goto _out;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pBlk->seq;
if (pBlk->pBlock != NULL && pBlk->blockLen > 0) {
memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
goto _out;
}
pBlk->sendTimeMs = nowMs;
pSender->lastSendTime = nowMs;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq;
if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
}
// send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
return -1;
}
pSender->lastSendTime = taosGetTimestampMs();
code = 0;
_out:;
taosThreadMutexUnlock(&pSndBuf->mutex);
return 0;
}
@ -523,7 +527,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
int8_t started = atomic_val_compare_exchange_8(&pReceiver->start, false, true);
if (started) return;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP;
pReceiver->term = pPreMsg->term;
pReceiver->fromId = pPreMsg->srcId;
pReceiver->startTime = pPreMsg->startTime;
@ -592,6 +596,11 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
// update progress
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
// get fsmState
SSnapshot snapshot = {0};
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
pReceiver->pSyncNode->fsmState = snapshot.state;
// reset wal
code =
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
@ -600,12 +609,6 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
return -1;
}
sRInfo(pReceiver, "wal log restored from snapshot");
// get fsmState
SSnapshot snapshot = {0};
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
pReceiver->pSyncNode->fsmState = snapshot.state;
} else {
sRError(pReceiver, "snapshot receiver finish error since writer is null");
return -1;
@ -892,6 +895,9 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
ppMsg[0] = NULL;
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
} else {
syncSnapSendRsp(pReceiver, pMsg, code);
goto _out;
}
for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
@ -991,7 +997,7 @@ _SEND_REPLY:;
// receiver on message
//
// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT
// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP
// if receiver already start
// if sender.start-time > receiver.start-time, restart receiver(reply snapshot start)
// if sender.start-time = receiver.start-time, maybe duplicate msg
@ -1040,7 +1046,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
int32_t code = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER) {
if (pMsg->term == raftStoreGetTerm(pSyncNode)) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) {
sInfo("vgId:%d, receive prepare msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")",
pSyncNode->vgId, pMsg->term, pMsg->startTime);
code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
@ -1052,16 +1058,14 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
sInfo("vgId:%d, receive end msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId,
pMsg->term, pMsg->startTime);
code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
if (code != 0) {
sRError(pReceiver, "failed to end snapshot.");
code = -1;
} else if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
code = -1;
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// force close, no response
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
snapshotReceiverStop(pReceiver);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
} else {
// error log
@ -1118,38 +1122,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
// update next index
syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);
// update seq
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
// build begin msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "prepare snapshot failed since build msg error");
return -1;
}
SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
pSendMsg->srcId = pSender->pSyncNode->myRaftId;
pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pSendMsg->term = raftStoreGetTerm(pSender->pSyncNode);
pSendMsg->beginIndex = pSender->snapshotParam.start;
pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pSendMsg->lastConfig = pSender->lastConfig;
pSendMsg->startTime = pSender->startTime;
pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
sSInfo(pSender, "begin snapshot replication to dnode %d.", DID(&pSendMsg->destId));
// send msg
syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "prepare snapshot failed since send msg error");
return -1;
}
return 0;
return snapshotSend(pSender);
}
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
@ -1166,10 +1139,18 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp
SyncSnapshotRsp *pMsg = ppMsg[0];
taosThreadMutexLock(&pSndBuf->mutex);
if (snapshotSenderSignatureCmp(pSender, pMsg) != 0) {
code = terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _out;
}
if (pSender->pReader == NULL || pSender->finish) {
code = terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
if (pMsg->ack - pSndBuf->start >= pSndBuf->size) {
terrno = TSDB_CODE_SYN_BUFFER_FULL;
code = terrno;
code = terrno = TSDB_CODE_SYN_BUFFER_FULL;
goto _out;
}
@ -1196,16 +1177,11 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp
pSndBuf->start = ack + 1;
}
pSender->ack = pSndBuf->start - 1;
while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) {
if (snapshotSend(pSender) != 0) {
code = terrno;
goto _out;
}
if (pSender->seq != SYNC_SNAPSHOT_SEQ_END) {
pSndBuf->end = TMAX(pSender->seq + 1, pSndBuf->end);
}
}
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
@ -1285,23 +1261,17 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
}
// prepare <begin, end>, send begin msg
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg);
}
if (pSender->pReader == NULL || pSender->finish) {
sSError(pSender, "snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d", tstrerror(pMsg->code), pMsg->code,
pSender->pReader, pSender->finish);
terrno = pMsg->code;
goto _ERROR;
}
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
sSInfo(pSender, "process seq begin");
if (snapshotSend(pSender) != 0) {
// send next msg
if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) {
if (syncSnapBufferSend(pSender, ppMsg) != 0) {
sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", terrstr(), pSender->seq,
pSender->pReader, pSender->finish);
goto _ERROR;
}
return 0;
}
// receive ack is finish, close sender
@ -1312,11 +1282,6 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
return 0;
}
// send next msg
if (syncSnapBufferSend(pSender, ppMsg) != 0) {
sSError(pSender, "failed to send snapshot msg since %s. seq:%d", terrstr(), pSender->seq);
goto _ERROR;
}
return 0;
_ERROR:

View File

@ -78,9 +78,8 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i]));
if (pSender != NULL) {
if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start &&
timeNow - pSender->lastSendTime > SYNC_SNAP_TIMEOUT_MS) {
sSError(pSender, "snapshot timeout, terminate. lastSendTime:%d", pSender->lastSendTime);
snapshotSenderStop(pSender, false);
timeNow - pSender->lastSendTime > SYNC_SNAP_RESEND_MS) {
snapshotReSend(pSender);
}
}
}