feat: restore incomplete fsm state with maxVerValid via snapshot replication

This commit is contained in:
Benguang Zhao 2023-09-20 09:54:28 +08:00
parent 0c41fa56dc
commit eb4e2aa58f
21 changed files with 242 additions and 141 deletions

View File

@ -55,8 +55,8 @@ typedef struct SSessionKey {
} SSessionKey;
typedef struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
int64_t minVer;
int64_t maxVer;
} SVersionRange;
static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) {

View File

@ -36,8 +36,7 @@ extern "C" {
#define SYNC_DEL_WAL_MS (1000 * 60)
#define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1)
#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10
#define SNAPSHOT_WAIT_MS 1000 * 30
#define SNAPSHOT_WAIT_MS 1000 * 5
#define SYNC_MAX_RETRY_BACKOFF 5
#define SYNC_LOG_REPL_RETRY_WAIT_MS 100
@ -88,10 +87,9 @@ typedef enum {
} ESyncRole;
typedef enum {
TAOS_SYNC_SNAP_INFO_BRIEF = 0,
TAOS_SYNC_SNAP_INFO_FULL = 1,
TAOS_SYNC_SNAP_INFO_DIFF = 2,
} ESyncSnapInfoTyp;
SYNC_FSM_STATE_NORMAL = 0,
SYNC_FSM_STATE_INCOMPLETE,
} ESyncFsmState;
typedef struct SNodeInfo {
int64_t clusterId;
@ -155,8 +153,9 @@ typedef struct SSnapshotParam {
} SSnapshotParam;
typedef struct SSnapshot {
int32_t typ;
int32_t type;
SSyncTLV* data;
ESyncFsmState state;
SyncIndex lastApplyIndex;
SyncTerm lastApplyTerm;
SyncIndex lastConfigIndex;

View File

@ -557,7 +557,7 @@ int32_t* taosGetErrno();
// #define TSDB_CODE_SYN_TOO_MANY_FWDINFO TAOS_DEF_ERROR_CODE(0, 0x0904) // 2.x
// #define TSDB_CODE_SYN_MISMATCHED_PROTOCOL TAOS_DEF_ERROR_CODE(0, 0x0905) // 2.x
// #define TSDB_CODE_SYN_MISMATCHED_CLUSTERID TAOS_DEF_ERROR_CODE(0, 0x0906) // 2.x
// #define TSDB_CODE_SYN_MISMATCHED_SIGNATURE TAOS_DEF_ERROR_CODE(0, 0x0907) // 2.x
#define TSDB_CODE_SYN_MISMATCHED_SIGNATURE TAOS_DEF_ERROR_CODE(0, 0x0907)
// #define TSDB_CODE_SYN_INVALID_CHECKSUM TAOS_DEF_ERROR_CODE(0, 0x0908) // 2.x
// #define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) // 2.x
// #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) // 2.x

View File

@ -1025,6 +1025,11 @@ struct STsdbFilterInfo {
TABLEID tbid;
};
enum {
TSDB_FS_STATE_NORMAL = 0,
TSDB_FS_STATE_INCOMPLETE,
};
#ifdef __cplusplus
}
#endif

View File

@ -38,13 +38,6 @@ typedef struct {
STFileHashEntry **buckets;
} STFileHash;
enum {
TSDB_FS_STATE_NONE = 0,
TSDB_FS_STATE_OPEN,
TSDB_FS_STATE_EDIT,
TSDB_FS_STATE_CLOSE,
};
static const char *gCurrentFname[] = {
[TSDB_FCURRENT] = "current.json",
[TSDB_FCURRENT_C] = "current.c.json",
@ -57,7 +50,7 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
fs[0]->tsdb = pTsdb;
tsem_init(&fs[0]->canEdit, 0, 1);
fs[0]->state = TSDB_FS_STATE_NONE;
fs[0]->fsstate = TSDB_FS_STATE_NORMAL;
fs[0]->neid = 0;
TARRAY2_INIT(fs[0]->fSetArr);
TARRAY2_INIT(fs[0]->fSetArrTmp);
@ -496,6 +489,7 @@ static void tsdbFSDestroyFileObjHash(STFileHash *hash) {
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
int32_t code = 0;
int32_t lino = 0;
int32_t corrupt = false;
{ // scan each file
STFileSet *fset = NULL;
@ -503,8 +497,12 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
// data file
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
if (fset->farr[ftype] == NULL) continue;
code = tsdbFSDoScanAndFixFile(fs, fset->farr[ftype]);
TSDB_CHECK_CODE(code, lino, _exit);
STFileObj *fobj = fset->farr[ftype];
code = tsdbFSDoScanAndFixFile(fs, fobj);
if (code) {
fset->maxVerValid = TMIN(fset->maxVerValid, fobj->f->minVer - 1);
corrupt = true;
}
}
// stt file
@ -513,12 +511,21 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
STFileObj *fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
code = tsdbFSDoScanAndFixFile(fs, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
if (code) {
fset->maxVerValid = TMIN(fset->maxVerValid, fobj->f->minVer - 1);
corrupt = true;
}
}
}
}
}
if (corrupt) {
tsdbError("vgId:%d, not to clear unreferenced files since some fset corrupted", TD_VID(fs->tsdb->pVnode));
fs->fsstate = TSDB_FS_STATE_INCOMPLETE;
goto _exit;
}
{ // clear unreferenced files
STfsDir *dir = tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path);
if (dir == NULL) {
@ -1009,6 +1016,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pExclu
ever = u->sver - 1;
i++;
}
break;
}
code = tsdbTFileSetFilteredInitDup(fs->tsdb, fset, ever, &fset1, fopArr);
@ -1057,8 +1065,11 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
sver1 = u->sver;
i++;
}
break;
}
if (sver1 > ever1) continue;
tsdbInfo("fsrArr:%p, fid:%d, sver:%" PRId64 ", ever:%" PRId64, fsrArr, fset->fid, sver1, ever1);
code = tsdbTSnapRangeInitRef(fs->tsdb, fset, sver1, ever1, &fsr1);

View File

@ -101,7 +101,7 @@ struct STFSBgTask {
struct STFileSystem {
STsdb *tsdb;
tsem_t canEdit;
int32_t state;
int32_t fsstate;
int64_t neid;
EFEditT etype;
TFileSetArray fSetArr[1];

View File

@ -452,6 +452,7 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
if (fset[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
fset[0]->fid = fid;
fset[0]->maxVerValid = VERSION_MAX;
TARRAY2_INIT(fset[0]->lvlArr);
return 0;
}

View File

@ -84,7 +84,7 @@ struct SSttLvl {
struct STFileSet {
int32_t fid;
int8_t stat;
int64_t maxVerValid;
STFileObj *farr[TSDB_FTYPE_MAX]; // file array
TSttLvlArray lvlArr[1]; // level array
};

View File

@ -436,8 +436,8 @@ _exit:
taosMemoryFree(reader[0]);
reader[0] = NULL;
} else {
tsdbInfo("vgId:%d %s done, sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode), __func__, sver, ever,
type);
tsdbInfo("vgId:%d tsdb snapshot reader opened. sver:%" PRId64 " ever:%" PRId64 " type:%d", TD_VID(tsdb->pVnode),
sver, ever, type);
}
return code;
}
@ -1103,6 +1103,8 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
TSDB_CHECK_CODE(code, lino, _exit);
}
writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock);
}
tsdbFSEnableBgTask(tsdb->pFS);
@ -1218,14 +1220,28 @@ static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP
goto _err;
}
p->fid = fset->fid;
int32_t code = 0;
int32_t typ = 0;
int32_t corrupt = false;
int32_t count = 0;
for (int32_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
if (fset->farr[ftype] == NULL) continue;
typ = tsdbFTypeToSRangeTyp(ftype);
ASSERT(typ < TSDB_SNAP_RANGE_TYP_MAX);
STFile* f = fset->farr[ftype]->f;
if (f->maxVer > fset->maxVerValid) {
corrupt = true;
tsdbError("skip incomplete data file: fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64
", ftype: %d",
fset->fid, fset->maxVerValid, f->minVer, f->maxVer, ftype);
continue;
}
count++;
SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer};
TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
ASSERT(code == 0);
}
typ = TSDB_SNAP_RANGE_TYP_STT;
@ -1234,10 +1250,24 @@ static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP
STFileObj* fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
STFile* f = fobj->f;
if (f->maxVer > fset->maxVerValid) {
corrupt = true;
tsdbError("skip incomplete stt file.fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64
", ftype: %d",
fset->fid, fset->maxVerValid, f->minVer, f->maxVer, typ);
continue;
}
count++;
SVersionRange vr = {.minVer = f->minVer, .maxVer = f->maxVer};
TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
ASSERT(code == 0);
}
}
if (corrupt && count == 0) {
SVersionRange vr = {.minVer = VERSION_MIN, .maxVer = fset->maxVerValid};
code = TARRAY2_SORT_INSERT(&p->verRanges[typ], vr, tVersionRangeCmprFn);
ASSERT(code == 0);
}
ppSP[0] = p;
return 0;
@ -1272,7 +1302,8 @@ static STsdbSnapPartList* tsdbGetSnapPartList(STFileSystem* fs) {
break;
}
ASSERT(pItem != NULL);
TARRAY2_SORT_INSERT(pList, pItem, tsdbSnapPartCmprFn);
code = TARRAY2_SORT_INSERT(pList, pItem, tsdbSnapPartCmprFn);
ASSERT(code == 0);
}
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
@ -1432,18 +1463,22 @@ int32_t tsdbSnapPartListToRangeDiff(STsdbSnapPartList* pList, TSnapRangeArray**
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
int64_t ever = -1;
int64_t maxVerValid = -1;
int32_t typMax = TSDB_SNAP_RANGE_TYP_MAX;
for (int32_t i = 0; i < typMax; i++) {
SVerRangeList* iList = &part->verRanges[i];
SVersionRange r = {0};
TARRAY2_FOREACH(iList, r) {
if (r.maxVer < r.minVer) continue;
ever = TMAX(ever, r.maxVer);
SVersionRange vr = {0};
TARRAY2_FOREACH(iList, vr) {
if (vr.maxVer < vr.minVer) {
continue;
}
maxVerValid = TMAX(maxVerValid, vr.maxVer);
}
}
r->sver = ever + 1;
r->fid = part->fid;
r->sver = maxVerValid + 1;
r->ever = VERSION_MAX;
tsdbInfo("range diff fid:%" PRId64 ", sver:%" PRId64 ", ever:%" PRId64, part->fid, r->sver, r->ever);
TARRAY2_APPEND(pDiff, r);
}
ppRanges[0] = pDiff;
@ -1473,14 +1508,16 @@ void tsdbSnapPartListDestroy(STsdbSnapPartList** ppList) {
}
int32_t tsdbSnapGetInfo(STsdb* pTsdb, SSnapshot* pSnap) {
if (pSnap->typ != TDMT_SYNC_PREP_SNAPSHOT && pSnap->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
pSnap->state = pTsdb->pFS->fsstate;
if (pSnap->type != TDMT_SYNC_PREP_SNAPSHOT && pSnap->type != TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
return 0;
}
int code = -1;
STsdbSnapPartList* pList = tsdbGetSnapPartList(pTsdb->pFS);
if (pList == NULL) goto _out;
if (pSnap->typ == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
}
void* buf = NULL;
@ -1499,7 +1536,7 @@ int32_t tsdbSnapGetInfo(STsdb* pTsdb, SSnapshot* pSnap) {
// header
SSyncTLV* datHead = (void*)pSnap->data;
datHead->typ = pSnap->typ;
datHead->typ = pSnap->type;
datHead->len = 0;
// tsdb

View File

@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync.h"
#include "tsdb.h"
#include "vnd.h"
#include "vndCos.h"
@ -517,10 +519,3 @@ ESyncRole vnodeGetRole(SVnode *pVnode) { return syncGetRole(pVnode->sync); }
void vnodeStop(SVnode *pVnode) {}
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
pSnap->lastApplyIndex = pVnode->state.committed;
pSnap->lastApplyTerm = pVnode->state.commitTerm;
pSnap->lastConfigIndex = -1;
return tsdbSnapGetInfo(pVnode->pTsdb, pSnap);
}

View File

@ -537,7 +537,9 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeCommitInfo(dir);
code = vnodeCommitInfo(dir);
if (code) goto _exit;
} else {
vnodeRollback(pWriter->pVnode);
}

View File

@ -15,6 +15,8 @@
#define _DEFAULT_SOURCE
#include "tq.h"
#include "sync.h"
#include "tsdb.h"
#include "vnd.h"
#define BATCH_ENABLE 0
@ -783,3 +785,14 @@ bool vnodeIsLeader(SVnode *pVnode) {
return true;
}
int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
pSnap->lastApplyIndex = pVnode->state.committed;
pSnap->lastApplyTerm = pVnode->state.commitTerm;
pSnap->lastConfigIndex = -1;
int32_t code = tsdbSnapGetInfo(pVnode->pTsdb, pSnap);
pSnap->state = (pSnap->state == TSDB_FS_STATE_INCOMPLETE) ? SYNC_FSM_STATE_INCOMPLETE : SYNC_FSM_STATE_NORMAL;
return code;
}

View File

@ -139,6 +139,7 @@ typedef struct SSyncNode {
SSyncFSM* pFsm;
int32_t quorum;
SRaftId leaderCache;
ESyncFsmState fsmState;
// life cycle
int64_t rid;

View File

@ -116,7 +116,7 @@ typedef struct SyncAppendEntriesReply {
SyncIndex matchIndex;
SyncIndex lastSendIndex;
int64_t startTime;
int16_t reserved;
int16_t fsmState;
} SyncAppendEntriesReply;
typedef struct SyncHeartbeat {

View File

@ -31,7 +31,7 @@ extern "C" {
#define SYNC_SNAPSHOT_RETRY_MS 5000
typedef struct SSyncSnapshotSender {
bool start;
int8_t start;
int32_t seq;
int32_t ack;
void *pReader;
@ -60,8 +60,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finis
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver {
// update when pre snapshot
bool start;
// update when prep snapshot
int8_t start;
int32_t ack;
SyncTerm term;
SRaftId fromId;

View File

@ -155,6 +155,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex,
pEntry->term);
if (ths->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
pReply->fsmState = ths->fsmState;
sError("vgId:%d, not allow to accept sync log msg due to incomplete fsm state", ths->vgId);
syncEntryDestroy(pEntry);
goto _SEND_RESPONSE;
}
// accept
if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
goto _SEND_RESPONSE;
@ -175,7 +182,7 @@ _SEND_RESPONSE:
(void)syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp);
// commit index, i.e. leader notice me
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr());
}

View File

@ -71,6 +71,11 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
}
int32_t syncNodeElect(SSyncNode* pSyncNode) {
if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) {
sNError(pSyncNode, "ignore leader hb timeout due to incomplete fsm state");
return -1;
}
sNInfo(pSyncNode, "begin election");
pSyncNode->electNum++;

View File

@ -1009,6 +1009,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
commitIndex = snapshot.lastApplyIndex;
sNTrace(pSyncNode, "reset commit index by snapshot");
}
pSyncNode->fsmState = snapshot.state;
if (pSyncNode->fsmState) {
sError("vgId:%d, fsm state incomplete.", pSyncNode->vgId);
}
}
pSyncNode->commitIndex = commitIndex;
sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
@ -1163,7 +1167,8 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) {
if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) {
return -1;
}
@ -1455,10 +1460,9 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg
}
if (code < 0) {
sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, epSet,
DID(destRaftId), destRaftId->addr, terrno);
sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:%" PRId64, pNode->vgId, terrstr(), epSet,
DID(destRaftId), destRaftId->addr);
rpcFreeCont(pMsg->pCont);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
}
return code;
@ -2895,7 +2899,7 @@ _out:;
// single replica
(void)syncNodeUpdateCommitIndex(ths, matchIndex);
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
code = -1;
}
@ -3139,7 +3143,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
if (pMsg->currentTerm == matchTerm) {
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
}
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(),
ths->commitIndex);
}

View File

@ -839,14 +839,16 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
return 0;
}
if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) {
sWarn("vgId:%d, failed to rollback match index. peer: dnode:%d, match index:%" PRId64 ", last sent:%" PRId64,
pNode->vgId, DID(&destId), pMsg->matchIndex, pMsg->lastSendIndex);
if (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE || (!pMsg->success && pMsg->matchIndex >= pMsg->lastSendIndex)) {
char* msg1 = "rollback match index failure";
char* msg2 = "incomplete fsm state";
sInfo("vgId:%d, snapshot replication to dnode:%d. reason:%s, match index:%" PRId64 ", last sent:%" PRId64,
pNode->vgId, DID(&destId), (pMsg->fsmState == SYNC_FSM_STATE_INCOMPLETE ? msg2 : msg1), pMsg->matchIndex,
pMsg->lastSendIndex);
if (syncNodeStartSnapshot(pNode, &destId) < 0) {
sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId));
return -1;
}
sInfo("vgId:%d, snapshot replication to peer dnode:%d", pNode->vgId, DID(&destId));
return 0;
}
}
@ -1000,10 +1002,9 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
pMgr->endIndex = index + 1;
if (barrier) {
sInfo("vgId:%d, replicated sync barrier to dest:%" PRIx64 ". index:%" PRId64 ", term:%" PRId64
sInfo("vgId:%d, replicated sync barrier to dnode:%d. index:%" PRId64 ", term:%" PRId64
", repl mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex,
pMgr->endIndex);
pNode->vgId, DID(pDestId), index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
break;
}
}

View File

@ -44,8 +44,8 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex;
pSender->term = raftStoreGetTerm(pSyncNode);
pSender->startTime = 0;
pSender->endTime = 0;
pSender->startTime = -1;
pSender->endTime = -1;
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
pSender->finish = false;
@ -71,11 +71,16 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
taosMemoryFree(pSender);
}
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return atomic_load_8(&pSender->start); }
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
int32_t code = -1;
pSender->start = true;
int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true);
if (started) return 0;
taosMsleep(1);
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
pSender->pReader = NULL;
@ -92,13 +97,13 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig));
pSender->sendingMS = 0;
pSender->term = raftStoreGetTerm(pSender->pSyncNode);
pSender->startTime = taosGetTimestampMs();
pSender->startTime = taosGetMonoTimestampMs();
pSender->lastSendTime = pSender->startTime;
pSender->finish = false;
// Get full snapshot info
SSyncNode *pSyncNode = pSender->pSyncNode;
SSnapshot snapInfo = {.typ = TDMT_SYNC_PREP_SNAPSHOT};
SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT};
if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) {
sSError(pSender, "snapshot get info failure since %s", terrstr());
goto _out;
@ -130,11 +135,11 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->startTime = atomic_load_64(&pSender->startTime);
pMsg->seq = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
if (dataLen > 0) {
pMsg->payloadType = snapInfo.typ;
pMsg->payloadType = snapInfo.type;
memcpy(pMsg->data, snapInfo.data, dataLen);
}
@ -160,7 +165,9 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);
// update flag
pSender->start = false;
int8_t stopped = !atomic_val_compare_exchange_8(&pSender->start, true, false);
if (stopped) return;
pSender->finish = finish;
pSender->endTime = taosGetTimestampMs();
@ -223,6 +230,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->startTime = pSender->startTime;
pMsg->seq = pSender->seq;
if (pSender->pCurrentBlock != NULL) {
@ -286,7 +294,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
if (pMsg->ack != pSender->seq) {
sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1;
}
@ -301,8 +309,6 @@ static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSn
// return 1, last snapshot finish ok
// return -1, error
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
sNInfo(pSyncNode, "snapshot sender starting ...");
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
if (pSender == NULL) {
sNError(pSyncNode, "snapshot sender start error since get failed");
@ -310,12 +316,12 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
}
if (snapshotSenderIsStart(pSender)) {
sSInfo(pSender, "snapshot sender already start, ignore");
sSDebug(pSender, "snapshot sender already start, ignore");
return 0;
}
if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
sSInfo(pSender, "snapshot sender start too frequently, ignore");
sSDebug(pSender, "snapshot sender start too frequently, ignore");
return 0;
}
@ -342,6 +348,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
}
pReceiver->start = false;
pReceiver->startTime = 0;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->pWriter = NULL;
pReceiver->pSyncNode = pSyncNode;
@ -384,7 +391,7 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
}
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
return (pReceiver != NULL ? pReceiver->start : false);
return (pReceiver != NULL ? atomic_load_8(&pReceiver->start) : false);
}
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
@ -423,11 +430,14 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
return;
}
pReceiver->start = true;
int8_t started = atomic_val_compare_exchange_8(&pReceiver->start, false, true);
if (started) return;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT;
pReceiver->term = raftStoreGetTerm(pReceiver->pSyncNode);
pReceiver->fromId = pPreMsg->srcId;
pReceiver->startTime = pPreMsg->startTime;
ASSERT(pReceiver->startTime);
// event log
sRInfo(pReceiver, "snapshot receiver is start");
@ -438,6 +448,9 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false);
if (stopped) return;
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&pReceiver->snapshot);
@ -448,8 +461,6 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
} else {
sRInfo(pReceiver, "snapshot receiver stop, writer is null");
}
pReceiver->start = false;
}
// when recv last snapshot block, apply data into snapshot
@ -499,6 +510,10 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
// update progress
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
SSnapshot snapshot = {0};
pReceiver->pSyncNode->pFsm->FpGetSnapshotInfo(pReceiver->pSyncNode->pFsm, &snapshot);
pReceiver->pSyncNode->fsmState = snapshot.state;
} else {
sRError(pReceiver, "snapshot receiver finish error since writer is null");
return -1;
@ -582,7 +597,7 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM
// ignore
sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore",
pReceiver->startTime, pMsg->startTime);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
code = terrno;
goto _SEND_REPLY;
}
@ -593,33 +608,18 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM
}
_START_RECEIVER:
if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow,
pMsg->startTime);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
code = terrno;
} else {
// waiting for clock match
while (timeNow < pMsg->startTime) {
sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", startTime:%" PRId64, timeNow,
pMsg->startTime);
taosMsleep(10);
timeNow = taosGetTimestampMs();
}
if (snapshotReceiverIsStart(pReceiver)) {
sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
snapshotReceiverStop(pReceiver);
}
snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender
if (snapshotReceiverIsStart(pReceiver)) {
sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
snapshotReceiverStop(pReceiver);
}
snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender
_SEND_REPLY:
// build msg
; // make complier happy
SSnapshot snapInfo = {.typ = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY};
int32_t dataLen = 0;
if (pMsg->dataLen > 0) {
void *data = taosMemoryCalloc(1, pMsg->dataLen);
@ -655,13 +655,15 @@ _SEND_REPLY:
pRspMsg->term = raftStoreGetTerm(pSyncNode);
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->startTime = pMsg->startTime;
pRspMsg->ack = pMsg->seq; // receiver maybe already closed
pRspMsg->code = code;
pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
ASSERT(pRspMsg->startTime);
if (snapInfo.data) {
pRspMsg->payloadType = snapInfo.typ;
pRspMsg->payloadType = snapInfo.type;
memcpy(pRspMsg->data, snapInfo.data, dataLen);
// save snapshot info
@ -704,6 +706,7 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
if (pReceiver->startTime != pMsg->startTime) {
sRError(pReceiver, "snapshot receiver begin failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
pReceiver->startTime, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _SEND_REPLY;
}
@ -732,11 +735,13 @@ _SEND_REPLY:
pRspMsg->term = raftStoreGetTerm(pSyncNode);
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->startTime = pMsg->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = code;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
ASSERT(pRspMsg->startTime);
// send msg
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver begin");
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
@ -751,17 +756,17 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
// condition 4
// transfering
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
// waiting for clock match
int64_t timeNow = taosGetTimestampMs();
while (timeNow < pMsg->startTime) {
sRInfo(pReceiver, "snapshot receiver receiving waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
pMsg->startTime);
taosMsleep(10);
timeNow = taosGetTimestampMs();
int32_t code = 0;
if (pReceiver->startTime != pMsg->startTime) {
sRError(pReceiver, "snapshot receive failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
pReceiver->startTime, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
code = terrno;
goto _SEND_REPLY;
}
int32_t code = 0;
if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
code = terrno;
if (code >= SYNC_SNAPSHOT_SEQ_INVALID) {
@ -769,6 +774,7 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
}
}
_SEND_REPLY:
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) {
@ -782,11 +788,12 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend
pRspMsg->term = raftStoreGetTerm(pSyncNode);
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->startTime = pMsg->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = code;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
ASSERT(pRspMsg->startTime);
// send msg
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver received");
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
@ -801,21 +808,23 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
// condition 2
// end, finish FSM
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
// waiting for clock match
int64_t timeNow = taosGetTimestampMs();
while (timeNow < pMsg->startTime) {
sRInfo(pReceiver, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
pMsg->startTime);
taosMsleep(10);
timeNow = taosGetTimestampMs();
int32_t code = 0;
if (pReceiver->startTime != pMsg->startTime) {
sRError(pReceiver, "snapshot end failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
pReceiver->startTime, pMsg->startTime);
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
code = terrno;
goto _SEND_REPLY;
}
int32_t code = snapshotReceiverFinish(pReceiver, pMsg);
code = snapshotReceiverFinish(pReceiver, pMsg);
if (code == 0) {
snapshotReceiverStop(pReceiver);
}
_SEND_REPLY:
// build msg
SRpcMsg rpcMsg = {0};
if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId) != 0) {
@ -829,7 +838,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
pRspMsg->term = raftStoreGetTerm(pSyncNode);
pRspMsg->lastIndex = pMsg->lastIndex;
pRspMsg->lastTerm = pMsg->lastTerm;
pRspMsg->startTime = pReceiver->startTime;
pRspMsg->startTime = pMsg->startTime;
pRspMsg->ack = pReceiver->ack; // receiver maybe already closed
pRspMsg->code = code;
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
@ -945,13 +954,6 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64,
pMsg->snapBeginIndex, snapshot.lastApplyIndex);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
// update sender
pSender->snapshot = snapshot;
@ -964,6 +966,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
return -1;
}
pSender->snapshotParam.data = (void *)pMsg->data;
sSInfo(pSender, "data of snapshot param. len: %d", datHead->len);
}
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);
@ -997,6 +1000,11 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
pSendMsg->startTime = pSender->startTime;
pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
ASSERT(pSendMsg->startTime);
sSInfo(pSender, "begin snapshot replication to dnode %d. startTime:%" PRId64, DID(&pSendMsg->destId),
pSendMsg->startTime);
// send msg
syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
@ -1019,7 +1027,7 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
return -1;
}
@ -1031,6 +1039,25 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return -1;
}
if (!snapshotSenderIsStart(pSender)) {
sSError(pSender, "snapshot sender not started yet. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
pSender->startTime, pMsg->startTime);
return -1;
}
if (pMsg->startTime < pSender->startTime) {
sSError(pSender, "ignore stale rsp received. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
pSender->startTime, pMsg->startTime);
terrno = pMsg->code;
return -1;
} else if (pMsg->startTime > pSender->startTime) {
sSError(pSender, "unexpected start time in msg. sender startTime:%" PRId64 ", msg startTime:%" PRId64,
pSender->startTime, pMsg->startTime);
goto _ERROR;
}
ASSERT(pMsg->startTime == pSender->startTime);
// state, term, seq/ack
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender not leader");
@ -1039,20 +1066,12 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
goto _ERROR;
}
if (pMsg->startTime != pSender->startTime) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver time not match");
sSError(pSender, "sender:%" PRId64 " receiver:%" PRId64 " time not match, error:%s 0x%x", pMsg->startTime,
pSender->startTime, tstrerror(pMsg->code), pMsg->code);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
goto _ERROR;
}
SyncTerm currentTerm = raftStoreGetTerm(pSyncNode);
if (pMsg->term != currentTerm) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender and receiver term not match");
sSError(pSender, "snapshot sender term not equal, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term,
currentTerm);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
goto _ERROR;
}

View File

@ -440,6 +440,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_ENC_IVLD_KLEN, "Invalid klen to encod
// sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, "Sync signature mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync leader is unreachable")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEW_CONFIG_ERROR, "Sync new config error")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready to propose")