From 4163a3be7c8233d9ca32de6d570b766ba8e5c00e Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 27 Oct 2023 15:21:55 +0800 Subject: [PATCH] feat: resend snap replication of data on timeout --- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 1 + source/dnode/vnode/src/vnd/vnodeSync.c | 5 +- source/libs/sync/inc/syncSnapshot.h | 6 +- source/libs/sync/src/syncMain.c | 3 +- source/libs/sync/src/syncSnapshot.c | 265 +++++++++------------ source/libs/sync/src/syncTimeout.c | 5 +- 6 files changed, 126 insertions(+), 159 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 87b407efcb..91244e321f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -584,6 +584,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * // commit json if (!rollback) { + ASSERT(pVnode->config.vgId == pWriter->info.config.vgId); pWriter->info.state.committed = pWriter->ever; pVnode->config = pWriter->info.config; pVnode->state = (SVState){.committed = pWriter->info.state.committed, diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 6c03ed68e9..c9e805d80b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -516,7 +516,10 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot); - vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code); + if (code != 0) { + vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(), + code); + } return code; } diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index e68702568a..f8ee99e8a0 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -22,9 +22,9 @@ extern "C" { #include "syncInt.h" -#define SYNC_SNAPSHOT_SEQ_INVALID -2 #define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3 -#define SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT -1 +#define SYNC_SNAPSHOT_SEQ_INVALID -2 +#define SYNC_SNAPSHOT_SEQ_PREP -1 #define SYNC_SNAPSHOT_SEQ_BEGIN 0 #define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF @@ -57,8 +57,6 @@ typedef struct SSyncSnapshotSender { int32_t seq; int32_t ack; void *pReader; - void *pCurrentBlock; - int32_t blockLen; SSnapshotParam snapshotParam; SSnapshot snapshot; SSyncCfg lastConfig; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f9dc10da02..199c7a1445 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -818,7 +818,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { if (!taosCheckExistFile(pSyncNode->configPath)) { // create a new raft config file - sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId); + sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId); + pSyncNode->vgId = pSyncInfo->vgId; pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy; pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy; pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index eee0ab2cc9..92d9571906 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -31,9 +31,9 @@ static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { } pBuf->entries[i % pBuf->size] = NULL; } - pBuf->start = 1; - pBuf->end = 1; - pBuf->cursor = 0; + pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1; + pBuf->end = pBuf->start; + pBuf->cursor = pBuf->start - 1; taosThreadMutexUnlock(&pBuf->mutex); } @@ -76,8 +76,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->pReader = NULL; - pSender->pCurrentBlock = NULL; - pSender->blockLen = 0; pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; @@ -113,12 +111,6 @@ void syncSnapBlockDestroy(void *ptr) { void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { if (pSender == NULL) return; - // free current block - if (pSender->pCurrentBlock != NULL) { - taosMemoryFree(pSender->pCurrentBlock); - pSender->pCurrentBlock = NULL; - } - // close reader if (pSender->pReader != NULL) { pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); @@ -141,11 +133,9 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true); if (started) return 0; - pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; + pSender->seq = SYNC_SNAPSHOT_SEQ_PREP; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->pReader = NULL; - pSender->pCurrentBlock = NULL; - pSender->blockLen = 0; pSender->snapshotParam.start = SYNC_INDEX_INVALID; pSender->snapshotParam.end = SYNC_INDEX_INVALID; pSender->snapshot.data = NULL; @@ -196,7 +186,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfig = pSender->lastConfig; pMsg->startTime = pSender->startTime; - pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; + pMsg->seq = pSender->seq; if (dataLen > 0) { pMsg->payloadType = snapInfo.type; @@ -236,13 +226,6 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { pSender->pReader = NULL; } - // free current block - if (pSender->pCurrentBlock != NULL) { - taosMemoryFree(pSender->pCurrentBlock); - pSender->pCurrentBlock = NULL; - pSender->blockLen = 0; - } - syncSnapBufferReset(pSender->pSndBuf); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; @@ -255,39 +238,45 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { int32_t code = -1; SyncSnapBlock *pBlk = NULL; - if (pSender->seq != SYNC_SNAPSHOT_SEQ_END) { + if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) { pSender->seq++; - pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock)); - if (pBlk == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OUT; - } + if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) { + pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock)); + if (pBlk == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OUT; + } - // read data - int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock, - &pBlk->blockLen); - if (ret != 0) { - sSError(pSender, "snapshot sender read failed since %s", terrstr()); - goto _OUT; - } - pBlk->seq = pSender->seq; + pBlk->seq = pSender->seq; - if (pSender->blockLen > 0) { - // has read data - sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq); - } else { - // read finish, update seq to end - pSender->seq = SYNC_SNAPSHOT_SEQ_END; - sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq); - code = 0; - goto _OUT; + // read data + int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, + &pBlk->pBlock, &pBlk->blockLen); + if (ret != 0) { + sSError(pSender, "snapshot sender read failed since %s", terrstr()); + goto _OUT; + } + + if (pBlk->blockLen > 0) { + // has read data + sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pBlk->blockLen, pBlk->seq); + } else { + // read finish, update seq to end + pSender->seq = SYNC_SNAPSHOT_SEQ_END; + sSInfo(pSender, "snapshot sender read to the end"); + code = 0; + goto _OUT; + } } } + ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END); + + int32_t blockLen = (pBlk != NULL) ? pBlk->blockLen : 0; // build msg SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) { + if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) { sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr()); goto _OUT; } @@ -304,7 +293,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { pMsg->startTime = pSender->startTime; pMsg->seq = pSender->seq; - if (pBlk != NULL && pBlk->pBlock != NULL) { + if (pBlk != NULL && pBlk->pBlock != NULL && pBlk->blockLen > 0) { memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen); } @@ -317,13 +306,12 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { // put in buffer int64_t nowMs = taosGetTimestampMs(); if (pBlk) { - ASSERT(pBlk->seq != SYNC_SNAPSHOT_SEQ_END); + ASSERT(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END); pBlk->sendTimeMs = nowMs; pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk; pBlk = NULL; - pSender->pSndBuf->end = pSender->seq + 1; + pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end); } - pSender->lastSendTime = nowMs; code = 0; @@ -337,36 +325,52 @@ _OUT:; // send snapshot data from cache int32_t snapshotReSend(SSyncSnapshotSender *pSender) { - // build msg - SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) { - sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); - return -1; + SSyncSnapBuffer *pSndBuf = pSender->pSndBuf; + int32_t code = -1; + + taosThreadMutexLock(&pSndBuf->mutex); + + for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) { + SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size]; + ASSERT(pBlk && !pBlk->acked); + int64_t nowMs = taosGetTimestampMs(); + if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) { + continue; + } + // build msg + SRpcMsg rpcMsg = {0}; + if (syncBuildSnapshotSend(&rpcMsg, pBlk->blockLen, pSender->pSyncNode->vgId) != 0) { + sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); + goto _out; + } + + SyncSnapshotSend *pMsg = rpcMsg.pCont; + pMsg->srcId = pSender->pSyncNode->myRaftId; + pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; + pMsg->term = pSender->term; + pMsg->beginIndex = pSender->snapshotParam.start; + pMsg->lastIndex = pSender->snapshot.lastApplyIndex; + pMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; + pMsg->lastConfig = pSender->lastConfig; + pMsg->startTime = pSender->startTime; + pMsg->seq = pBlk->seq; + + if (pBlk->pBlock != NULL && pBlk->blockLen > 0) { + memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen); + } + + // send msg + if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { + sSError(pSender, "snapshot sender resend msg failed since %s", terrstr()); + goto _out; + } + pBlk->sendTimeMs = nowMs; + pSender->lastSendTime = nowMs; } - - SyncSnapshotSend *pMsg = rpcMsg.pCont; - pMsg->srcId = pSender->pSyncNode->myRaftId; - pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pMsg->term = pSender->term; - pMsg->beginIndex = pSender->snapshotParam.start; - pMsg->lastIndex = pSender->snapshot.lastApplyIndex; - pMsg->lastTerm = pSender->snapshot.lastApplyTerm; - pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; - pMsg->lastConfig = pSender->lastConfig; - pMsg->startTime = pSender->startTime; - pMsg->seq = pSender->seq; - - if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) { - memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); - } - - // send msg - if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { - sSError(pSender, "snapshot sender resend msg failed since %s", terrstr()); - return -1; - } - - pSender->lastSendTime = taosGetTimestampMs(); + code = 0; +_out:; + taosThreadMutexUnlock(&pSndBuf->mutex); return 0; } @@ -523,7 +527,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p int8_t started = atomic_val_compare_exchange_8(&pReceiver->start, false, true); if (started) return; - pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; + pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP; pReceiver->term = pPreMsg->term; pReceiver->fromId = pPreMsg->srcId; pReceiver->startTime = pPreMsg->startTime; @@ -592,6 +596,11 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap // update progress pReceiver->ack = SYNC_SNAPSHOT_SEQ_END; + // get fsmState + SSnapshot snapshot = {0}; + pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot); + pReceiver->pSyncNode->fsmState = snapshot.state; + // reset wal code = pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex); @@ -600,12 +609,6 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap return -1; } sRInfo(pReceiver, "wal log restored from snapshot"); - - // get fsmState - SSnapshot snapshot = {0}; - pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot); - pReceiver->pSyncNode->fsmState = snapshot.state; - } else { sRError(pReceiver, "snapshot receiver finish error since writer is null"); return -1; @@ -892,6 +895,9 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg; ppMsg[0] = NULL; pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end); + } else { + syncSnapSendRsp(pReceiver, pMsg, code); + goto _out; } for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) { @@ -991,7 +997,7 @@ _SEND_REPLY:; // receiver on message // -// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT +// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP // if receiver already start // if sender.start-time > receiver.start-time, restart receiver(reply snapshot start) // if sender.start-time = receiver.start-time, maybe duplicate msg @@ -1040,7 +1046,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { int32_t code = 0; if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER) { if (pMsg->term == raftStoreGetTerm(pSyncNode)) { - if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { + if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) { sInfo("vgId:%d, receive prepare msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term, pMsg->startTime); code = syncNodeOnSnapshotPrep(pSyncNode, pMsg); @@ -1052,16 +1058,14 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { sInfo("vgId:%d, receive end msg of snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term, pMsg->startTime); code = syncNodeOnSnapshotEnd(pSyncNode, pMsg); - if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) { + if (code != 0) { + sRError(pReceiver, "failed to end snapshot."); + code = -1; + } else if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) { sRError(pReceiver, "failed to reinit log buffer since %s", terrstr()); code = -1; } - } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { - // force close, no response - syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop"); - snapshotReceiverStop(pReceiver); } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { - syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data"); code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg); } else { // error log @@ -1118,38 +1122,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend // update next index syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1); - // update seq - pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; - - // build begin msg - SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) { - sSError(pSender, "prepare snapshot failed since build msg error"); - return -1; - } - - SyncSnapshotSend *pSendMsg = rpcMsg.pCont; - pSendMsg->srcId = pSender->pSyncNode->myRaftId; - pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - pSendMsg->term = raftStoreGetTerm(pSender->pSyncNode); - pSendMsg->beginIndex = pSender->snapshotParam.start; - pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex; - pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm; - pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; - pSendMsg->lastConfig = pSender->lastConfig; - pSendMsg->startTime = pSender->startTime; - pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN; - - sSInfo(pSender, "begin snapshot replication to dnode %d.", DID(&pSendMsg->destId)); - - // send msg - syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre"); - if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { - sSError(pSender, "prepare snapshot failed since send msg error"); - return -1; - } - - return 0; + return snapshotSend(pSender); } static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { @@ -1166,10 +1139,18 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp SyncSnapshotRsp *pMsg = ppMsg[0]; taosThreadMutexLock(&pSndBuf->mutex); + if (snapshotSenderSignatureCmp(pSender, pMsg) != 0) { + code = terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + goto _out; + } + + if (pSender->pReader == NULL || pSender->finish) { + code = terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + goto _out; + } if (pMsg->ack - pSndBuf->start >= pSndBuf->size) { - terrno = TSDB_CODE_SYN_BUFFER_FULL; - code = terrno; + code = terrno = TSDB_CODE_SYN_BUFFER_FULL; goto _out; } @@ -1196,16 +1177,11 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp pSndBuf->start = ack + 1; } - pSender->ack = pSndBuf->start - 1; - while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) { if (snapshotSend(pSender) != 0) { code = terrno; goto _out; } - if (pSender->seq != SYNC_SNAPSHOT_SEQ_END) { - pSndBuf->end = TMAX(pSender->seq + 1, pSndBuf->end); - } } if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) { @@ -1285,23 +1261,17 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { } // prepare , send begin msg - if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { + if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) { return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg); } - if (pSender->pReader == NULL || pSender->finish) { - sSError(pSender, "snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d", tstrerror(pMsg->code), pMsg->code, - pSender->pReader, pSender->finish); - terrno = pMsg->code; - goto _ERROR; - } - - if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { - sSInfo(pSender, "process seq begin"); - if (snapshotSend(pSender) != 0) { + // send next msg + if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) { + if (syncSnapBufferSend(pSender, ppMsg) != 0) { + sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", terrstr(), pSender->seq, + pSender->pReader, pSender->finish); goto _ERROR; } - return 0; } // receive ack is finish, close sender @@ -1312,11 +1282,6 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { return 0; } - // send next msg - if (syncSnapBufferSend(pSender, ppMsg) != 0) { - sSError(pSender, "failed to send snapshot msg since %s. seq:%d", terrstr(), pSender->seq); - goto _ERROR; - } return 0; _ERROR: diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 91c7494fa4..5837308e59 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -78,9 +78,8 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i])); if (pSender != NULL) { if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start && - timeNow - pSender->lastSendTime > SYNC_SNAP_TIMEOUT_MS) { - sSError(pSender, "snapshot timeout, terminate. lastSendTime:%d", pSender->lastSendTime); - snapshotSenderStop(pSender, false); + timeNow - pSender->lastSendTime > SYNC_SNAP_RESEND_MS) { + snapshotReSend(pSender); } } }