refactor(sync): add last config index
This commit is contained in:
parent
972fee7fbc
commit
6586f78599
|
@ -236,6 +236,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL)
|
||||||
|
|
||||||
#if defined(TD_MSG_NUMBER_)
|
#if defined(TD_MSG_NUMBER_)
|
||||||
TDMT_MAX
|
TDMT_MAX
|
||||||
|
|
|
@ -48,6 +48,7 @@ typedef enum {
|
||||||
TAOS_SYNC_PROPOSE_SUCCESS = 0,
|
TAOS_SYNC_PROPOSE_SUCCESS = 0,
|
||||||
TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
|
TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
|
||||||
TAOS_SYNC_PROPOSE_OTHER_ERROR = 2,
|
TAOS_SYNC_PROPOSE_OTHER_ERROR = 2,
|
||||||
|
TAOS_SYNC_ONLY_ONE_REPLICA = 3,
|
||||||
} ESyncProposeCode;
|
} ESyncProposeCode;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -200,6 +201,9 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
|
||||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
|
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
|
||||||
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
|
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
|
||||||
|
|
||||||
|
int32_t syncLeaderTransfer(int64_t rid);
|
||||||
|
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);
|
||||||
|
|
||||||
// to be moved to static
|
// to be moved to static
|
||||||
void syncStartNormal(int64_t rid);
|
void syncStartNormal(int64_t rid);
|
||||||
void syncStartStandBy(int64_t rid);
|
void syncStartStandBy(int64_t rid);
|
||||||
|
|
|
@ -456,6 +456,36 @@ void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg);
|
||||||
void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg);
|
void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg);
|
||||||
void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg);
|
void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg);
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
typedef struct SyncLeaderTransfer {
|
||||||
|
uint32_t bytes;
|
||||||
|
int32_t vgId;
|
||||||
|
uint32_t msgType;
|
||||||
|
/*
|
||||||
|
SRaftId srcId;
|
||||||
|
SRaftId destId;
|
||||||
|
*/
|
||||||
|
SRaftId newLeaderId;
|
||||||
|
} SyncLeaderTransfer;
|
||||||
|
|
||||||
|
SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId);
|
||||||
|
void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg);
|
||||||
|
void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen);
|
||||||
|
void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg);
|
||||||
|
char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len);
|
||||||
|
SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len);
|
||||||
|
void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg);
|
||||||
|
void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg);
|
||||||
|
SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||||
|
cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg);
|
||||||
|
char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg);
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncLeaderTransferPrint(const SyncLeaderTransfer* pMsg);
|
||||||
|
void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg);
|
||||||
|
void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg);
|
||||||
|
void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg);
|
||||||
|
|
||||||
// on message ----------------------
|
// on message ----------------------
|
||||||
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
||||||
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
||||||
|
|
|
@ -402,7 +402,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
char logBuf[512] = {0};
|
char logBuf[512] = {0};
|
||||||
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
|
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
|
||||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||||
|
static int64_t mndTick = 0;
|
||||||
|
if (++mndTick % 1000 == 1) {
|
||||||
|
mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||||
|
}
|
||||||
syncRpcMsgLog2(logBuf, pMsg);
|
syncRpcMsgLog2(logBuf, pMsg);
|
||||||
taosMemoryFree(syncNodeStr);
|
taosMemoryFree(syncNodeStr);
|
||||||
|
|
||||||
|
|
|
@ -159,7 +159,7 @@ typedef struct SSyncNode {
|
||||||
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
|
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
|
||||||
SSyncSnapshotReceiver* pNewNodeReceiver;
|
SSyncSnapshotReceiver* pNewNodeReceiver;
|
||||||
|
|
||||||
SSnapshotMeta sMeta;
|
// SSnapshotMeta sMeta;
|
||||||
|
|
||||||
} SSyncNode;
|
} SSyncNode;
|
||||||
|
|
||||||
|
@ -194,7 +194,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
|
||||||
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
|
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
|
||||||
char* syncNode2Str(const SSyncNode* pSyncNode);
|
char* syncNode2Str(const SSyncNode* pSyncNode);
|
||||||
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
|
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
|
||||||
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop);
|
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop);
|
||||||
|
|
||||||
SSyncNode* syncNodeAcquire(int64_t rid);
|
SSyncNode* syncNodeAcquire(int64_t rid);
|
||||||
void syncNodeRelease(SSyncNode* pNode);
|
void syncNodeRelease(SSyncNode* pNode);
|
||||||
|
|
|
@ -35,6 +35,7 @@ typedef struct SRaftCfg {
|
||||||
char path[TSDB_FILENAME_LEN * 2];
|
char path[TSDB_FILENAME_LEN * 2];
|
||||||
int8_t isStandBy;
|
int8_t isStandBy;
|
||||||
int8_t snapshotEnable;
|
int8_t snapshotEnable;
|
||||||
|
SyncIndex lastConfigIndex;
|
||||||
} SRaftCfg;
|
} SRaftCfg;
|
||||||
|
|
||||||
SRaftCfg *raftCfgOpen(const char *path);
|
SRaftCfg *raftCfgOpen(const char *path);
|
||||||
|
@ -54,6 +55,7 @@ int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
|
||||||
typedef struct SRaftCfgMeta {
|
typedef struct SRaftCfgMeta {
|
||||||
int8_t isStandBy;
|
int8_t isStandBy;
|
||||||
int8_t snapshotEnable;
|
int8_t snapshotEnable;
|
||||||
|
SyncIndex lastConfigIndex;
|
||||||
} SRaftCfgMeta;
|
} SRaftCfgMeta;
|
||||||
|
|
||||||
int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path);
|
int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path);
|
||||||
|
|
|
@ -88,6 +88,246 @@
|
||||||
// /\ UNCHANGED <<serverVars, commitIndex, messages>>
|
// /\ UNCHANGED <<serverVars, commitIndex, messages>>
|
||||||
// /\ UNCHANGED <<candidateVars, leaderVars>>
|
// /\ UNCHANGED <<candidateVars, leaderVars>>
|
||||||
//
|
//
|
||||||
|
|
||||||
|
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
char logBuf[128] = {0};
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesCb== term:%lu", ths->pRaftStore->currentTerm);
|
||||||
|
syncAppendEntriesLog2(logBuf, pMsg);
|
||||||
|
|
||||||
|
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
|
syncNodeUpdateTerm(ths, pMsg->term);
|
||||||
|
}
|
||||||
|
assert(pMsg->term <= ths->pRaftStore->currentTerm);
|
||||||
|
|
||||||
|
// reset elect timer
|
||||||
|
if (pMsg->term == ths->pRaftStore->currentTerm) {
|
||||||
|
ths->leaderCache = pMsg->srcId;
|
||||||
|
syncNodeResetElectTimer(ths);
|
||||||
|
}
|
||||||
|
assert(pMsg->dataLen >= 0);
|
||||||
|
|
||||||
|
SyncTerm localPreLogTerm = 0;
|
||||||
|
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||||
|
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex);
|
||||||
|
assert(pEntry != NULL);
|
||||||
|
localPreLogTerm = pEntry->term;
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool logOK =
|
||||||
|
(pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
|
||||||
|
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
|
||||||
|
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) && (pMsg->prevLogTerm == localPreLogTerm));
|
||||||
|
|
||||||
|
// reject request
|
||||||
|
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
|
||||||
|
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
|
||||||
|
sTrace(
|
||||||
|
"syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
|
||||||
|
"logOK:%d",
|
||||||
|
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
|
||||||
|
|
||||||
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
|
pReply->srcId = ths->myRaftId;
|
||||||
|
pReply->destId = pMsg->srcId;
|
||||||
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
|
pReply->success = false;
|
||||||
|
pReply->matchIndex = SYNC_INDEX_INVALID;
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||||
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
|
syncAppendEntriesReplyDestroy(pReply);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
// return to follower state
|
||||||
|
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
|
||||||
|
sTrace(
|
||||||
|
"syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
|
||||||
|
"ths->state:%d, logOK:%d",
|
||||||
|
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
|
||||||
|
|
||||||
|
syncNodeBecomeFollower(ths, "from candidate by append entries");
|
||||||
|
|
||||||
|
// ret or reply?
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
// accept request
|
||||||
|
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
|
||||||
|
// preIndex = -1, or has preIndex entry in local log
|
||||||
|
assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
|
||||||
|
|
||||||
|
// has extra entries (> preIndex) in local log
|
||||||
|
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
|
||||||
|
|
||||||
|
// has entries in SyncAppendEntries msg
|
||||||
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
|
|
||||||
|
sTrace(
|
||||||
|
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
|
||||||
|
"logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
|
||||||
|
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
|
||||||
|
|
||||||
|
if (hasExtraEntries && hasAppendEntries) {
|
||||||
|
// not conflict by default
|
||||||
|
bool conflict = false;
|
||||||
|
|
||||||
|
SyncIndex extraIndex = pMsg->prevLogIndex + 1;
|
||||||
|
SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
|
||||||
|
assert(pExtraEntry != NULL);
|
||||||
|
|
||||||
|
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
|
||||||
|
assert(pAppendEntry != NULL);
|
||||||
|
|
||||||
|
// log not match, conflict
|
||||||
|
assert(extraIndex == pAppendEntry->index);
|
||||||
|
if (pExtraEntry->term != pAppendEntry->term) {
|
||||||
|
conflict = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (conflict) {
|
||||||
|
// roll back
|
||||||
|
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
|
||||||
|
SyncIndex delEnd = extraIndex;
|
||||||
|
|
||||||
|
sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
|
||||||
|
|
||||||
|
// notice! reverse roll back!
|
||||||
|
for (SyncIndex index = delEnd; index >= delBegin; --index) {
|
||||||
|
if (ths->pFsm->FpRollBackCb != NULL) {
|
||||||
|
SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
|
||||||
|
assert(pRollBackEntry != NULL);
|
||||||
|
|
||||||
|
// if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) {
|
||||||
|
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||||
|
|
||||||
|
SFsmCbMeta cbMeta;
|
||||||
|
cbMeta.index = pRollBackEntry->index;
|
||||||
|
cbMeta.isWeak = pRollBackEntry->isWeak;
|
||||||
|
cbMeta.code = 0;
|
||||||
|
cbMeta.state = ths->state;
|
||||||
|
cbMeta.seqNum = pRollBackEntry->seqNum;
|
||||||
|
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
}
|
||||||
|
|
||||||
|
syncEntryDestory(pRollBackEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete confict entries
|
||||||
|
ths->pLogStore->truncate(ths->pLogStore, extraIndex);
|
||||||
|
|
||||||
|
// append new entries
|
||||||
|
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
|
||||||
|
|
||||||
|
// pre commit
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||||
|
if (ths->pFsm != NULL) {
|
||||||
|
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||||
|
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||||
|
SFsmCbMeta cbMeta;
|
||||||
|
cbMeta.index = pAppendEntry->index;
|
||||||
|
cbMeta.isWeak = pAppendEntry->isWeak;
|
||||||
|
cbMeta.code = 2;
|
||||||
|
cbMeta.state = ths->state;
|
||||||
|
cbMeta.seqNum = pAppendEntry->seqNum;
|
||||||
|
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
}
|
||||||
|
|
||||||
|
// free memory
|
||||||
|
syncEntryDestory(pExtraEntry);
|
||||||
|
syncEntryDestory(pAppendEntry);
|
||||||
|
|
||||||
|
} else if (hasExtraEntries && !hasAppendEntries) {
|
||||||
|
// do nothing
|
||||||
|
|
||||||
|
} else if (!hasExtraEntries && hasAppendEntries) {
|
||||||
|
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
|
||||||
|
assert(pAppendEntry != NULL);
|
||||||
|
|
||||||
|
// append new entries
|
||||||
|
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
|
||||||
|
|
||||||
|
// pre commit
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||||
|
if (ths->pFsm != NULL) {
|
||||||
|
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||||
|
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||||
|
SFsmCbMeta cbMeta;
|
||||||
|
cbMeta.index = pAppendEntry->index;
|
||||||
|
cbMeta.isWeak = pAppendEntry->isWeak;
|
||||||
|
cbMeta.code = 3;
|
||||||
|
cbMeta.state = ths->state;
|
||||||
|
cbMeta.seqNum = pAppendEntry->seqNum;
|
||||||
|
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
|
||||||
|
// free memory
|
||||||
|
syncEntryDestory(pAppendEntry);
|
||||||
|
|
||||||
|
} else if (!hasExtraEntries && !hasAppendEntries) {
|
||||||
|
// do nothing
|
||||||
|
|
||||||
|
} else {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
||||||
|
pReply->srcId = ths->myRaftId;
|
||||||
|
pReply->destId = pMsg->srcId;
|
||||||
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
|
pReply->success = true;
|
||||||
|
|
||||||
|
if (hasAppendEntries) {
|
||||||
|
pReply->matchIndex = pMsg->prevLogIndex + 1;
|
||||||
|
} else {
|
||||||
|
pReply->matchIndex = pMsg->prevLogIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
||||||
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
|
syncAppendEntriesReplyDestroy(pReply);
|
||||||
|
|
||||||
|
// maybe update commit index from leader
|
||||||
|
if (pMsg->commitIndex > ths->commitIndex) {
|
||||||
|
// has commit entry in local
|
||||||
|
if (pMsg->commitIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
|
||||||
|
SyncIndex beginIndex = ths->commitIndex + 1;
|
||||||
|
SyncIndex endIndex = pMsg->commitIndex;
|
||||||
|
|
||||||
|
// update commit index
|
||||||
|
ths->commitIndex = pMsg->commitIndex;
|
||||||
|
|
||||||
|
// call back Wal
|
||||||
|
ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
|
||||||
|
|
||||||
|
int32_t code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
@ -375,7 +615,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
|
||||||
// I am in newConfig
|
// I am in newConfig
|
||||||
if (hit) {
|
if (hit) {
|
||||||
syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);
|
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
|
||||||
|
|
||||||
// change isStandBy to normal
|
// change isStandBy to normal
|
||||||
if (!isDrop) {
|
if (!isDrop) {
|
||||||
|
@ -437,6 +677,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
|
@ -192,6 +192,40 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncLeaderTransfer(int64_t rid) {
|
||||||
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
assert(rid == pSyncNode->rid);
|
||||||
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
if (pSyncNode->replicaNum == 1) {
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
sError("only one replica, cannot drop leader");
|
||||||
|
return TAOS_SYNC_ONLY_ONE_REPLICA;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
|
||||||
|
pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
|
||||||
|
pMsg->newLeaderId.vgId = pSyncNode->vgId;
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
|
||||||
|
syncLeaderTransferDestroy(pMsg);
|
||||||
|
|
||||||
|
ret = syncPropose(rid, &rpcMsg, false);
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
|
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
|
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
|
||||||
|
@ -206,6 +240,40 @@ int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg)
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool syncCanLeaderTransfer(int64_t rid) {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
assert(rid == pSyncNode->rid);
|
||||||
|
|
||||||
|
if (pSyncNode->replicaNum == 1) {
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool matchOK = true;
|
||||||
|
if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE || pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
SyncIndex myCommitIndex = pSyncNode->commitIndex;
|
||||||
|
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
|
SyncIndex peerMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId)[i]);
|
||||||
|
if (peerMatchIndex < myCommitIndex) {
|
||||||
|
matchOK = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return matchOK;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t syncGiveUpLeader(int64_t rid) { return 0; }
|
||||||
|
|
||||||
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
||||||
int32_t ret = syncPropose(rid, pMsg, isWeak);
|
int32_t ret = syncPropose(rid, pMsg, isWeak);
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -241,7 +309,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
assert(rid == pSyncNode->rid);
|
assert(rid == pSyncNode->rid);
|
||||||
*sMeta = pSyncNode->sMeta;
|
sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex;
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -643,7 +711,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
||||||
// syncNodeBecomeFollower(pSyncNode);
|
// syncNodeBecomeFollower(pSyncNode);
|
||||||
|
|
||||||
// snapshot meta
|
// snapshot meta
|
||||||
pSyncNode->sMeta.lastConfigIndex = -1;
|
// pSyncNode->sMeta.lastConfigIndex = -1;
|
||||||
|
|
||||||
return pSyncNode;
|
return pSyncNode;
|
||||||
}
|
}
|
||||||
|
@ -1076,9 +1144,11 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDrop) {
|
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) {
|
||||||
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
|
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
|
||||||
pSyncNode->pRaftCfg->cfg = *newConfig;
|
pSyncNode->pRaftCfg->cfg = *newConfig;
|
||||||
|
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
|
||||||
|
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
// init internal
|
// init internal
|
||||||
|
@ -1735,55 +1805,19 @@ const char* syncStr(ESyncState state) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
|
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
|
||||||
int32_t code = 0;
|
SyncLeaderTransfer* pSyncLeaderTransfer;
|
||||||
ESyncState state = flag;
|
if (syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId))) {
|
||||||
sInfo("sync event vgId:%d commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex,
|
|
||||||
endIndex, syncUtilState2String(state));
|
|
||||||
|
|
||||||
/*
|
|
||||||
// maybe execute by leader, skip snapshot
|
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
|
||||||
if (ths->pFsm->FpGetSnapshot != NULL) {
|
|
||||||
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
|
|
||||||
}
|
|
||||||
if (beginIndex <= snapshot.lastApplyIndex) {
|
|
||||||
beginIndex = snapshot.lastApplyIndex + 1;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
// execute fsm
|
|
||||||
if (ths->pFsm != NULL) {
|
|
||||||
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
|
|
||||||
if (i != SYNC_INDEX_INVALID) {
|
|
||||||
SSyncRaftEntry* pEntry;
|
|
||||||
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
ASSERT(pEntry != NULL);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
|
||||||
|
|
||||||
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
|
||||||
SFsmCbMeta cbMeta;
|
|
||||||
cbMeta.index = pEntry->index;
|
|
||||||
cbMeta.isWeak = pEntry->isWeak;
|
|
||||||
cbMeta.code = 0;
|
|
||||||
cbMeta.state = ths->state;
|
|
||||||
cbMeta.seqNum = pEntry->seqNum;
|
|
||||||
cbMeta.term = pEntry->term;
|
|
||||||
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
|
||||||
cbMeta.flag = flag;
|
|
||||||
|
|
||||||
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// config change
|
return 0;
|
||||||
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
|
}
|
||||||
|
|
||||||
|
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
|
||||||
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
|
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
|
||||||
|
|
||||||
SSyncCfg newSyncCfg;
|
SSyncCfg newSyncCfg;
|
||||||
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg);
|
int32_t ret = syncCfgFromStr(pRpcMsg->pCont, &newSyncCfg);
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
// update new config myIndex
|
// update new config myIndex
|
||||||
|
@ -1797,12 +1831,10 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SReConfigCbMeta cbMeta = {0};
|
|
||||||
bool isDrop;
|
bool isDrop;
|
||||||
|
|
||||||
// I am in newConfig
|
if (hit) { // I am in newConfig
|
||||||
if (hit) {
|
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
|
||||||
syncNodeUpdateConfig(ths, &newSyncCfg, &isDrop);
|
|
||||||
|
|
||||||
// change isStandBy to normal
|
// change isStandBy to normal
|
||||||
if (!isDrop) {
|
if (!isDrop) {
|
||||||
|
@ -1824,6 +1856,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
||||||
|
|
||||||
// always call FpReConfigCb
|
// always call FpReConfigCb
|
||||||
if (ths->pFsm->FpReConfigCb != NULL) {
|
if (ths->pFsm->FpReConfigCb != NULL) {
|
||||||
|
SReConfigCbMeta cbMeta = {0};
|
||||||
cbMeta.code = 0;
|
cbMeta.code = 0;
|
||||||
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||||
cbMeta.index = pEntry->index;
|
cbMeta.index = pEntry->index;
|
||||||
|
@ -1833,8 +1866,55 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
||||||
cbMeta.seqNum = pEntry->seqNum;
|
cbMeta.seqNum = pEntry->seqNum;
|
||||||
cbMeta.flag = 0x11;
|
cbMeta.flag = 0x11;
|
||||||
cbMeta.isDrop = isDrop;
|
cbMeta.isDrop = isDrop;
|
||||||
ths->pFsm->FpReConfigCb(ths->pFsm, &rpcMsg, cbMeta);
|
ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
|
||||||
|
int32_t code = 0;
|
||||||
|
ESyncState state = flag;
|
||||||
|
sInfo("sync event vgId:%d commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex,
|
||||||
|
endIndex, syncUtilState2String(state));
|
||||||
|
|
||||||
|
// execute fsm
|
||||||
|
if (ths->pFsm != NULL) {
|
||||||
|
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
|
||||||
|
if (i != SYNC_INDEX_INVALID) {
|
||||||
|
SSyncRaftEntry* pEntry;
|
||||||
|
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
ASSERT(pEntry != NULL);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||||
|
|
||||||
|
// user commit
|
||||||
|
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||||
|
SFsmCbMeta cbMeta;
|
||||||
|
cbMeta.index = pEntry->index;
|
||||||
|
cbMeta.isWeak = pEntry->isWeak;
|
||||||
|
cbMeta.code = 0;
|
||||||
|
cbMeta.state = ths->state;
|
||||||
|
cbMeta.seqNum = pEntry->seqNum;
|
||||||
|
cbMeta.term = pEntry->term;
|
||||||
|
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||||
|
cbMeta.flag = flag;
|
||||||
|
|
||||||
|
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||||
|
}
|
||||||
|
|
||||||
|
// config change
|
||||||
|
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||||
|
code = syncNodeConfigChange(ths, &rpcMsg, pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// config change
|
||||||
|
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
|
||||||
|
code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
// restore finish
|
// restore finish
|
||||||
|
|
|
@ -75,6 +75,11 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
|
||||||
pRoot = syncSnapshotRsp2Json(pSyncMsg);
|
pRoot = syncSnapshotRsp2Json(pSyncMsg);
|
||||||
syncSnapshotRspDestroy(pSyncMsg);
|
syncSnapshotRspDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_LEADER_TRANSFER) {
|
||||||
|
SyncLeaderTransfer* pSyncMsg = syncLeaderTransferDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
pRoot = syncLeaderTransfer2Json(pSyncMsg);
|
||||||
|
syncLeaderTransferDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) {
|
} else if (pRpcMsg->msgType == TDMT_SYNC_COMMON_RESPONSE) {
|
||||||
pRoot = cJSON_CreateObject();
|
pRoot = cJSON_CreateObject();
|
||||||
char* s;
|
char* s;
|
||||||
|
@ -2056,3 +2061,165 @@ void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg) {
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId) {
|
||||||
|
uint32_t bytes = sizeof(SyncLeaderTransfer);
|
||||||
|
SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
memset(pMsg, 0, bytes);
|
||||||
|
pMsg->bytes = bytes;
|
||||||
|
pMsg->vgId = vgId;
|
||||||
|
pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER;
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg) {
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen) {
|
||||||
|
assert(pMsg->bytes <= bufLen);
|
||||||
|
memcpy(buf, pMsg, pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg) {
|
||||||
|
memcpy(pMsg, buf, len);
|
||||||
|
assert(len == pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len) {
|
||||||
|
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||||
|
assert(buf != NULL);
|
||||||
|
syncLeaderTransferSerialize(pMsg, buf, pMsg->bytes);
|
||||||
|
if (len != NULL) {
|
||||||
|
*len = pMsg->bytes;
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len) {
|
||||||
|
uint32_t bytes = *((uint32_t*)buf);
|
||||||
|
SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
assert(pMsg != NULL);
|
||||||
|
syncLeaderTransferDeserialize(buf, len, pMsg);
|
||||||
|
assert(len == pMsg->bytes);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg) {
|
||||||
|
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||||
|
pRpcMsg->msgType = pMsg->msgType;
|
||||||
|
pRpcMsg->contLen = pMsg->bytes;
|
||||||
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
|
syncLeaderTransferSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg) {
|
||||||
|
syncLeaderTransferDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncLeaderTransfer* pMsg = syncLeaderTransferDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
assert(pMsg != NULL);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg) {
|
||||||
|
char u64buf[128];
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
|
/*
|
||||||
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||||
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
cJSON* pTmp = pSrcId;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||||
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
cJSON* pTmp = pDestId;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
*/
|
||||||
|
|
||||||
|
cJSON* pNewerId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->newLeaderId.addr);
|
||||||
|
cJSON_AddStringToObject(pNewerId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->newLeaderId.addr;
|
||||||
|
cJSON* pTmp = pNewerId;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pNewerId, "vgId", pMsg->newLeaderId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "newLeaderId", pNewerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SyncLeaderTransfer", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg) {
|
||||||
|
cJSON* pJson = syncLeaderTransfer2Json(pMsg);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncLeaderTransferPrint(const SyncLeaderTransfer* pMsg) {
|
||||||
|
char* serialized = syncLeaderTransfer2Str(pMsg);
|
||||||
|
printf("syncLeaderTransferPrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg) {
|
||||||
|
char* serialized = syncLeaderTransfer2Str(pMsg);
|
||||||
|
printf("syncLeaderTransferPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg) {
|
||||||
|
char* serialized = syncLeaderTransfer2Str(pMsg);
|
||||||
|
sTrace("syncLeaderTransferLog | len:%lu | %s", strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
|
char* serialized = syncLeaderTransfer2Str(pMsg);
|
||||||
|
sTrace("syncLeaderTransferLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
}
|
|
@ -150,6 +150,10 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
||||||
cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
|
cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
|
||||||
cJSON_AddNumberToObject(pRoot, "snapshotEnable", pRaftCfg->snapshotEnable);
|
cJSON_AddNumberToObject(pRoot, "snapshotEnable", pRaftCfg->snapshotEnable);
|
||||||
|
|
||||||
|
char buf64[128];
|
||||||
|
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
|
||||||
|
cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64);
|
||||||
|
|
||||||
cJSON *pJson = cJSON_CreateObject();
|
cJSON *pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "RaftCfg", pRoot);
|
cJSON_AddItemToObject(pJson, "RaftCfg", pRoot);
|
||||||
return pJson;
|
return pJson;
|
||||||
|
@ -172,6 +176,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
|
||||||
raftCfg.cfg = *pCfg;
|
raftCfg.cfg = *pCfg;
|
||||||
raftCfg.isStandBy = meta.isStandBy;
|
raftCfg.isStandBy = meta.isStandBy;
|
||||||
raftCfg.snapshotEnable = meta.snapshotEnable;
|
raftCfg.snapshotEnable = meta.snapshotEnable;
|
||||||
|
raftCfg.lastConfigIndex = meta.lastConfigIndex;
|
||||||
char *s = raftCfg2Str(&raftCfg);
|
char *s = raftCfg2Str(&raftCfg);
|
||||||
|
|
||||||
char buf[CONFIG_FILE_LEN] = {0};
|
char buf[CONFIG_FILE_LEN] = {0};
|
||||||
|
@ -199,6 +204,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
|
||||||
cJSON *pJsonSnapshotEnable = cJSON_GetObjectItem(pJson, "snapshotEnable");
|
cJSON *pJsonSnapshotEnable = cJSON_GetObjectItem(pJson, "snapshotEnable");
|
||||||
pRaftCfg->snapshotEnable = cJSON_GetNumberValue(pJsonSnapshotEnable);
|
pRaftCfg->snapshotEnable = cJSON_GetNumberValue(pJsonSnapshotEnable);
|
||||||
|
|
||||||
|
cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
|
||||||
|
pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex));
|
||||||
|
|
||||||
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||||
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
|
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
|
|
|
@ -553,15 +553,19 @@ void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void logStoreLog(SSyncLogStore* pLogStore) {
|
void logStoreLog(SSyncLogStore* pLogStore) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = logStore2Str(pLogStore);
|
char* serialized = logStore2Str(pLogStore);
|
||||||
sTraceLong("logStoreLog | len:%lu | %s", strlen(serialized), serialized);
|
sTraceLong("logStoreLog | len:%lu | %s", strlen(serialized), serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
|
void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = logStore2Str(pLogStore);
|
char* serialized = logStore2Str(pLogStore);
|
||||||
sTraceLong("logStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTraceLong("logStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// for debug -----------------
|
// for debug -----------------
|
||||||
|
|
|
@ -352,7 +352,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
|
||||||
|
|
||||||
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
|
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
|
||||||
cJSON *pJson = snapshotSender2Json(pSender);
|
cJSON *pJson = snapshotSender2Json(pSender);
|
||||||
char *serialized = cJSON_Print(pJson);
|
char * serialized = cJSON_Print(pJson);
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
@ -473,7 +473,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
||||||
cJSON_AddStringToObject(pFromId, "addr", u64buf);
|
cJSON_AddStringToObject(pFromId, "addr", u64buf);
|
||||||
{
|
{
|
||||||
uint64_t u64 = pReceiver->fromId.addr;
|
uint64_t u64 = pReceiver->fromId.addr;
|
||||||
cJSON *pTmp = pFromId;
|
cJSON * pTmp = pFromId;
|
||||||
char host[128] = {0};
|
char host[128] = {0};
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
@ -497,7 +497,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
||||||
|
|
||||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
||||||
cJSON *pJson = snapshotReceiver2Json(pReceiver);
|
cJSON *pJson = snapshotReceiver2Json(pReceiver);
|
||||||
char *serialized = cJSON_Print(pJson);
|
char * serialized = cJSON_Print(pJson);
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
|
|
@ -214,29 +214,31 @@ void syncUtilMsgNtoH(void* msg) {
|
||||||
pHead->vgId = ntohl(pHead->vgId);
|
pHead->vgId = ntohl(pHead->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
bool syncUtilIsData(tmsg_t msgType) {
|
bool syncUtilIsData(tmsg_t msgType) {
|
||||||
if (msgType == TDMT_SYNC_NOOP || msgType == TDMT_SYNC_CONFIG_CHANGE) {
|
if (msgType == TDMT_SYNC_NOOP || msgType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
bool syncUtilUserPreCommit(tmsg_t msgType) {
|
bool syncUtilUserPreCommit(tmsg_t msgType) {
|
||||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
|
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool syncUtilUserCommit(tmsg_t msgType) {
|
bool syncUtilUserCommit(tmsg_t msgType) {
|
||||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
|
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool syncUtilUserRollback(tmsg_t msgType) {
|
bool syncUtilUserRollback(tmsg_t msgType) {
|
||||||
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE) {
|
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -47,6 +47,7 @@ add_executable(syncTestTool "")
|
||||||
add_executable(syncRaftLogTest "")
|
add_executable(syncRaftLogTest "")
|
||||||
add_executable(syncRaftLogTest2 "")
|
add_executable(syncRaftLogTest2 "")
|
||||||
add_executable(syncRaftLogTest3 "")
|
add_executable(syncRaftLogTest3 "")
|
||||||
|
add_executable(syncLeaderTransferTest "")
|
||||||
|
|
||||||
|
|
||||||
target_sources(syncTest
|
target_sources(syncTest
|
||||||
|
@ -245,6 +246,10 @@ target_sources(syncRaftLogTest3
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"syncRaftLogTest3.cpp"
|
"syncRaftLogTest3.cpp"
|
||||||
)
|
)
|
||||||
|
target_sources(syncLeaderTransferTest
|
||||||
|
PRIVATE
|
||||||
|
"syncLeaderTransferTest.cpp"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
target_include_directories(syncTest
|
target_include_directories(syncTest
|
||||||
|
@ -492,6 +497,11 @@ target_include_directories(syncRaftLogTest3
|
||||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
target_include_directories(syncLeaderTransferTest
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
target_link_libraries(syncTest
|
target_link_libraries(syncTest
|
||||||
|
@ -690,6 +700,10 @@ target_link_libraries(syncRaftLogTest3
|
||||||
sync
|
sync
|
||||||
gtest_main
|
gtest_main
|
||||||
)
|
)
|
||||||
|
target_link_libraries(syncLeaderTransferTest
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
enable_testing()
|
enable_testing()
|
||||||
|
|
|
@ -0,0 +1,101 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncMessage.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncLeaderTransfer *createMsg() {
|
||||||
|
SyncLeaderTransfer *pMsg = syncLeaderTransferBuild(1000);
|
||||||
|
/*
|
||||||
|
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
|
||||||
|
pMsg->srcId.vgId = 100;
|
||||||
|
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
|
||||||
|
pMsg->destId.vgId = 100;
|
||||||
|
*/
|
||||||
|
pMsg->newLeaderId.addr = syncUtilAddr2U64("127.0.0.1", 9999);
|
||||||
|
pMsg->newLeaderId.vgId = 100;
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void test1() {
|
||||||
|
SyncLeaderTransfer *pMsg = createMsg();
|
||||||
|
syncLeaderTransferLog2((char *)"test1:", pMsg);
|
||||||
|
syncLeaderTransferDestroy(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test2() {
|
||||||
|
SyncLeaderTransfer *pMsg = createMsg();
|
||||||
|
uint32_t len = pMsg->bytes;
|
||||||
|
char * serialized = (char *)taosMemoryMalloc(len);
|
||||||
|
syncLeaderTransferSerialize(pMsg, serialized, len);
|
||||||
|
SyncLeaderTransfer *pMsg2 = syncLeaderTransferBuild(1000);
|
||||||
|
syncLeaderTransferDeserialize(serialized, len, pMsg2);
|
||||||
|
syncLeaderTransferLog2((char *)"test2: syncLeaderTransferSerialize -> syncLeaderTransferDeserialize ", pMsg2);
|
||||||
|
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
syncLeaderTransferDestroy(pMsg);
|
||||||
|
syncLeaderTransferDestroy(pMsg2);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test3() {
|
||||||
|
SyncLeaderTransfer *pMsg = createMsg();
|
||||||
|
uint32_t len;
|
||||||
|
char * serialized = syncLeaderTransferSerialize2(pMsg, &len);
|
||||||
|
SyncLeaderTransfer *pMsg2 = syncLeaderTransferDeserialize2(serialized, len);
|
||||||
|
syncLeaderTransferLog2((char *)"test3: syncLeaderTransferSerialize2 -> syncLeaderTransferDeserialize2 ", pMsg2);
|
||||||
|
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
syncLeaderTransferDestroy(pMsg);
|
||||||
|
syncLeaderTransferDestroy(pMsg2);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test4() {
|
||||||
|
SyncLeaderTransfer *pMsg = createMsg();
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
|
||||||
|
SyncLeaderTransfer *pMsg2 = (SyncLeaderTransfer *)taosMemoryMalloc(rpcMsg.contLen);
|
||||||
|
syncLeaderTransferFromRpcMsg(&rpcMsg, pMsg2);
|
||||||
|
syncLeaderTransferLog2((char *)"test4: syncLeaderTransfer2RpcMsg -> syncLeaderTransferFromRpcMsg ", pMsg2);
|
||||||
|
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
syncLeaderTransferDestroy(pMsg);
|
||||||
|
syncLeaderTransferDestroy(pMsg2);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test5() {
|
||||||
|
SyncLeaderTransfer *pMsg = createMsg();
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
|
||||||
|
SyncLeaderTransfer *pMsg2 = syncLeaderTransferFromRpcMsg2(&rpcMsg);
|
||||||
|
syncLeaderTransferLog2((char *)"test5: syncLeaderTransfer2RpcMsg -> syncLeaderTransferFromRpcMsg2 ", pMsg2);
|
||||||
|
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
syncLeaderTransferDestroy(pMsg);
|
||||||
|
syncLeaderTransferDestroy(pMsg2);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
gRaftDetailLog = true;
|
||||||
|
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||||
|
logTest();
|
||||||
|
|
||||||
|
test1();
|
||||||
|
test2();
|
||||||
|
test3();
|
||||||
|
test4();
|
||||||
|
test5();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -74,6 +74,7 @@ void test3() {
|
||||||
SRaftCfgMeta meta;
|
SRaftCfgMeta meta;
|
||||||
meta.isStandBy = 7;
|
meta.isStandBy = 7;
|
||||||
meta.snapshotEnable = 9;
|
meta.snapshotEnable = 9;
|
||||||
|
meta.lastConfigIndex = 789;
|
||||||
raftCfgCreateFile(pCfg, meta, s);
|
raftCfgCreateFile(pCfg, meta, s);
|
||||||
printf("%s create json file: %s \n", (char*)__FUNCTION__, s);
|
printf("%s create json file: %s \n", (char*)__FUNCTION__, s);
|
||||||
}
|
}
|
||||||
|
@ -98,6 +99,7 @@ void test5() {
|
||||||
pCfg->cfg.myIndex = taosGetTimestampSec();
|
pCfg->cfg.myIndex = taosGetTimestampSec();
|
||||||
pCfg->isStandBy += 2;
|
pCfg->isStandBy += 2;
|
||||||
pCfg->snapshotEnable += 3;
|
pCfg->snapshotEnable += 3;
|
||||||
|
pCfg->lastConfigIndex += 1000;
|
||||||
raftCfgPersist(pCfg);
|
raftCfgPersist(pCfg);
|
||||||
|
|
||||||
printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex);
|
printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex);
|
||||||
|
|
Loading…
Reference in New Issue