Merge pull request #23377 from taosdata/FIX/TD-26596-3.0

feat: support pipelining of snap replication
This commit is contained in:
wade zhang 2023-10-31 15:19:07 +08:00 committed by GitHub
commit 76c9b9afc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 526 additions and 352 deletions

View File

@ -46,6 +46,7 @@ extern "C" {
#define SYNC_HEARTBEAT_SLOW_MS 1500 #define SYNC_HEARTBEAT_SLOW_MS 1500
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
#define SYNC_SNAP_RESEND_MS 1000 * 60 #define SYNC_SNAP_RESEND_MS 1000 * 60
#define SYNC_SNAP_TIMEOUT_MS 1000 * 600
#define SYNC_VND_COMMIT_MIN_MS 3000 #define SYNC_VND_COMMIT_MIN_MS 3000

View File

@ -303,6 +303,8 @@ typedef enum ELogicConditionType {
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512 #define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
#define TSDB_SYNC_NEGOTIATION_WIN 512 #define TSDB_SYNC_NEGOTIATION_WIN 512
#define TSDB_SYNC_SNAP_BUFFER_SIZE 2048
#define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta

View File

@ -584,6 +584,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
// commit json // commit json
if (!rollback) { if (!rollback) {
ASSERT(pVnode->config.vgId == pWriter->info.config.vgId);
pWriter->info.state.committed = pWriter->ever; pWriter->info.state.committed = pWriter->ever;
pVnode->config = pWriter->info.config; pVnode->config = pWriter->info.config;
pVnode->state = (SVState){.committed = pWriter->info.state.committed, pVnode->state = (SVState){.committed = pWriter->info.state.committed,

View File

@ -516,7 +516,10 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool
pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot); int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code); if (code != 0) {
vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(),
code);
}
return code; return code;
} }

View File

@ -249,8 +249,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnSnapshot(SSyncNode* ths, SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);

View File

@ -22,21 +22,41 @@ extern "C" {
#include "syncInt.h" #include "syncInt.h"
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3 #define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3
#define SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT -1 #define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_PREP -1
#define SYNC_SNAPSHOT_SEQ_BEGIN 0 #define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF
#define SYNC_SNAPSHOT_RETRY_MS 5000 #define SYNC_SNAPSHOT_RETRY_MS 5000
typedef struct SSyncSnapBuffer {
void *entries[TSDB_SYNC_SNAP_BUFFER_SIZE];
int64_t start;
int64_t cursor;
int64_t end;
int64_t size;
TdThreadMutex mutex;
void (*entryDeleteCb)(void *ptr);
} SSyncSnapBuffer;
typedef struct SyncSnapBlock {
int32_t seq;
int8_t acked;
int64_t sendTimeMs;
int16_t blockType;
void *pBlock;
int32_t blockLen;
} SyncSnapBlock;
void syncSnapBlockDestroy(void *ptr);
typedef struct SSyncSnapshotSender { typedef struct SSyncSnapshotSender {
int8_t start; int8_t start;
int32_t seq; int32_t seq;
int32_t ack; int32_t ack;
void *pReader; void *pReader;
void *pCurrentBlock;
int32_t blockLen;
SSnapshotParam snapshotParam; SSnapshotParam snapshotParam;
SSnapshot snapshot; SSnapshot snapshot;
SSyncCfg lastConfig; SSyncCfg lastConfig;
@ -47,6 +67,9 @@ typedef struct SSyncSnapshotSender {
int64_t lastSendTime; int64_t lastSendTime;
bool finish; bool finish;
// ring buffer for ack
SSyncSnapBuffer *pSndBuf;
// init when create // init when create
SSyncNode *pSyncNode; SSyncNode *pSyncNode;
int32_t replicaIndex; int32_t replicaIndex;
@ -72,6 +95,9 @@ typedef struct SSyncSnapshotReceiver {
SSnapshotParam snapshotParam; SSnapshotParam snapshotParam;
SSnapshot snapshot; SSnapshot snapshot;
// buffer
SSyncSnapBuffer *pRcvBuf;
// init when create // init when create
SSyncNode *pSyncNode; SSyncNode *pSyncNode;
} SSyncSnapshotReceiver; } SSyncSnapshotReceiver;
@ -83,8 +109,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
// on message // on message
int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); // int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);
int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg); // int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg);
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex snapshotLastApplyIndex); SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex snapshotLastApplyIndex);

View File

@ -818,7 +818,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
if (!taosCheckExistFile(pSyncNode->configPath)) { if (!taosCheckExistFile(pSyncNode->configPath)) {
// create a new raft config file // create a new raft config file
sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId); sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
pSyncNode->vgId = pSyncInfo->vgId;
pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy; pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy; pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex; pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;

View File

@ -797,7 +797,7 @@ _out:
pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr); pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sInfo("vgId:%d, resend %d sync log entries. dest:%" PRIx64 ", indexes:%" PRId64 " ..., terms: ... %" PRId64 sInfo("vgId:%d, resend %d sync log entries. dest:%" PRIx64 ", indexes:%" PRId64 " ..., terms: ... %" PRId64
", retryWaitMs:%" PRId64 ", mgr: [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 ", retryWaitMs:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
" %" PRId64 ", %" PRId64 ")", " %" PRId64 ", %" PRId64 ")",
pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex, pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
@ -815,9 +815,9 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
ASSERT(pMgr->matchIndex == 0); ASSERT(pMgr->matchIndex == 0);
if (pMsg->matchIndex < 0) { if (pMsg->matchIndex < 0) {
pMgr->restored = true; pMgr->restored = true;
sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0; return 0;
} }
@ -832,9 +832,9 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) { if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
pMgr->matchIndex = pMsg->matchIndex; pMgr->matchIndex = pMsg->matchIndex;
pMgr->restored = true; pMgr->restored = true;
sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64 sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0; return 0;
} }
@ -958,10 +958,10 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde
pMgr->endIndex = index + 1; pMgr->endIndex = index + 1;
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ". mgr (rs:%d): [%" PRId64 sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ". repl-mgr:[%" PRId64
" %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pNode->vgId, pDestId->addr, index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0; return 0;
} }
@ -1002,9 +1002,9 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
pMgr->endIndex = index + 1; pMgr->endIndex = index + 1;
if (barrier) { if (barrier) {
sInfo("vgId:%d, replicated sync barrier to dnode:%d. index:%" PRId64 ", term:%" PRId64 sInfo("vgId:%d, replicated sync barrier to dnode:%d. index:%" PRId64 ", term:%" PRId64 ", repl-mgr:[%" PRId64
", repl mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 ")", " %" PRId64 ", %" PRId64 ")",
pNode->vgId, DID(pDestId), index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex); pNode->vgId, DID(pDestId), index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
break; break;
} }
} }
@ -1013,10 +1013,10 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64 sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64
", mgr: (rs:%d) [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
")", ")",
pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0; return 0;
} }

View File

@ -23,6 +23,44 @@
#include "syncReplication.h" #include "syncReplication.h"
#include "syncUtil.h" #include "syncUtil.h"
static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) {
taosThreadMutexLock(&pBuf->mutex);
for (int64_t i = pBuf->start; i < pBuf->end; ++i) {
if (pBuf->entryDeleteCb) {
pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]);
}
pBuf->entries[i % pBuf->size] = NULL;
}
pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1;
pBuf->end = pBuf->start;
pBuf->cursor = pBuf->start - 1;
taosThreadMutexUnlock(&pBuf->mutex);
}
static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) {
if (ppBuf == NULL || ppBuf[0] == NULL) return;
SSyncSnapBuffer *pBuf = ppBuf[0];
syncSnapBufferReset(pBuf);
taosThreadMutexDestroy(&pBuf->mutex);
taosMemoryFree(ppBuf[0]);
ppBuf[0] = NULL;
return;
}
static SSyncSnapBuffer *syncSnapBufferCreate() {
SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer));
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pBuf->size = sizeof(pBuf->entries) / sizeof(void *);
ASSERT(pBuf->size == TSDB_SYNC_SNAP_BUFFER_SIZE);
taosThreadMutexInit(&pBuf->mutex, NULL);
return pBuf;
}
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) && bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoRead != NULL); (pSyncNode->pFsm->FpSnapshotDoRead != NULL);
@ -38,8 +76,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID; pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
pSender->pReader = NULL; pSender->pReader = NULL;
pSender->pCurrentBlock = NULL;
pSender->blockLen = 0;
pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS; pSender->sendingMS = SYNC_SNAPSHOT_RETRY_MS;
pSender->pSyncNode = pSyncNode; pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex; pSender->replicaIndex = replicaIndex;
@ -49,24 +85,42 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
pSender->finish = false; pSender->finish = false;
SSyncSnapBuffer *pSndBuf = syncSnapBufferCreate();
if (pSndBuf == NULL) {
taosMemoryFree(pSender);
pSender = NULL;
return NULL;
}
pSndBuf->entryDeleteCb = syncSnapBlockDestroy;
pSender->pSndBuf = pSndBuf;
syncSnapBufferReset(pSender->pSndBuf);
return pSender; return pSender;
} }
void syncSnapBlockDestroy(void *ptr) {
SyncSnapBlock *pBlk = ptr;
if (pBlk->pBlock != NULL) {
taosMemoryFree(pBlk->pBlock);
pBlk->pBlock = NULL;
pBlk->blockLen = 0;
}
taosMemoryFree(pBlk);
}
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
if (pSender == NULL) return; if (pSender == NULL) return;
// free current block
if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock);
pSender->pCurrentBlock = NULL;
}
// close reader // close reader
if (pSender->pReader != NULL) { if (pSender->pReader != NULL) {
pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader);
pSender->pReader = NULL; pSender->pReader = NULL;
} }
// free snap buffer
if (pSender->pSndBuf) {
syncSnapBufferDestroy(&pSender->pSndBuf);
}
// free sender // free sender
taosMemoryFree(pSender); taosMemoryFree(pSender);
} }
@ -79,11 +133,9 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true); int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
if (started) return 0; if (started) return 0;
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->seq = SYNC_SNAPSHOT_SEQ_PREP;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
pSender->pReader = NULL; pSender->pReader = NULL;
pSender->pCurrentBlock = NULL;
pSender->blockLen = 0;
pSender->snapshotParam.start = SYNC_INDEX_INVALID; pSender->snapshotParam.start = SYNC_INDEX_INVALID;
pSender->snapshotParam.end = SYNC_INDEX_INVALID; pSender->snapshotParam.end = SYNC_INDEX_INVALID;
pSender->snapshot.data = NULL; pSender->snapshot.data = NULL;
@ -127,29 +179,28 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = raftStoreGetTerm(pSender->pSyncNode); pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start; pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig; pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime; pMsg->startTime = pSender->startTime;
pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; pMsg->seq = pSender->seq;
if (dataLen > 0) { if (dataLen > 0) {
pMsg->payloadType = snapInfo.type; pMsg->payloadType = snapInfo.type;
memcpy(pMsg->data, snapInfo.data, dataLen); memcpy(pMsg->data, snapInfo.data, dataLen);
} }
// event log
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");
// send msg // send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
goto _out; goto _out;
} }
sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&pMsg->destId));
code = 0; code = 0;
_out: _out:
if (snapInfo.data) { if (snapInfo.data) {
@ -175,48 +226,59 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
pSender->pReader = NULL; pSender->pReader = NULL;
} }
// free current block syncSnapBufferReset(pSender->pSndBuf);
if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pSender->pCurrentBlock = NULL; sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish);
pSender->blockLen = 0;
}
} }
// when sender receive ack, call this function to send msg from seq // when sender receive ack, call this function to send msg from seq
// seq = ack + 1, already updated // seq = ack + 1, already updated
static int32_t snapshotSend(SSyncSnapshotSender *pSender) { static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// free memory last time (current seq - 1) int32_t code = -1;
if (pSender->pCurrentBlock != NULL) { SyncSnapBlock *pBlk = NULL;
taosMemoryFree(pSender->pCurrentBlock);
pSender->pCurrentBlock = NULL; if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) {
pSender->blockLen = 0; pSender->seq++;
if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) {
pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock));
if (pBlk == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OUT;
} }
pBlk->seq = pSender->seq;
// read data // read data
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
&pSender->pCurrentBlock, &pSender->blockLen); &pBlk->pBlock, &pBlk->blockLen);
if (ret != 0) { if (ret != 0) {
sSError(pSender, "snapshot sender read failed since %s", terrstr()); sSError(pSender, "snapshot sender read failed since %s", terrstr());
return -1; goto _OUT;
} }
if (pSender->blockLen > 0) { if (pBlk->blockLen > 0) {
// has read data // has read data
sSDebug(pSender, "vgId:%d, snapshot sender continue to read, blockLen:%d seq:%d", pSender->pSyncNode->vgId, sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pBlk->blockLen, pBlk->seq);
pSender->blockLen, pSender->seq);
} else { } else {
// read finish, update seq to end // read finish, update seq to end
pSender->seq = SYNC_SNAPSHOT_SEQ_END; pSender->seq = SYNC_SNAPSHOT_SEQ_END;
sSInfo(pSender, "vgId:%d, snapshot sender read to the end, blockLen:%d seq:%d", pSender->pSyncNode->vgId, sSInfo(pSender, "snapshot sender read to the end");
pSender->blockLen, pSender->seq); code = 0;
goto _OUT;
}
}
} }
ASSERT(pSender->seq >= SYNC_SNAPSHOT_SEQ_BEGIN && pSender->seq <= SYNC_SNAPSHOT_SEQ_END);
int32_t blockLen = (pBlk != NULL) ? pBlk->blockLen : 0;
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) { if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr()); sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
return -1; goto _OUT;
} }
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
@ -231,77 +293,83 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->startTime = pSender->startTime; pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq; pMsg->seq = pSender->seq;
if (pSender->pCurrentBlock != NULL) { if (pBlk != NULL && pBlk->pBlock != NULL && pBlk->blockLen > 0) {
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
}
// event log
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
} else {
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
} }
// send msg // send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr()); sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
return -1; goto _OUT;
} }
pSender->lastSendTime = taosGetTimestampMs(); // put in buffer
return 0; int64_t nowMs = taosGetTimestampMs();
if (pBlk) {
ASSERT(pBlk->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pBlk->seq < SYNC_SNAPSHOT_SEQ_END);
pBlk->sendTimeMs = nowMs;
pSender->pSndBuf->entries[pSender->seq % pSender->pSndBuf->size] = pBlk;
pBlk = NULL;
pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end);
}
pSender->lastSendTime = nowMs;
code = 0;
_OUT:;
if (pBlk != NULL) {
syncSnapBlockDestroy(pBlk);
pBlk = NULL;
}
return code;
} }
// send snapshot data from cache // send snapshot data from cache
int32_t snapshotReSend(SSyncSnapshotSender *pSender) { int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
int32_t code = -1;
taosThreadMutexLock(&pSndBuf->mutex);
for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) {
SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size];
ASSERT(pBlk && !pBlk->acked);
int64_t nowMs = taosGetTimestampMs();
if (nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) {
continue;
}
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) { if (syncBuildSnapshotSend(&rpcMsg, pBlk->blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr()); sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
return -1; goto _out;
} }
SyncSnapshotSend *pMsg = rpcMsg.pCont; SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; pMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pMsg->term = raftStoreGetTerm(pSender->pSyncNode); pMsg->term = pSender->term;
pMsg->beginIndex = pSender->snapshotParam.start; pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig; pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime; pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq; pMsg->seq = pBlk->seq;
if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) { if (pBlk->pBlock != NULL && pBlk->blockLen > 0) {
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); memcpy(pMsg->data, pBlk->pBlock, pBlk->blockLen);
} }
// event log
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend");
// send msg // send msg
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr()); sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
return -1; goto _out;
} }
pBlk->sendTimeMs = nowMs;
pSender->lastSendTime = taosGetTimestampMs();
return 0;
}
static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
if (pMsg->ack != pSender->seq) {
sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1;
} }
code = 0;
pSender->ack = pMsg->ack; _out:;
pSender->seq++; taosThreadMutexUnlock(&pSndBuf->mutex);
return code;
sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq);
return 0;
} }
// return 0, start ok // return 0, start ok
@ -328,8 +396,6 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
return 0; return 0;
} }
sSInfo(pSender, "snapshot sender start");
int32_t code = snapshotSenderStart(pSender); int32_t code = snapshotSenderStart(pSender);
if (code != 0) { if (code != 0) {
sSError(pSender, "snapshot sender start error since %s", terrstr()); sSError(pSender, "snapshot sender start error since %s", terrstr());
@ -362,6 +428,16 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->snapshot.lastApplyTerm = 0; pReceiver->snapshot.lastApplyTerm = 0;
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
SSyncSnapBuffer *pRcvBuf = syncSnapBufferCreate();
if (pRcvBuf == NULL) {
taosMemoryFree(pReceiver);
pReceiver = NULL;
return NULL;
}
pRcvBuf->entryDeleteCb = rpcFreeCont;
pReceiver->pRcvBuf = pRcvBuf;
syncSnapBufferReset(pReceiver->pRcvBuf);
return pReceiver; return pReceiver;
} }
@ -389,6 +465,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
pReceiver->snapshot.data = NULL; pReceiver->snapshot.data = NULL;
} }
// free snap buf
if (pReceiver->pRcvBuf) {
syncSnapBufferDestroy(&pReceiver->pRcvBuf);
}
// free receiver // free receiver
taosMemoryFree(pReceiver); taosMemoryFree(pReceiver);
} }
@ -444,20 +525,19 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
int8_t started = atomic_val_compare_exchange_8(&pReceiver->start, false, true); int8_t started = atomic_val_compare_exchange_8(&pReceiver->start, false, true);
if (started) return; if (started) return;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP;
pReceiver->term = pPreMsg->term; pReceiver->term = pPreMsg->term;
pReceiver->fromId = pPreMsg->srcId; pReceiver->fromId = pPreMsg->srcId;
pReceiver->startTime = pPreMsg->startTime; pReceiver->startTime = pPreMsg->startTime;
ASSERT(pReceiver->startTime); ASSERT(pReceiver->startTime);
// event log sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId));
sRInfo(pReceiver, "snapshot receiver is start");
} }
// just set start = false // just set start = false
// FpSnapshotStopWrite should not be called // FpSnapshotStopWrite should not be called
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter); sRDebug(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false); int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
if (stopped) return; if (stopped) return;
@ -472,6 +552,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
} else { } else {
sRInfo(pReceiver, "snapshot receiver stop, writer is null"); sRInfo(pReceiver, "snapshot receiver stop, writer is null");
} }
syncSnapBufferReset(pReceiver->pRcvBuf);
} }
// when recv last snapshot block, apply data into snapshot // when recv last snapshot block, apply data into snapshot
@ -479,7 +561,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
int32_t code = 0; int32_t code = 0;
if (pReceiver->pWriter != NULL) { if (pReceiver->pWriter != NULL) {
// write data // write data
sRInfo(pReceiver, "snapshot receiver write finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq); sRInfo(pReceiver, "snapshot receiver write about to finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
if (pMsg->dataLen > 0) { if (pMsg->dataLen > 0) {
code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
pMsg->dataLen); pMsg->dataLen);
@ -489,15 +571,6 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
} }
} }
// reset wal
sRInfo(pReceiver, "snapshot receiver log restore");
code =
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
if (code != 0) {
sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr());
return -1;
}
// update commit index // update commit index
if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) { if (pReceiver->snapshot.lastApplyIndex > pReceiver->pSyncNode->commitIndex) {
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
@ -509,7 +582,6 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
} }
// stop writer, apply data // stop writer, apply data
sRInfo(pReceiver, "snapshot receiver apply write");
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
&pReceiver->snapshot); &pReceiver->snapshot);
if (code != 0) { if (code != 0) {
@ -517,21 +589,29 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
return -1; return -1;
} }
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
sRInfo(pReceiver, "snapshot receiver write stopped");
// update progress // update progress
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END; pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
// get fsmState
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot); pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
pReceiver->pSyncNode->fsmState = snapshot.state; pReceiver->pSyncNode->fsmState = snapshot.state;
// reset wal
code =
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
if (code != 0) {
sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr());
return -1;
}
sRInfo(pReceiver, "wal log restored from snapshot");
} else { } else {
sRError(pReceiver, "snapshot receiver finish error since writer is null"); sRError(pReceiver, "snapshot receiver finish error since writer is null");
return -1; return -1;
} }
// event log
sRInfo(pReceiver, "snapshot receiver got last data and apply snapshot finished");
return 0; return 0;
} }
@ -599,22 +679,22 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM
int32_t order = 0; int32_t order = 0;
if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) { if ((order = snapshotReceiverSignatureCmp(pReceiver, pMsg)) < 0) {
sRInfo(pReceiver, sRInfo(pReceiver,
"received a new snapshot preparation. restart receiver" "received a new snapshot preparation. restart receiver."
"receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")", " msg signature:(%" PRId64 ", %" PRId64 ")",
pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime); pMsg->term, pMsg->startTime);
goto _START_RECEIVER; goto _START_RECEIVER;
} else if (order == 0) { } else if (order == 0) {
sRInfo(pReceiver, sRInfo(pReceiver,
"received a duplicate snapshot preparation. send reply" "received a duplicate snapshot preparation. send reply."
"receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")", " msg signature:(%" PRId64 ", %" PRId64 ")",
pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime); pMsg->term, pMsg->startTime);
goto _SEND_REPLY; goto _SEND_REPLY;
} else { } else {
// ignore // ignore
sRError(pReceiver, sRError(pReceiver,
"received a stale snapshot preparation. ignore" "received a stale snapshot preparation. ignore."
"receiver signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")", " msg signature:(%" PRId64 ", %" PRId64 ")",
pReceiver->term, pReceiver->startTime, pMsg->term, pMsg->startTime); pMsg->term, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
code = terrno; code = terrno;
goto _SEND_REPLY; goto _SEND_REPLY;
@ -765,29 +845,8 @@ _SEND_REPLY:
return code; return code;
} }
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { static int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, int32_t code) {
// condition 4 SSyncNode *pSyncNode = pReceiver->pSyncNode;
// transfering
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int64_t timeNow = taosGetTimestampMs();
int32_t code = 0;
if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr());
code = terrno;
goto _SEND_REPLY;
}
if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
code = terrno;
if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
}
}
_SEND_REPLY:;
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) { if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) {
@ -811,10 +870,79 @@ _SEND_REPLY:;
sRError(pReceiver, "failed to send snapshot receiver resp since %s", terrstr()); sRError(pReceiver, "failed to send snapshot receiver resp since %s", terrstr());
return -1; return -1;
} }
return 0;
}
static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend **ppMsg) {
int32_t code = 0;
SSyncSnapBuffer *pRcvBuf = pReceiver->pRcvBuf;
SyncSnapshotSend *pMsg = ppMsg[0];
terrno = TSDB_CODE_SUCCESS;
taosThreadMutexLock(&pRcvBuf->mutex);
if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) {
terrno = TSDB_CODE_SYN_BUFFER_FULL;
code = terrno;
goto _out;
}
ASSERT(pRcvBuf->start <= pRcvBuf->cursor + 1 && pRcvBuf->cursor < pRcvBuf->end);
if (pMsg->seq > pRcvBuf->cursor) {
pRcvBuf->entries[pMsg->seq % pRcvBuf->size] = pMsg;
ppMsg[0] = NULL;
pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end);
} else {
syncSnapSendRsp(pReceiver, pMsg, code);
goto _out;
}
for (int64_t seq = pRcvBuf->cursor + 1; seq < pRcvBuf->end; ++seq) {
if (pRcvBuf->entries[seq]) {
pRcvBuf->cursor = seq;
} else {
break;
}
}
for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) {
if (snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size]) != 0) {
code = terrno;
if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
code = TSDB_CODE_SYN_INTERNAL_ERROR;
}
}
pRcvBuf->start = seq + 1;
syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], code);
pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
if (code) goto _out;
}
_out:
taosThreadMutexUnlock(&pRcvBuf->mutex);
return code; return code;
} }
static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) {
// condition 4
// transfering
SyncSnapshotSend *pMsg = ppMsg[0];
ASSERT(pMsg);
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int64_t timeNow = taosGetTimestampMs();
int32_t code = 0;
if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) {
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr());
return syncSnapSendRsp(pReceiver, pMsg, terrno);
}
return syncSnapBufferRecv(pReceiver, ppMsg);
}
static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// condition 2 // condition 2
// end, finish FSM // end, finish FSM
@ -867,7 +995,7 @@ _SEND_REPLY:;
// receiver on message // receiver on message
// //
// condition 1, recv SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT // condition 1, recv SYNC_SNAPSHOT_SEQ_PREP
// if receiver already start // if receiver already start
// if sender.start-time > receiver.start-time, restart receiver(reply snapshot start) // if sender.start-time > receiver.start-time, restart receiver(reply snapshot start)
// if sender.start-time = receiver.start-time, maybe duplicate msg // if sender.start-time = receiver.start-time, maybe duplicate msg
@ -885,9 +1013,11 @@ _SEND_REPLY:;
// //
// condition 5, got data, update ack // condition 5, got data, update ack
// //
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
SyncSnapshotSend *pMsg = pRpcMsg->pCont; SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont;
SyncSnapshotSend *pMsg = ppMsg[0];
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int32_t code = 0;
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) { if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
@ -911,49 +1041,56 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term); syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term);
} }
// state, term, seq/ack if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) {
int32_t code = 0; sRError(pReceiver, "snapshot receiver not a follower or learner");
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER || pSyncNode->state == TAOS_SYNC_STATE_LEARNER) { return -1;
if (pMsg->term == raftStoreGetTerm(pSyncNode)) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) {
sInfo("vgId:%d, receive pre-snapshot msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")",
pSyncNode->vgId, pMsg->term, pMsg->startTime);
code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
sInfo("vgId:%d, receive begin msg of snapshot replication. signature:(%" PRId64 ", %" PRId64 ")",
pSyncNode->vgId, pMsg->term, pMsg->startTime);
code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
sInfo("vgId:%d, receive end msg of snapshot replication. signature: (%" PRId64 ", %" PRId64 ")",
pSyncNode->vgId, pMsg->term, pMsg->startTime);
code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
code = -1;
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// force close, no response
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
snapshotReceiverStop(pReceiver);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq data");
code = syncNodeOnSnapshotReceive(pSyncNode, pMsg);
} else {
// error log
sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
code = -1;
}
} else {
// error log
sRError(pReceiver, "snapshot receiver term not equal");
code = -1;
}
} else {
// error log
sRError(pReceiver, "snapshot receiver not follower");
code = -1;
} }
if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) {
sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq);
return -1;
}
// prepare
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PREP) {
sInfo("vgId:%d, prepare snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
pMsg->startTime);
code = syncNodeOnSnapshotPrep(pSyncNode, pMsg);
goto _out;
}
// begin
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
sInfo("vgId:%d, begin snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
pMsg->startTime);
code = syncNodeOnSnapshotBegin(pSyncNode, pMsg);
goto _out;
}
// data
if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
code = syncNodeOnSnapshotReceive(pSyncNode, ppMsg);
goto _out;
}
// end
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
sInfo("vgId:%d, end snap replication. msg signature:(%" PRId64 ", %" PRId64 ")", pSyncNode->vgId, pMsg->term,
pMsg->startTime);
code = syncNodeOnSnapshotEnd(pSyncNode, pMsg);
if (code != 0) {
sRError(pReceiver, "failed to end snapshot.");
goto _out;
}
code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
if (code != 0) {
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
}
goto _out;
}
_out:;
syncNodeResetElectTimer(pSyncNode); syncNodeResetElectTimer(pSyncNode);
return code; return code;
} }
@ -993,41 +1130,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
// update next index // update next index
syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1); syncIndexMgrSetIndex(pSyncNode->pNextIndex, &pMsg->srcId, snapshot.lastApplyIndex + 1);
// update seq return snapshotSend(pSender);
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
// build begin msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "prepare snapshot failed since build msg error");
return -1;
}
SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
pSendMsg->srcId = pSender->pSyncNode->myRaftId;
pSendMsg->destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
pSendMsg->term = raftStoreGetTerm(pSender->pSyncNode);
pSendMsg->beginIndex = pSender->snapshotParam.start;
pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pSendMsg->lastConfig = pSender->lastConfig;
pSendMsg->startTime = pSender->startTime;
pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
ASSERT(pSendMsg->startTime);
sSInfo(pSender, "begin snapshot replication to dnode %d. startTime:%" PRId64, DID(&pSendMsg->destId),
pSendMsg->startTime);
// send msg
syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "prepare snapshot failed since send msg error");
return -1;
}
return 0;
} }
static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
@ -1038,14 +1141,77 @@ static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnap
return 0; return 0;
} }
static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp **ppMsg) {
int32_t code = 0;
SSyncSnapBuffer *pSndBuf = pSender->pSndBuf;
SyncSnapshotRsp *pMsg = ppMsg[0];
taosThreadMutexLock(&pSndBuf->mutex);
if (snapshotSenderSignatureCmp(pSender, pMsg) != 0) {
code = terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _out;
}
if (pSender->pReader == NULL || pSender->finish) {
code = terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _out;
}
if (pMsg->ack - pSndBuf->start >= pSndBuf->size) {
code = terrno = TSDB_CODE_SYN_BUFFER_FULL;
goto _out;
}
ASSERT(pSndBuf->start <= pSndBuf->cursor + 1 && pSndBuf->cursor < pSndBuf->end);
if (pMsg->ack > pSndBuf->cursor && pMsg->ack < pSndBuf->end) {
SyncSnapBlock *pBlk = pSndBuf->entries[pMsg->ack % pSndBuf->size];
ASSERT(pBlk);
pBlk->acked = 1;
}
for (int64_t ack = pSndBuf->cursor + 1; ack < pSndBuf->end; ++ack) {
SyncSnapBlock *pBlk = pSndBuf->entries[ack % pSndBuf->size];
if (pBlk->acked) {
pSndBuf->cursor = ack;
} else {
break;
}
}
for (int64_t ack = pSndBuf->start; ack <= pSndBuf->cursor; ++ack) {
pSndBuf->entryDeleteCb(pSndBuf->entries[ack % pSndBuf->size]);
pSndBuf->entries[ack % pSndBuf->size] = NULL;
pSndBuf->start = ack + 1;
}
while (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSender->seq - pSndBuf->start < (pSndBuf->size >> 2)) {
if (snapshotSend(pSender) != 0) {
code = terrno;
goto _out;
}
}
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) {
if (snapshotSend(pSender) != 0) {
code = terrno;
goto _out;
}
}
_out:
taosThreadMutexUnlock(&pSndBuf->mutex);
return code;
}
// sender on message // sender on message
// //
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender // condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq // condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log // condition 3 sender receives error msg, just print error log
// //
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
SyncSnapshotRsp *pMsg = pRpcMsg->pCont; SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont;
SyncSnapshotRsp *pMsg = ppMsg[0];
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) { if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
@ -1071,10 +1237,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
// check signature // check signature
int32_t order = 0; int32_t order = 0;
if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) { if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) {
sSError(pSender, sSWarn(pSender, "ignore a stale snap rsp, msg signature:(%" PRId64 ", %" PRId64 ").", pMsg->term, pMsg->startTime);
"received a stale snapshot rsp. ignore it"
"sender signature: (%" PRId64 ", %" PRId64 "), msg signature:(%" PRId64 ", %" PRId64 ")",
pSender->term, pSender->startTime, pMsg->term, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1; return -1;
} else if (order < 0) { } else if (order < 0) {
@ -1083,9 +1246,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
goto _ERROR; goto _ERROR;
} }
// state, term, seq/ack
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
sSError(pSender, "snapshot sender not leader"); sSError(pSender, "snapshot sender not leader");
terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
goto _ERROR; goto _ERROR;
@ -1093,83 +1254,46 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
SyncTerm currentTerm = raftStoreGetTerm(pSyncNode); SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
if (pMsg->term != currentTerm) { if (pMsg->term != currentTerm) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match"); sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
currentTerm); currentTerm);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _ERROR; goto _ERROR;
} }
if (pMsg->code != 0) { if (pMsg->code != 0) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error code");
sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code); sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code);
terrno = pMsg->code; terrno = pMsg->code;
goto _ERROR; goto _ERROR;
} }
// prepare <begin, end>, send begin msg // send begin
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot"); sSInfo(pSender, "process prepare rsp");
return syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg); if (syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg) != 0) {
}
if (pSender->pReader == NULL || pSender->finish) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
sSError(pSender, "snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d", tstrerror(pMsg->code), pMsg->code,
pSender->pReader, pSender->finish);
terrno = pMsg->code;
goto _ERROR; goto _ERROR;
} }
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
return -1;
} }
if (snapshotSend(pSender) != 0) { // send msg of data or end
return -1; if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) {
if (syncSnapBufferSend(pSender, ppMsg) != 0) {
sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", terrstr(), pSender->seq,
pSender->pReader, pSender->finish);
goto _ERROR;
} }
return 0;
} }
// receive ack is finish, close sender // end
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end"); sSInfo(pSender, "process end rsp");
snapshotSenderStop(pSender, true); snapshotSenderStop(pSender, true);
syncNodeReplicateReset(pSyncNode, &pMsg->srcId); syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
return 0;
}
// send next msg
if (pMsg->ack == pSender->seq) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq data");
// update sender ack
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
return -1;
}
if (snapshotSend(pSender) != 0) {
return -1;
}
} else if (pMsg->ack == pSender->seq - 1) {
// maybe resend
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
if (snapshotReSend(pSender) != 0) {
return -1;
}
} else {
// error log
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "receive error ack");
sSError(pSender, "snapshot sender receive error ack:%d, my seq:%d", pMsg->ack, pSender->seq);
snapshotSenderStop(pSender, true);
syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
return -1;
} }
return 0; return 0;
_ERROR: _ERROR:
snapshotSenderStop(pSender, true); snapshotSenderStop(pSender, false);
syncNodeReplicateReset(pSyncNode, &pMsg->srcId); syncNodeReplicateReset(pSyncNode, &pMsg->srcId);
return -1; return -1;
} }

View File

@ -77,12 +77,19 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
for (int i = 0; i < ths->peersNum; ++i) { for (int i = 0; i < ths->peersNum; ++i) {
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i])); SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i]));
if (pSender != NULL) { if (pSender != NULL) {
if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start && if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start) {
timeNow - pSender->lastSendTime > SYNC_SNAP_RESEND_MS) { int64_t elapsedMs = timeNow - pSender->lastSendTime;
snapshotReSend(pSender); if (elapsedMs < SYNC_SNAP_RESEND_MS) {
continue;
}
if (elapsedMs > SYNC_SNAP_TIMEOUT_MS) {
sSError(pSender, "snap replication timeout, terminate.");
snapshotSenderStop(pSender, false);
} else { } else {
sTrace("vgId:%d, do not resend: nstart%d, now:%" PRId64 ", lstsend:%" PRId64 ", diff:%" PRId64, ths->vgId, sSWarn(pSender, "snap replication resend.");
ths->isStart, timeNow, pSender->lastSendTime, timeNow - pSender->lastSendTime); snapshotReSend(pSender);
}
} }
} }
} }

View File

@ -267,21 +267,23 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
va_end(argpointer); va_end(argpointer);
taosPrintLog(flags, level, dflag, taosPrintLog(flags, level, dflag,
"vgId:%d, %s, sync:%s, snap-sender:{%p start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 "vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64
" last-term:%" PRIu64 " last-cfg:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
", seq:%d ack:%d finish:%d, as:%d dnode:%d}" ", seq:%d, ack:%d, "
" buf:[%" PRId64 " %" PRId64 ", %" PRId64
"), finish:%d, as:%d, to-dnode:%d}"
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64 ", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
"}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, lc-timer:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s", ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start, pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->term, pSender->startTime,
pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack,
DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), pNode->commitIndex, pSender->pSndBuf->start, pSender->pSndBuf->cursor, pSender->pSndBuf->end, pSender->finish,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode),
pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum,
pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, cfgStr);
} }
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
@ -316,19 +318,21 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
taosPrintLog( taosPrintLog(
flags, level, dflag, flags, level, dflag,
"vgId:%d, %s, sync:%s," "vgId:%d, %s, sync:%s,"
" snap-receiver:{%p started:%d acked:%d term:%" PRIu64 " start-time:%" PRId64 " from-dnode:%d, start:%" PRId64 " snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d buf:[%" PRId64 " %" PRId64 ", %" PRId64
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64 ")"
" from-dnode:%d, start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
"}" "}"
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64 ", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64 ", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, lc-timers:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s", ", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->term, pReceiver->startTime, pReceiver->start,
pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->ack, pReceiver->pRcvBuf->start, pReceiver->pRcvBuf->cursor, pReceiver->pRcvBuf->end,
DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex,
raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr,
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); cfgStr);
} }
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) { void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {

View File

@ -21,6 +21,7 @@ sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 in
sql create stream streams2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s) sliding (5s); sql create stream streams2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s) sliding (5s);
sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s); sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s);
sql create stream stream_t2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s); sql create stream stream_t2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s);
sleep 1000
sql insert into t1 values(1648791210000,1,2,3,1.0); sql insert into t1 values(1648791210000,1,2,3,1.0);
sql insert into t1 values(1648791216000,2,2,3,1.1); sql insert into t1 values(1648791216000,2,2,3,1.1);
@ -311,6 +312,7 @@ sql create table t2 using st tags(2,2,2);
sql create stream streams11 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s); sql create stream streams11 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s);
sql create stream streams12 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s); sql create stream streams12 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
sleep 1000
sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1); sql insert into t1 values(1648791223001,2,2,3,1.1);
@ -444,6 +446,7 @@ sql create table t2 using st tags(2,2,2);
sql create stream streams21 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt21 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s); sql create stream streams21 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt21 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s);
sql create stream streams22 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt22 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s); sql create stream streams22 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt22 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
sleep 1000
sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1); sql insert into t1 values(1648791223001,2,2,2,1.1);
@ -582,6 +585,7 @@ sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2); sql create table t2 using st tags(2,2,2);
sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s); sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s);
sleep 1000
sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1); sql insert into t1 values(1648791223001,2,2,2,1.1);
@ -706,6 +710,7 @@ sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2); sql create table t2 using st tags(2,2,2);
sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart as ts, count(*),min(a) c1 from st interval(10s) sliding(5s); sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart as ts, count(*),min(a) c1 from st interval(10s) sliding(5s);
sleep 1000
sql insert into t1 values(1648791213000,1,1,1,1.0); sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791243000,2,1,1,1.0); sql insert into t1 values(1648791243000,2,1,1,1.0);