diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 6f4f15d1e8..72aab9adf0 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -55,8 +55,8 @@ typedef struct SSessionKey { } SSessionKey; typedef struct SVersionRange { - uint64_t minVer; - uint64_t maxVer; + int64_t minVer; + int64_t maxVer; } SVersionRange; static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) { diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 50e60d2ef4..71c56e8c86 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -36,8 +36,7 @@ extern "C" { #define SYNC_DEL_WAL_MS (1000 * 60) #define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1) -#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10 -#define SNAPSHOT_WAIT_MS 1000 * 30 +#define SNAPSHOT_WAIT_MS 1000 * 5 #define SYNC_MAX_RETRY_BACKOFF 5 #define SYNC_LOG_REPL_RETRY_WAIT_MS 100 @@ -88,10 +87,9 @@ typedef enum { } ESyncRole; typedef enum { - TAOS_SYNC_SNAP_INFO_BRIEF = 0, - TAOS_SYNC_SNAP_INFO_FULL = 1, - TAOS_SYNC_SNAP_INFO_DIFF = 2, -} ESyncSnapInfoTyp; + SYNC_FSM_STATE_NORMAL = 0, + SYNC_FSM_STATE_INCOMPLETE, +} ESyncFsmState; typedef struct SNodeInfo { int64_t clusterId; @@ -155,8 +153,9 @@ typedef struct SSnapshotParam { } SSnapshotParam; typedef struct SSnapshot { - int32_t typ; + int32_t type; SSyncTLV* data; + ESyncFsmState state; SyncIndex lastApplyIndex; SyncTerm lastApplyTerm; SyncIndex lastConfigIndex; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 39ae3fb97a..6fbe4422ac 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -557,7 +557,7 @@ int32_t* taosGetErrno(); // #define TSDB_CODE_SYN_TOO_MANY_FWDINFO TAOS_DEF_ERROR_CODE(0, 0x0904) // 2.x // #define TSDB_CODE_SYN_MISMATCHED_PROTOCOL TAOS_DEF_ERROR_CODE(0, 0x0905) // 2.x // #define TSDB_CODE_SYN_MISMATCHED_CLUSTERID TAOS_DEF_ERROR_CODE(0, 0x0906) // 2.x -// #define TSDB_CODE_SYN_MISMATCHED_SIGNATURE TAOS_DEF_ERROR_CODE(0, 0x0907) // 2.x +#define TSDB_CODE_SYN_MISMATCHED_SIGNATURE TAOS_DEF_ERROR_CODE(0, 0x0907) // #define TSDB_CODE_SYN_INVALID_CHECKSUM TAOS_DEF_ERROR_CODE(0, 0x0908) // 2.x // #define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) // 2.x // #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) // 2.x diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4cea7c5e85..1d17d616ab 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -1025,6 +1025,11 @@ struct STsdbFilterInfo { TABLEID tbid; }; +enum { + TSDB_FS_STATE_NORMAL = 0, + TSDB_FS_STATE_INCOMPLETE, +}; + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index 759fded522..ef2f81fa02 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -38,13 +38,6 @@ typedef struct { STFileHashEntry **buckets; } STFileHash; -enum { - TSDB_FS_STATE_NONE = 0, - TSDB_FS_STATE_OPEN, - TSDB_FS_STATE_EDIT, - TSDB_FS_STATE_CLOSE, -}; - static const char *gCurrentFname[] = { [TSDB_FCURRENT] = "current.json", [TSDB_FCURRENT_C] = "current.c.json", @@ -57,7 +50,7 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) { fs[0]->tsdb = pTsdb; tsem_init(&fs[0]->canEdit, 0, 1); - fs[0]->state = TSDB_FS_STATE_NONE; + fs[0]->fsstate = TSDB_FS_STATE_NORMAL; fs[0]->neid = 0; TARRAY2_INIT(fs[0]->fSetArr); TARRAY2_INIT(fs[0]->fSetArrTmp); @@ -496,6 +489,7 @@ static void tsdbFSDestroyFileObjHash(STFileHash *hash) { static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) { int32_t code = 0; int32_t lino = 0; + int32_t corrupt = false; { // scan each file STFileSet *fset = NULL; @@ -503,8 +497,12 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) { // data file for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) { if (fset->farr[ftype] == NULL) continue; - code = tsdbFSDoScanAndFixFile(fs, fset->farr[ftype]); - TSDB_CHECK_CODE(code, lino, _exit); + STFileObj *fobj = fset->farr[ftype]; + code = tsdbFSDoScanAndFixFile(fs, fobj); + if (code) { + fset->maxVerValid = TMIN(fset->maxVerValid, fobj->f->minVer - 1); + corrupt = true; + } } // stt file @@ -513,12 +511,21 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) { STFileObj *fobj; TARRAY2_FOREACH(lvl->fobjArr, fobj) { code = tsdbFSDoScanAndFixFile(fs, fobj); - TSDB_CHECK_CODE(code, lino, _exit); + if (code) { + fset->maxVerValid = TMIN(fset->maxVerValid, fobj->f->minVer - 1); + corrupt = true; + } } } } } + if (corrupt) { + tsdbError("vgId:%d, not to clear unreferenced files since some fset corrupted", TD_VID(fs->tsdb->pVnode)); + fs->fsstate = TSDB_FS_STATE_INCOMPLETE; + goto _exit; + } + { // clear unreferenced files STfsDir *dir = tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path); if (dir == NULL) { @@ -1009,6 +1016,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pExclu ever = u->sver - 1; i++; } + break; } code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr); @@ -1057,8 +1065,11 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev sver1 = u->sver; i++; } + break; } + if (sver1 > ever1) continue; + tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1); code = tsdbTSnapRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.h b/source/dnode/vnode/src/tsdb/tsdbFS2.h index 73b27a8fbd..851459df53 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.h @@ -101,7 +101,7 @@ struct STFSBgTask { struct STFileSystem { STsdb *tsdb; tsem_t canEdit; - int32_t state; + int32_t fsstate; int64_t neid; EFEditT etype; TFileSetArray fSetArr[1]; diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.c b/source/dnode/vnode/src/tsdb/tsdbFSet2.c index dd86d01598..620fcb3a47 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.c @@ -452,6 +452,7 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) { if (fset[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY; fset[0]->fid = fid; + fset[0]->maxVerValid = VERSION_MAX; TARRAY2_INIT(fset[0]->lvlArr); return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFSet2.h b/source/dnode/vnode/src/tsdb/tsdbFSet2.h index c78cc179df..ea0f99f68e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFSet2.h +++ b/source/dnode/vnode/src/tsdb/tsdbFSet2.h @@ -84,7 +84,7 @@ struct SSttLvl { struct STFileSet { int32_t fid; - int8_t stat; + int64_t maxVerValid; STFileObj *farr[TSDB_FTYPE_MAX]; // file array TSttLvlArray lvlArr[1]; // level array }; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 3b74475870..6ee7112906 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -436,8 +436,8 @@ _exit: taosMemoryFree(reader[0]); reader[0] = NULL; } else { - tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, sver, ever, - type); + tsdbInfo("vgId:%d tsdb snapshot reader opened. sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), + sver, ever, type); } return code; } @@ -1103,6 +1103,8 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) { TSDB_CHECK_CODE(code, lino, _exit); } + writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL; + taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock); } tsdbFSEnableBgTask(tsdb->pFS); @@ -1218,14 +1220,28 @@ static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP goto _err; } + p->fid = fset->fid; + + int32_t code = 0; int32_t typ = 0; + int32_t corrupt = false; + int32_t count = 0; for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) { if (fset->farr[ftype] == NULL) continue; typ = tsdbFTypeToSRangeTyp(ftype); ASSERT(typ < TSDB_SNAP_RANGE_TYP_MAX); STFile* f = fset->farr[ftype]->f; + if (f->maxVer > fset->maxVerValid) { + corrupt = true; + tsdbError("skip incomplete data file: fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 + ", ftype: %d", + fset->fid, fset->maxVerValid, f->minVer, f->maxVer, ftype); + continue; + } + count++; SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer}; - TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); + code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); + ASSERT(code == 0); } typ = TSDB_SNAP_RANGE_TYP_STT; @@ -1234,10 +1250,24 @@ static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP STFileObj* fobj; TARRAY2_FOREACH(lvl->fobjArr, fobj) { STFile* f = fobj->f; + if (f->maxVer > fset->maxVerValid) { + corrupt = true; + tsdbError("skip incomplete stt file.fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 + ", ftype: %d", + fset->fid, fset->maxVerValid, f->minVer, f->maxVer, typ); + continue; + } + count++; SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer}; - TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); + code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); + ASSERT(code == 0); } } + if (corrupt && count == 0) { + SVersionRange vr = {.minVer = VERSION_MIN, .maxVer = fset->maxVerValid}; + code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn); + ASSERT(code == 0); + } ppSP[0] = p; return 0; @@ -1272,7 +1302,8 @@ static STsdbSnapPartList* tsdbGetSnapPartList(STFileSystem* fs) { break; } ASSERT(pItem != NULL); - TARRAY2_SORT_INSERT(pList, pItem, tsdbSnapPartCmprFn); + code = TARRAY2_SORT_INSERT(pList, pItem, tsdbSnapPartCmprFn); + ASSERT(code == 0); } taosThreadRwlockUnlock(&fs->tsdb->rwLock); @@ -1432,18 +1463,22 @@ int32_t tsdbSnapPartListToRangeDiff(STsdbSnapPartList* pList, TSnapRangeArray** terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - int64_t ever = -1; + int64_t maxVerValid = -1; int32_t typMax = TSDB_SNAP_RANGE_TYP_MAX; for (int32_t i = 0; i < typMax; i++) { SVerRangeList* iList = &part->verRanges[i]; - SVersionRange r = {0}; - TARRAY2_FOREACH(iList, r) { - if (r.maxVer < r.minVer) continue; - ever = TMAX(ever, r.maxVer); + SVersionRange vr = {0}; + TARRAY2_FOREACH(iList, vr) { + if (vr.maxVer < vr.minVer) { + continue; + } + maxVerValid = TMAX(maxVerValid, vr.maxVer); } } - r->sver = ever + 1; + r->fid = part->fid; + r->sver = maxVerValid + 1; r->ever = VERSION_MAX; + tsdbInfo("range diff fid:%" PRId64 ", sver:%" PRId64 ", ever:%" PRId64, part->fid, r->sver, r->ever); TARRAY2_APPEND(pDiff, r); } ppRanges[0] = pDiff; @@ -1473,14 +1508,16 @@ void tsdbSnapPartListDestroy(STsdbSnapPartList** ppList) { } int32_t tsdbSnapGetInfo(STsdb* pTsdb, SSnapshot* pSnap) { - if (pSnap->typ != TDMT_SYNC_PREP_SNAPSHOT && pSnap->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { + pSnap->state = pTsdb->pFS->fsstate; + if (pSnap->type != TDMT_SYNC_PREP_SNAPSHOT && pSnap->type != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { return 0; } + int code = -1; STsdbSnapPartList* pList = tsdbGetSnapPartList(pTsdb->pFS); if (pList == NULL) goto _out; - if (pSnap->typ == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { + if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { } void* buf = NULL; @@ -1499,7 +1536,7 @@ int32_t tsdbSnapGetInfo(STsdb* pTsdb, SSnapshot* pSnap) { // header SSyncTLV* datHead = (void*)pSnap->data; - datHead->typ = pSnap->typ; + datHead->typ = pSnap->type; datHead->len = 0; // tsdb diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index fada83a7f1..5084cc2ff5 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#include "sync.h" +#include "tsdb.h" #include "vnd.h" #include "vndCos.h" @@ -517,10 +519,3 @@ ESyncRole vnodeGetRole(SVnode *pVnode) { return syncGetRole(pVnode->sync); } void vnodeStop(SVnode *pVnode) {} int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } - -int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) { - pSnap->lastApplyIndex = pVnode->state.committed; - pSnap->lastApplyTerm = pVnode->state.commitTerm; - pSnap->lastConfigIndex = -1; - return tsdbSnapGetInfo(pVnode->pTsdb, pSnap); -} diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index b2fbdc07e9..0874e5e0d8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -537,7 +537,9 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot * char dir[TSDB_FILENAME_LEN] = {0}; vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN); - vnodeCommitInfo(dir); + code = vnodeCommitInfo(dir); + if (code) goto _exit; + } else { vnodeRollback(pWriter->pVnode); } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index b73c9b8c65..ba142ddb6d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -15,6 +15,8 @@ #define _DEFAULT_SOURCE #include "tq.h" +#include "sync.h" +#include "tsdb.h" #include "vnd.h" #define BATCH_ENABLE 0 @@ -783,3 +785,14 @@ bool vnodeIsLeader(SVnode *pVnode) { return true; } + +int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) { + pSnap->lastApplyIndex = pVnode->state.committed; + pSnap->lastApplyTerm = pVnode->state.commitTerm; + pSnap->lastConfigIndex = -1; + + int32_t code = tsdbSnapGetInfo(pVnode->pTsdb, pSnap); + + pSnap->state = (pSnap->state == TSDB_FS_STATE_INCOMPLETE) ? SYNC_FSM_STATE_INCOMPLETE : SYNC_FSM_STATE_NORMAL; + return code; +} diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 870cdd6a72..cec1a12024 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -139,6 +139,7 @@ typedef struct SSyncNode { SSyncFSM* pFsm; int32_t quorum; SRaftId leaderCache; + ESyncFsmState fsmState; // life cycle int64_t rid; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index c0d3663a8f..9054f47d37 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -116,7 +116,7 @@ typedef struct SyncAppendEntriesReply { SyncIndex matchIndex; SyncIndex lastSendIndex; int64_t startTime; - int16_t reserved; + int16_t fsmState; } SyncAppendEntriesReply; typedef struct SyncHeartbeat { diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 063b4f51f5..2a19945c5a 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -31,7 +31,7 @@ extern "C" { #define SYNC_SNAPSHOT_RETRY_MS 5000 typedef struct SSyncSnapshotSender { - bool start; + int8_t start; int32_t seq; int32_t ack; void *pReader; @@ -60,8 +60,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finis int32_t snapshotReSend(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { - // update when pre snapshot - bool start; + // update when prep snapshot + int8_t start; int32_t ack; SyncTerm term; SRaftId fromId; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 925988f43a..8ae1dd2a54 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -155,6 +155,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pEntry->term); + if (ths->fsmState == SYNC_FSM_STATE_INCOMPLETE) { + pReply->fsmState = ths->fsmState; + sError("vgId:%d, not allow to accept sync log msg due to incomplete fsm state", ths->vgId); + syncEntryDestroy(pEntry); + goto _SEND_RESPONSE; + } + // accept if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) { goto _SEND_RESPONSE; @@ -175,7 +182,7 @@ _SEND_RESPONSE: (void)syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp); // commit index, i.e. leader notice me - if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { + if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr()); } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 86e28db90c..b4e2049a64 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -71,6 +71,11 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { } int32_t syncNodeElect(SSyncNode* pSyncNode) { + if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) { + sNError(pSyncNode, "ignore leader hb timeout due to incomplete fsm state"); + return -1; + } + sNInfo(pSyncNode, "begin election"); pSyncNode->electNum++; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index eca499cf28..8ddd55d906 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1009,6 +1009,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { commitIndex = snapshot.lastApplyIndex; sNTrace(pSyncNode, "reset commit index by snapshot"); } + pSyncNode->fsmState = snapshot.state; + if (pSyncNode->fsmState) { + sError("vgId:%d, fsm state incomplete.", pSyncNode->vgId); + } } pSyncNode->commitIndex = commitIndex; sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); @@ -1163,7 +1167,8 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); - if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) { + if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE && + syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) { return -1; } @@ -1455,10 +1460,9 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg } if (code < 0) { - sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, epSet, - DID(destRaftId), destRaftId->addr, terrno); + sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:%" PRId64, pNode->vgId, terrstr(), epSet, + DID(destRaftId), destRaftId->addr); rpcFreeCont(pMsg->pCont); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } return code; @@ -2895,7 +2899,7 @@ _out:; // single replica (void)syncNodeUpdateCommitIndex(ths, matchIndex); - if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { + if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex); code = -1; } @@ -3139,7 +3143,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (pMsg->currentTerm == matchTerm) { (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); } - if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { + if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(), ths->commitIndex); } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 019f8f7e62..a38d67a388 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -839,14 +839,16 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn return 0; } - if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) { - sWarn("vgId:%d, failed to rollback match index. peer: dnode:%d, match index:%" PRId64 ", last sent:%" PRId64, - pNode->vgId, DID(&destId), pMsg->matchIndex, pMsg->lastSendIndex); + if (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE || (!pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex)) { + char* msg1 = "rollback match index failure"; + char* msg2 = "incomplete fsm state"; + sInfo("vgId:%d, snapshot replication to dnode:%d. reason:%s, match index:%" PRId64 ", last sent:%" PRId64, + pNode->vgId, DID(&destId), (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? msg2 : msg1), pMsg->matchIndex, + pMsg->lastSendIndex); if (syncNodeStartSnapshot(pNode, &destId) < 0) { sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId)); return -1; } - sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&destId)); return 0; } } @@ -1000,10 +1002,9 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { pMgr->endIndex = index + 1; if (barrier) { - sInfo("vgId:%d, replicated sync barrier to dest:%" PRIx64 ". index:%" PRId64 ", term:%" PRId64 + sInfo("vgId:%d, replicated sync barrier to dnode:%d. index:%" PRId64 ", term:%" PRId64 ", repl mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, - pMgr->endIndex); + pNode->vgId, DID(pDestId), index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex); break; } } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 99e8fd55a2..383fda89b0 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -44,8 +44,8 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; pSender->term = raftStoreGetTerm(pSyncNode); - pSender->startTime = 0; - pSender->endTime = 0; + pSender->startTime = -1; + pSender->endTime = -1; pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->finish = false; @@ -71,11 +71,16 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { taosMemoryFree(pSender); } -bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } +bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return atomic_load_8(&pSender->start); } int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { int32_t code = -1; - pSender->start = true; + + int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true); + if (started) return 0; + + taosMsleep(1); + pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->pReader = NULL; @@ -92,13 +97,13 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig)); pSender->sendingMS = 0; pSender->term = raftStoreGetTerm(pSender->pSyncNode); - pSender->startTime = taosGetTimestampMs(); + pSender->startTime = taosGetMonoTimestampMs(); pSender->lastSendTime = pSender->startTime; pSender->finish = false; // Get full snapshot info SSyncNode *pSyncNode = pSender->pSyncNode; - SSnapshot snapInfo = {.typ = TDMT_SYNC_PREP_SNAPSHOT}; + SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT}; if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) { sSError(pSender, "snapshot get info failure since %s", terrstr()); goto _out; @@ -130,11 +135,11 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfig = pSender->lastConfig; - pMsg->startTime = pSender->startTime; + pMsg->startTime = atomic_load_64(&pSender->startTime); pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; if (dataLen > 0) { - pMsg->payloadType = snapInfo.typ; + pMsg->payloadType = snapInfo.type; memcpy(pMsg->data, snapInfo.data, dataLen); } @@ -160,7 +165,9 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader); // update flag - pSender->start = false; + int8_t stopped = !atomic_val_compare_exchange_8(&pSender->start, true, false); + if (stopped) return; + pSender->finish = finish; pSender->endTime = taosGetTimestampMs(); @@ -223,6 +230,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfig = pSender->lastConfig; + pMsg->startTime = pSender->startTime; pMsg->seq = pSender->seq; if (pSender->pCurrentBlock != NULL) { @@ -286,7 +294,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { if (pMsg->ack != pSender->seq) { sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; return -1; } @@ -301,8 +309,6 @@ static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSn // return 1, last snapshot finish ok // return -1, error int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { - sNInfo(pSyncNode, "snapshot sender starting ..."); - SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); if (pSender == NULL) { sNError(pSyncNode, "snapshot sender start error since get failed"); @@ -310,12 +316,12 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { } if (snapshotSenderIsStart(pSender)) { - sSInfo(pSender, "snapshot sender already start, ignore"); + sSDebug(pSender, "snapshot sender already start, ignore"); return 0; } if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) { - sSInfo(pSender, "snapshot sender start too frequently, ignore"); + sSDebug(pSender, "snapshot sender start too frequently, ignore"); return 0; } @@ -342,6 +348,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from } pReceiver->start = false; + pReceiver->startTime = 0; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; pReceiver->pWriter = NULL; pReceiver->pSyncNode = pSyncNode; @@ -384,7 +391,7 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { } bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { - return (pReceiver != NULL ? pReceiver->start : false); + return (pReceiver != NULL ? atomic_load_8(&pReceiver->start) : false); } static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { @@ -423,11 +430,14 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p return; } - pReceiver->start = true; + int8_t started = atomic_val_compare_exchange_8(&pReceiver->start, false, true); + if (started) return; + pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT; pReceiver->term = raftStoreGetTerm(pReceiver->pSyncNode); pReceiver->fromId = pPreMsg->srcId; pReceiver->startTime = pPreMsg->startTime; + ASSERT(pReceiver->startTime); // event log sRInfo(pReceiver, "snapshot receiver is start"); @@ -438,6 +448,9 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter); + int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false); + if (stopped) return; + if (pReceiver->pWriter != NULL) { int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, &pReceiver->snapshot); @@ -448,8 +461,6 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { } else { sRInfo(pReceiver, "snapshot receiver stop, writer is null"); } - - pReceiver->start = false; } // when recv last snapshot block, apply data into snapshot @@ -499,6 +510,10 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap // update progress pReceiver->ack = SYNC_SNAPSHOT_SEQ_END; + SSnapshot snapshot = {0}; + pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot); + pReceiver->pSyncNode->fsmState = snapshot.state; + } else { sRError(pReceiver, "snapshot receiver finish error since writer is null"); return -1; @@ -582,7 +597,7 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM // ignore sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore", pReceiver->startTime, pMsg->startTime); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; code = terrno; goto _SEND_REPLY; } @@ -593,33 +608,18 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM } _START_RECEIVER: - if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) { - sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow, - pMsg->startTime); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - code = terrno; - } else { - // waiting for clock match - while (timeNow < pMsg->startTime) { - sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", startTime:%" PRId64, timeNow, - pMsg->startTime); - taosMsleep(10); - timeNow = taosGetTimestampMs(); - } - - if (snapshotReceiverIsStart(pReceiver)) { - sRInfo(pReceiver, "snapshot receiver already start and force stop pre one"); - snapshotReceiverStop(pReceiver); - } - - snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender + if (snapshotReceiverIsStart(pReceiver)) { + sRInfo(pReceiver, "snapshot receiver already start and force stop pre one"); + snapshotReceiverStop(pReceiver); } + snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender + _SEND_REPLY: // build msg ; // make complier happy - SSnapshot snapInfo = {.typ = TDMT_SYNC_PREP_SNAPSHOT_REPLY}; + SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY}; int32_t dataLen = 0; if (pMsg->dataLen > 0) { void *data = taosMemoryCalloc(1, pMsg->dataLen); @@ -655,13 +655,15 @@ _SEND_REPLY: pRspMsg->term = raftStoreGetTerm(pSyncNode); pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->startTime = pReceiver->startTime; + pRspMsg->startTime = pMsg->startTime; pRspMsg->ack = pMsg->seq; // receiver maybe already closed pRspMsg->code = code; pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode); + ASSERT(pRspMsg->startTime); + if (snapInfo.data) { - pRspMsg->payloadType = snapInfo.typ; + pRspMsg->payloadType = snapInfo.type; memcpy(pRspMsg->data, snapInfo.data, dataLen); // save snapshot info @@ -704,6 +706,7 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p if (pReceiver->startTime != pMsg->startTime) { sRError(pReceiver, "snapshot receiver begin failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64, pReceiver->startTime, pMsg->startTime); + terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; goto _SEND_REPLY; } @@ -732,11 +735,13 @@ _SEND_REPLY: pRspMsg->term = raftStoreGetTerm(pSyncNode); pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->startTime = pReceiver->startTime; + pRspMsg->startTime = pMsg->startTime; pRspMsg->ack = pReceiver->ack; // receiver maybe already closed pRspMsg->code = code; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; + ASSERT(pRspMsg->startTime); + // send msg syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver begin"); if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { @@ -751,17 +756,17 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend // condition 4 // transfering SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; - - // waiting for clock match int64_t timeNow = taosGetTimestampMs(); - while (timeNow < pMsg->startTime) { - sRInfo(pReceiver, "snapshot receiver receiving waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, - pMsg->startTime); - taosMsleep(10); - timeNow = taosGetTimestampMs(); + int32_t code = 0; + + if (pReceiver->startTime != pMsg->startTime) { + sRError(pReceiver, "snapshot receive failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64, + pReceiver->startTime, pMsg->startTime); + terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + code = terrno; + goto _SEND_REPLY; } - int32_t code = 0; if (snapshotReceiverGotData(pReceiver, pMsg) != 0) { code = terrno; if (code >= SYNC_SNAPSHOT_SEQ_INVALID) { @@ -769,6 +774,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend } } +_SEND_REPLY: // build msg SRpcMsg rpcMsg = {0}; if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) { @@ -782,11 +788,12 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend pRspMsg->term = raftStoreGetTerm(pSyncNode); pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->startTime = pReceiver->startTime; + pRspMsg->startTime = pMsg->startTime; pRspMsg->ack = pReceiver->ack; // receiver maybe already closed pRspMsg->code = code; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; + ASSERT(pRspMsg->startTime); // send msg syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver received"); if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { @@ -801,21 +808,23 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs // condition 2 // end, finish FSM SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; - - // waiting for clock match int64_t timeNow = taosGetTimestampMs(); - while (timeNow < pMsg->startTime) { - sRInfo(pReceiver, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, - pMsg->startTime); - taosMsleep(10); - timeNow = taosGetTimestampMs(); + int32_t code = 0; + + if (pReceiver->startTime != pMsg->startTime) { + sRError(pReceiver, "snapshot end failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64, + pReceiver->startTime, pMsg->startTime); + terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + code = terrno; + goto _SEND_REPLY; } - int32_t code = snapshotReceiverFinish(pReceiver, pMsg); + code = snapshotReceiverFinish(pReceiver, pMsg); if (code == 0) { snapshotReceiverStop(pReceiver); } +_SEND_REPLY: // build msg SRpcMsg rpcMsg = {0}; if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId) != 0) { @@ -829,7 +838,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs pRspMsg->term = raftStoreGetTerm(pSyncNode); pRspMsg->lastIndex = pMsg->lastIndex; pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->startTime = pReceiver->startTime; + pRspMsg->startTime = pMsg->startTime; pRspMsg->ack = pReceiver->ack; // receiver maybe already closed pRspMsg->code = code; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; @@ -945,13 +954,6 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64, pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm); - if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) { - sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64, - pMsg->snapBeginIndex, snapshot.lastApplyIndex); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; - } - // update sender pSender->snapshot = snapshot; @@ -964,6 +966,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend return -1; } pSender->snapshotParam.data = (void *)pMsg->data; + sSInfo(pSender, "data of snapshot param. len: %d", datHead->len); } int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader); @@ -997,6 +1000,11 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend pSendMsg->startTime = pSender->startTime; pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN; + ASSERT(pSendMsg->startTime); + + sSInfo(pSender, "begin snapshot replication to dnode %d. startTime:%" PRId64, DID(&pSendMsg->destId), + pSendMsg->startTime); + // send msg syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre"); if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { @@ -1019,7 +1027,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { // if already drop replica, do not process if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped"); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; return -1; } @@ -1031,6 +1039,25 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { return -1; } + if (!snapshotSenderIsStart(pSender)) { + sSError(pSender, "snapshot sender not started yet. sender startTime:%" PRId64 ", msg startTime:%" PRId64, + pSender->startTime, pMsg->startTime); + return -1; + } + + if (pMsg->startTime < pSender->startTime) { + sSError(pSender, "ignore stale rsp received. sender startTime:%" PRId64 ", msg startTime:%" PRId64, + pSender->startTime, pMsg->startTime); + terrno = pMsg->code; + return -1; + } else if (pMsg->startTime > pSender->startTime) { + sSError(pSender, "unexpected start time in msg. sender startTime:%" PRId64 ", msg startTime:%" PRId64, + pSender->startTime, pMsg->startTime); + goto _ERROR; + } + + ASSERT(pMsg->startTime == pSender->startTime); + // state, term, seq/ack if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader"); @@ -1039,20 +1066,12 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) { goto _ERROR; } - if (pMsg->startTime != pSender->startTime) { - syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match"); - sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, error:%s 0x%x", pMsg->startTime, - pSender->startTime, tstrerror(pMsg->code), pMsg->code); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - goto _ERROR; - } - SyncTerm currentTerm = raftStoreGetTerm(pSyncNode); if (pMsg->term != currentTerm) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match"); sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term, currentTerm); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; goto _ERROR; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 383e4e9d8a..4cc86d51b7 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -440,6 +440,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_ENC_IVLD_KLEN, "Invalid klen to encod // sync TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, "Sync signature mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync leader is unreachable") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEW_CONFIG_ERROR, "Sync new config error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready to propose")