Merge remote-tracking branch 'origin/feature/sync-mnode-integration' into fix/mnode
This commit is contained in:
commit
3ff366cdaf
|
@ -78,6 +78,8 @@ typedef struct SFsmCbMeta {
|
|||
int32_t code;
|
||||
ESyncState state;
|
||||
uint64_t seqNum;
|
||||
SyncTerm term;
|
||||
SyncTerm currentTerm;
|
||||
} SFsmCbMeta;
|
||||
|
||||
typedef struct SSyncFSM {
|
||||
|
@ -85,6 +87,7 @@ typedef struct SSyncFSM {
|
|||
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||
void (*FpRestoreFinish)(struct SSyncFSM* pFsm);
|
||||
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
||||
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
|
||||
} SSyncFSM;
|
||||
|
@ -117,7 +120,6 @@ typedef struct SSyncLogStore {
|
|||
|
||||
} SSyncLogStore;
|
||||
|
||||
|
||||
typedef struct SSyncInfo {
|
||||
SyncGroupId vgId;
|
||||
SSyncCfg syncCfg;
|
||||
|
|
|
@ -27,43 +27,37 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
|||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
SSdbRaw *pRaw = pMsg->pCont;
|
||||
|
||||
SSnapshot snapshot = {0};
|
||||
(*pFsm->FpGetSnapshot)(pFsm, &snapshot);
|
||||
|
||||
if (cbMeta.index > snapshot.lastApplyIndex) {
|
||||
mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state));
|
||||
sdbWriteWithoutFree(pSdb, pRaw);
|
||||
sdbSetApplyIndex(pSdb, cbMeta.index);
|
||||
sdbSetApplyTerm(pSdb, cbMeta.term);
|
||||
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
}
|
||||
} else {
|
||||
mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, snapshot.lastApplyIndex);
|
||||
}
|
||||
}
|
||||
|
||||
static void mndSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
// strict consistent, do nothing
|
||||
}
|
||||
|
||||
static void mndSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
// strict consistent, do nothing
|
||||
}
|
||||
|
||||
static int32_t mndSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||
SMnode *pMnode = pFsm->data;
|
||||
pSnapshot->lastApplyIndex = sdbGetApplyIndex(pMnode->pSdb);
|
||||
pSnapshot->lastApplyTerm = sdbGetApplyTerm(pMnode->pSdb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void mndRestoreFinish(struct SSyncFSM *pFsm) {
|
||||
SMnode *pMnode = pFsm->data;
|
||||
mndTransPullup(pMnode);
|
||||
pMnode->syncMgmt.restored = true;
|
||||
}
|
||||
|
||||
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||
pFsm->data = pMnode;
|
||||
pFsm->FpCommitCb = mndSyncCommitMsg;
|
||||
pFsm->FpPreCommitCb = mndSyncPreCommitMsg;
|
||||
pFsm->FpRollBackCb = mndSyncRollBackMsg;
|
||||
pFsm->FpPreCommitCb = NULL;
|
||||
pFsm->FpRollBackCb = NULL;
|
||||
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
|
||||
pFsm->FpRestoreFinish = mndRestoreFinish;
|
||||
pFsm->FpRestoreSnapshot = NULL;
|
||||
return pFsm;
|
||||
}
|
||||
|
||||
|
@ -152,21 +146,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
|||
}
|
||||
|
||||
void mndSyncStart(SMnode *pMnode) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int64_t lastApplyIndex = sdbGetApplyIndex(pSdb);
|
||||
|
||||
syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb);
|
||||
syncStart(pMnode->syncMgmt.sync);
|
||||
|
||||
int64_t applyIndex = sdbGetApplyIndex(pSdb);
|
||||
mndTransPullup(pMnode);
|
||||
mDebug("pullup trans finished, applyIndex:%" PRId64, applyIndex);
|
||||
if (applyIndex != lastApplyIndex) {
|
||||
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastApplyIndex, applyIndex);
|
||||
sdbWriteFile(pSdb);
|
||||
}
|
||||
|
||||
pMnode->syncMgmt.restored = true;
|
||||
mDebug("sync:%" PRId64 " is started", pMnode->syncMgmt.sync);
|
||||
}
|
||||
|
||||
|
|
|
@ -36,8 +36,8 @@ typedef struct SSyncIO {
|
|||
STaosQueue *pMsgQ;
|
||||
STaosQset * pQset;
|
||||
TdThread consumerTid;
|
||||
void *serverRpc;
|
||||
void *clientRpc;
|
||||
void * serverRpc;
|
||||
void * clientRpc;
|
||||
SEpSet myAddr;
|
||||
SMsgCb msgcb;
|
||||
|
||||
|
|
|
@ -147,6 +147,11 @@ typedef struct SSyncNode {
|
|||
// tools
|
||||
SSyncRespMgr* pSyncRespMgr;
|
||||
|
||||
// restore state
|
||||
bool restoreFinish;
|
||||
sem_t restoreSem;
|
||||
SSnapshot* pSnapshot;
|
||||
|
||||
} SSyncNode;
|
||||
|
||||
// open/close --------------
|
||||
|
|
|
@ -324,7 +324,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
// if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pEntry->index;
|
||||
|
@ -332,7 +331,18 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
cbMeta.code = 0;
|
||||
cbMeta.state = ths->state;
|
||||
cbMeta.seqNum = pEntry->seqNum;
|
||||
cbMeta.term = pEntry->term;
|
||||
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||
|
||||
bool needExecute = true;
|
||||
if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) {
|
||||
needExecute = false;
|
||||
}
|
||||
|
||||
if (needExecute) {
|
||||
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||
}
|
||||
}
|
||||
|
||||
// config change
|
||||
|
@ -349,6 +359,18 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
// restore finish
|
||||
if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||
if (ths->restoreFinish == false) {
|
||||
if (ths->pFsm->FpRestoreFinish != NULL) {
|
||||
ths->pFsm->FpRestoreFinish(ths->pFsm);
|
||||
}
|
||||
ths->restoreFinish = true;
|
||||
|
||||
tsem_post(&ths->restoreSem);
|
||||
}
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
|
|
@ -102,7 +102,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
// if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
|
||||
if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||
SFsmCbMeta cbMeta;
|
||||
cbMeta.index = pEntry->index;
|
||||
|
@ -110,8 +109,18 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
cbMeta.code = 0;
|
||||
cbMeta.state = pSyncNode->state;
|
||||
cbMeta.seqNum = pEntry->seqNum;
|
||||
cbMeta.term = pEntry->term;
|
||||
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
|
||||
|
||||
bool needExecute = true;
|
||||
if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) {
|
||||
needExecute = false;
|
||||
}
|
||||
|
||||
if (needExecute) {
|
||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
|
||||
}
|
||||
}
|
||||
|
||||
// config change
|
||||
if (pEntry->originalRpcType == TDMT_VND_SYNC_CONFIG_CHANGE) {
|
||||
|
@ -127,6 +136,18 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
}
|
||||
}
|
||||
|
||||
// restore finish
|
||||
if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) {
|
||||
if (pSyncNode->restoreFinish == false) {
|
||||
if (pSyncNode->pFsm->FpRestoreFinish != NULL) {
|
||||
pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm);
|
||||
}
|
||||
pSyncNode->restoreFinish = true;
|
||||
|
||||
tsem_post(&pSyncNode->restoreSem);
|
||||
}
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
|
|
|
@ -242,7 +242,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) {
|
||||
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
|
||||
|
@ -492,6 +492,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
|||
pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0);
|
||||
assert(pSyncNode->pSyncRespMgr != NULL);
|
||||
|
||||
// restore state
|
||||
pSyncNode->restoreFinish = false;
|
||||
pSyncNode->pSnapshot = NULL;
|
||||
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
|
||||
pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
|
||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
|
||||
}
|
||||
tsem_init(&(pSyncNode->restoreSem), 0, 0);
|
||||
|
||||
// start in syncNodeStart
|
||||
// start raft
|
||||
// syncNodeBecomeFollower(pSyncNode);
|
||||
|
@ -511,6 +520,8 @@ void syncNodeStart(SSyncNode* pSyncNode) {
|
|||
// use this now
|
||||
syncNodeAppendNoop(pSyncNode);
|
||||
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
|
||||
|
||||
tsem_wait(&pSyncNode->restoreSem);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -520,6 +531,8 @@ void syncNodeStart(SSyncNode* pSyncNode) {
|
|||
int32_t ret = 0;
|
||||
// ret = syncNodeStartPingTimer(pSyncNode);
|
||||
assert(ret == 0);
|
||||
|
||||
tsem_wait(&pSyncNode->restoreSem);
|
||||
}
|
||||
|
||||
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||
|
@ -556,6 +569,12 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
|||
taosMemoryFree(pSyncNode->pFsm);
|
||||
}
|
||||
|
||||
if (pSyncNode->pSnapshot != NULL) {
|
||||
taosMemoryFree(pSyncNode->pSnapshot);
|
||||
}
|
||||
|
||||
tsem_destroy(&pSyncNode->restoreSem);
|
||||
|
||||
// free memory in syncFreeNode
|
||||
// taosMemoryFree(pSyncNode);
|
||||
}
|
||||
|
|
|
@ -73,12 +73,17 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void FpRestoreFinishCb(struct SSyncFSM* pFsm) {
|
||||
sTrace("==callback== ==FpRestoreFinishCb==");
|
||||
}
|
||||
|
||||
SSyncFSM* createFsm() {
|
||||
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
|
||||
pFsm->FpCommitCb = CommitCb;
|
||||
pFsm->FpPreCommitCb = PreCommitCb;
|
||||
pFsm->FpRollBackCb = RollBackCb;
|
||||
pFsm->FpGetSnapshot = GetSnapshotCb;
|
||||
pFsm->FpRestoreFinish = FpRestoreFinishCb;
|
||||
return pFsm;
|
||||
}
|
||||
|
||||
|
|
|
@ -160,6 +160,8 @@ SyncClientRequest *step1(const SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
sprintf(tsTempDir, "%s", ".");
|
||||
|
||||
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = 143 + 64;
|
||||
|
|
Loading…
Reference in New Issue