Merge pull request #13807 from taosdata/feature/3.0_mhli

refactor(sync): update replica index in snapshot sender
This commit is contained in:
Li Minghao 2022-06-14 15:21:09 +08:00 committed by GitHub
commit 41a285a3f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 330 additions and 86 deletions

View File

@ -47,8 +47,9 @@ typedef enum {
typedef enum { typedef enum {
TAOS_SYNC_PROPOSE_SUCCESS = 0, TAOS_SYNC_PROPOSE_SUCCESS = 0,
TAOS_SYNC_PROPOSE_NOT_LEADER = 1, TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
TAOS_SYNC_PROPOSE_OTHER_ERROR = 2, TAOS_SYNC_ONLY_ONE_REPLICA = 2,
TAOS_SYNC_ONLY_ONE_REPLICA = 3, TAOS_SYNC_NOT_IN_NEW_CONFIG = 3,
TAOS_SYNC_OTHER_ERROR = 100,
} ESyncProposeCode; } ESyncProposeCode;
typedef enum { typedef enum {
@ -110,6 +111,7 @@ typedef struct SSyncFSM {
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
@ -199,15 +201,13 @@ bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
// build SRpcMsg, need to call syncPropose with SRpcMsg
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
int32_t syncLeaderTransfer(int64_t rid); int32_t syncLeaderTransfer(int64_t rid);
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader); int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);
// to be moved to static
void syncStartNormal(int64_t rid);
void syncStartStandBy(int64_t rid);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -467,6 +467,7 @@ typedef struct SyncLeaderTransfer {
SRaftId srcId; SRaftId srcId;
SRaftId destId; SRaftId destId;
*/ */
SNodeInfo newNodeInfo;
SRaftId newLeaderId; SRaftId newLeaderId;
} SyncLeaderTransfer; } SyncLeaderTransfer;

View File

@ -380,22 +380,22 @@ void mndStop(SMnode *pMnode) {
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
int32_t code = TAOS_SYNC_PROPOSE_OTHER_ERROR; int32_t code = TAOS_SYNC_OTHER_ERROR;
if (!syncEnvIsStart()) { if (!syncEnvIsStart()) {
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType)); mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
return TAOS_SYNC_PROPOSE_OTHER_ERROR; return TAOS_SYNC_OTHER_ERROR;
} }
SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync); SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType)); mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
return TAOS_SYNC_PROPOSE_OTHER_ERROR; return TAOS_SYNC_OTHER_ERROR;
} }
if (mndAcquireSyncRef(pMnode) != 0) { if (mndAcquireSyncRef(pMnode) != 0) {
mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
return TAOS_SYNC_PROPOSE_OTHER_ERROR; return TAOS_SYNC_OTHER_ERROR;
} }
char logBuf[512] = {0}; char logBuf[512] = {0};
@ -456,7 +456,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else { } else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = TAOS_SYNC_PROPOSE_OTHER_ERROR; code = TAOS_SYNC_OTHER_ERROR;
} }
} else { } else {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
@ -497,7 +497,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else { } else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType)); mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = TAOS_SYNC_PROPOSE_OTHER_ERROR; code = TAOS_SYNC_OTHER_ERROR;
} }
} }

View File

@ -236,7 +236,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
tsem_wait(&pMgmt->syncSem); tsem_wait(&pMgmt->syncSem);
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
terrno = TSDB_CODE_APP_NOT_READY; terrno = TSDB_CODE_APP_NOT_READY;
} else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) { } else if (code == TAOS_SYNC_OTHER_ERROR) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
} else { } else {
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
@ -254,13 +254,16 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
void mndSyncStart(SMnode *pMnode) { void mndSyncStart(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
syncStart(pMgmt->sync);
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
/*
if (pMgmt->standby) { if (pMgmt->standby) {
syncStartStandBy(pMgmt->sync); syncStartStandBy(pMgmt->sync);
} else { } else {
syncStart(pMgmt->sync); syncStart(pMgmt->sync);
} }
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); */
} }
void mndSyncStop(SMnode *pMnode) {} void mndSyncStop(SMnode *pMnode) {}

View File

@ -289,7 +289,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
} }
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; int32_t ret = TAOS_SYNC_OTHER_ERROR;
if (syncEnvIsStart()) { if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
@ -370,13 +370,13 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} else { } else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; ret = TAOS_SYNC_OTHER_ERROR;
} }
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} else { } else {
vError("==vnodeProcessSyncReq== error syncEnv stop"); vError("==vnodeProcessSyncReq== error syncEnv stop");
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR; ret = TAOS_SYNC_OTHER_ERROR;
} }
return ret; return ret;

View File

@ -283,11 +283,14 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
void vnodeSyncStart(SVnode *pVnode) { void vnodeSyncStart(SVnode *pVnode) {
syncSetMsgCb(pVnode->sync, &pVnode->msgCb); syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync);
/*
if (pVnode->config.standby) { if (pVnode->config.standby) {
syncStartStandBy(pVnode->sync); syncStartStandBy(pVnode->sync);
} else { } else {
syncStart(pVnode->sync); syncStart(pVnode->sync);
} }
*/
} }
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }

View File

@ -168,6 +168,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void syncNodeStart(SSyncNode* pSyncNode); void syncNodeStart(SSyncNode* pSyncNode);
void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak);
// option // option
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
@ -232,6 +233,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
void syncStartNormal(int64_t rid);
void syncStartStandBy(int64_t rid);
// for debug -------------- // for debug --------------
void syncNodePrint(SSyncNode* pObj); void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj);

View File

@ -149,12 +149,12 @@ void syncStop(int64_t rid) {
int32_t syncSetStandby(int64_t rid) { int32_t syncSetStandby(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return -1; return TAOS_SYNC_OTHER_ERROR;
} }
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return -1; return TAOS_SYNC_OTHER_ERROR;
} }
// state change // state change
@ -174,14 +174,88 @@ int32_t syncSetStandby(int64_t rid) {
return 0; return 0;
} }
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
int32_t ret = 0; SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
char* newconfig = syncCfg2Str((SSyncCfg*)pSyncCfg); if (pSyncNode == NULL) {
return TAOS_SYNC_OTHER_ERROR;
}
ASSERT(rid == pSyncNode->rid);
int32_t ret = 0;
bool IamInNew = false;
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
IamInNew = true;
}
/*
SRaftId newId;
newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
newId.vgId = pSyncNode->vgId;
if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) {
IamInNew = true;
}
*/
}
if (!IamInNew) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return TAOS_SYNC_NOT_IN_NEW_CONFIG;
}
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE;
pRpcMsg->info.noResp = 1;
pRpcMsg->contLen = strlen(newconfig) + 1;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig);
taosMemoryFree(newconfig);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
}
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_OTHER_ERROR;
}
ASSERT(rid == pSyncNode->rid);
bool IamInNew = false;
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
IamInNew = true;
}
/*
// some problem in inet_addr
SRaftId newId = EMPTY_RAFT_ID;
newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
newId.vgId = pSyncNode->vgId;
if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) {
IamInNew = true;
}
*/
}
if (!IamInNew) {
sError("sync reconfig error, not in new config");
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return TAOS_SYNC_NOT_IN_NEW_CONFIG;
}
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
if (gRaftDetailLog) { if (gRaftDetailLog) {
sInfo("==syncReconfig== newconfig:%s", newconfig); sInfo("==syncReconfig== newconfig:%s", newconfig);
} }
int32_t ret = 0;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE; rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
rpcMsg.info.noResp = 1; rpcMsg.info.noResp = 1;
@ -189,58 +263,59 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig); snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig);
taosMemoryFree(newconfig); taosMemoryFree(newconfig);
ret = syncPropose(rid, &rpcMsg, false); ret = syncNodePropose(pSyncNode, &rpcMsg, false);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret; return ret;
} }
int32_t syncLeaderTransfer(int64_t rid) { int32_t syncLeaderTransfer(int64_t rid) {
int32_t ret = 0; SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_OTHER_ERROR;
}
ASSERT(rid == pSyncNode->rid);
if (pSyncNode->peersNum == 0) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return TAOS_SYNC_OTHER_ERROR;
}
SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
int32_t ret = syncLeaderTransferTo(rid, newLeader);
return ret; return ret;
} }
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) { int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return false; return TAOS_SYNC_OTHER_ERROR;
} }
assert(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
int32_t ret = 0; int32_t ret = 0;
if (pSyncNode->replicaNum == 1) { if (pSyncNode->replicaNum == 1) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
sError("only one replica, cannot drop leader"); sError("only one replica, cannot drop leader");
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return TAOS_SYNC_ONLY_ONE_REPLICA; return TAOS_SYNC_ONLY_ONE_REPLICA;
} }
SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort); pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
pMsg->newLeaderId.vgId = pSyncNode->vgId; pMsg->newLeaderId.vgId = pSyncNode->vgId;
pMsg->newNodeInfo = newLeader;
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
syncLeaderTransferDestroy(pMsg); syncLeaderTransferDestroy(pMsg);
ret = syncPropose(rid, &rpcMsg, false); ret = syncNodePropose(pSyncNode, &rpcMsg, false);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret; return ret;
} }
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
int32_t ret = 0;
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE;
pRpcMsg->info.noResp = 1;
pRpcMsg->contLen = strlen(newconfig) + 1;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig);
taosMemoryFree(newconfig);
return ret;
}
bool syncCanLeaderTransfer(int64_t rid) { bool syncCanLeaderTransfer(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
@ -273,8 +348,6 @@ bool syncCanLeaderTransfer(int64_t rid) {
return matchOK; return matchOK;
} }
int32_t syncGiveUpLeader(int64_t rid) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak); int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret; return ret;
@ -469,16 +542,26 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return TAOS_SYNC_PROPOSE_OTHER_ERROR; return TAOS_SYNC_OTHER_ERROR;
} }
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
}
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SRespStub stub; SRespStub stub;
stub.createTime = taosGetTimestampMs(); stub.createTime = taosGetTimestampMs();
stub.rpcMsg = *pMsg; stub.rpcMsg = *pMsg;
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
sDebug("vgId:%d, sync event propose, type:%s seq:%" PRIu64 " handle:%p", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), sDebug("vgId:%d sync event propose, type:%s seq:%" PRIu64 " handle:%p", pSyncNode->vgId, TMSG_INFO(pMsg->msgType),
seqNum, pMsg->info.handle); seqNum, pMsg->info.handle);
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId); SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak, pSyncNode->vgId);
@ -488,16 +571,14 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
ret = TAOS_SYNC_PROPOSE_SUCCESS; ret = TAOS_SYNC_PROPOSE_SUCCESS;
} else { } else {
sTrace("syncPropose pSyncNode->FpEqMsg is NULL"); sError("syncPropose pSyncNode->FpEqMsg is NULL");
} }
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} else { } else {
sDebug("vgId:%d, failed to propose since not leader, type:%s handle:%p %s", pSyncNode->vgId, sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
TMSG_INFO(pMsg->msgType), pMsg->info.handle, syncUtilState2String(pSyncNode->state));
ret = TAOS_SYNC_PROPOSE_NOT_LEADER; ret = TAOS_SYNC_PROPOSE_NOT_LEADER;
} }
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret; return ret;
} }
@ -1163,7 +1244,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
SRaftId oldReplicasId[TSDB_MAX_REPLICA]; SRaftId oldReplicasId[TSDB_MAX_REPLICA];
memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId)); memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
memcpy(oldSenders, pSyncNode->senders, sizeof(oldSenders)); for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i];
sDebug("vgId:%d sync event save senders %d, %p", pSyncNode->vgId, i, oldSenders[i]);
if (gRaftDetailLog) {
;
}
}
// init internal // init internal
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
@ -1195,24 +1282,51 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
// reset snapshot senders, memory leak // reset snapshot senders
// clear new
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
(pSyncNode->senders)[i] = NULL; (pSyncNode->senders)[i] = NULL;
} }
// reset new
for (int i = 0; i < pSyncNode->replicaNum; ++i) { for (int i = 0; i < pSyncNode->replicaNum; ++i) {
// reset sender
bool reset = false;
for (int j = 0; j < TSDB_MAX_REPLICA; ++j) { for (int j = 0; j < TSDB_MAX_REPLICA; ++j) {
if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) { if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
char host[128]; char host[128];
uint16_t port; uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event reset sender for %lu, %s:%d", pSyncNode->vgId, (pSyncNode->replicasId)[i].addr, host, port); sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p", pSyncNode->vgId,
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
(pSyncNode->senders)[i] = oldSenders[j]; (pSyncNode->senders)[i] = oldSenders[j];
oldSenders[j] = NULL;
reset = true;
// reset replicaIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i;
sDebug("vgId:%d sync event udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", pSyncNode->vgId,
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
} }
} }
} }
// create new
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) { if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sDebug("vgId:%d sync event create new sender %p replicaIndex:%d", pSyncNode->vgId, (pSyncNode->senders)[i], i);
}
}
// free old
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) {
snapshotSenderDestroy(oldSenders[i]);
sDebug("vgId:%d sync event delete old sender %p replicaIndex:%d", pSyncNode->vgId, oldSenders[i], i);
oldSenders[i] = NULL;
} }
} }
@ -1272,8 +1386,8 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
} }
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sDebug("vgId:%d sync event become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, sDebug("vgId:%d sync event become follower, isStandBy:%d, replicaNum:%d, %s", pSyncNode->vgId,
debugStr); pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr);
// maybe clear leader cache // maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
@ -1307,8 +1421,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>> // /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
// //
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
sDebug("vgId:%d sync event become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, sDebug("vgId:%d sync event become leader, isStandBy:%d, replicaNum:%d %s", pSyncNode->vgId,
debugStr); pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr);
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER; pSyncNode->state = TAOS_SYNC_STATE_LEADER;
@ -1857,10 +1971,52 @@ const char* syncStr(ESyncState state) {
} }
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer; SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
if (syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId))) {
/*
char host[128];
uint16_t port;
syncUtilU642Addr(pSyncLeaderTransfer->newLeaderId.addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, host, port,
pSyncLeaderTransfer->newLeaderId.addr);
*/
sDebug("vgId:%d sync event, begin leader transfer", ths->vgId);
if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId,
pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
pSyncLeaderTransfer->newLeaderId.addr);
// reset elect timer now!
int32_t electMS = 1;
int32_t ret = syncNodeRestartElectTimer(ths, electMS);
ASSERT(ret == 0);
} }
/*
if (syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId))) {
// reset elect timer now!
int32_t electMS = 1;
int32_t ret = syncNodeRestartElectTimer(ths, electMS);
ASSERT(ret == 0);
}
*/
if (ths->pFsm->FpLeaderTransferCb != NULL) {
SFsmCbMeta cbMeta;
cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = 0;
cbMeta.index = pEntry->index;
cbMeta.isWeak = pEntry->isWeak;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.state = ths->state;
cbMeta.term = pEntry->term;
ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, cbMeta);
}
syncLeaderTransferDestroy(pSyncLeaderTransfer);
return 0; return 0;
} }
@ -1884,26 +2040,31 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
bool isDrop; bool isDrop;
//if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) { // if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
if (IamInNew) { if (IamInNew) {
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
// change isStandBy to normal // change isStandBy to normal
if (!isDrop) { if (!isDrop) {
char tmpbuf[128];
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum);
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths, "config change"); syncNodeBecomeLeader(ths, tmpbuf);
} else { } else {
syncNodeBecomeFollower(ths, "config change"); syncNodeBecomeFollower(ths, tmpbuf);
} }
} }
} else { } else {
syncNodeBecomeFollower(ths, "config change2"); char tmpbuf[128];
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum);
syncNodeBecomeFollower(ths, tmpbuf);
} }
if (gRaftDetailLog) { if (gRaftDetailLog) {
char* sOld = syncCfg2Str(&oldSyncCfg); char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg); char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d index:%ld \n", sOld, sNew, isDrop, pEntry->index); sInfo("==config change== 0x11 old:%s new:%s isDrop:%d index:%ld IamInNew:%d \n", sOld, sNew, isDrop, pEntry->index,
IamInNew);
taosMemoryFree(sOld); taosMemoryFree(sOld);
taosMemoryFree(sNew); taosMemoryFree(sNew);
} }
@ -1965,7 +2126,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ASSERT(code == 0); ASSERT(code == 0);
} }
// config change // leader transfer
if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) { if (pEntry->originalRpcType == TDMT_SYNC_LEADER_TRANSFER) {
code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry); code = syncDoLeaderTransfer(ths, &rpcMsg, pEntry);
ASSERT(code == 0); ASSERT(code == 0);

View File

@ -393,7 +393,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender); cJSON *pJson = snapshotSender2Json(pSender);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
@ -514,7 +514,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf); cJSON_AddStringToObject(pFromId, "addr", u64buf);
{ {
uint64_t u64 = pReceiver->fromId.addr; uint64_t u64 = pReceiver->fromId.addr;
cJSON * pTmp = pFromId; cJSON *pTmp = pFromId;
char host[128] = {0}; char host[128] = {0};
uint16_t port; uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port); syncUtilU642Addr(u64, host, sizeof(host), &port);
@ -538,7 +538,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver); cJSON *pJson = snapshotReceiver2Json(pReceiver);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
@ -588,6 +588,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// maybe update lastconfig // maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
int32_t oldReplicaNum = pSyncNode->replicaNum;
// update new config myIndex // update new config myIndex
bool IamInNew = false; bool IamInNew = false;
SSyncCfg newSyncCfg = pMsg->lastConfig; SSyncCfg newSyncCfg = pMsg->lastConfig;
@ -614,10 +616,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// change isStandBy to normal // change isStandBy to normal
if (!isDrop) { if (!isDrop) {
char tmpbuf[128];
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d", oldReplicaNum, newSyncCfg.replicaNum);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode, "config change"); syncNodeBecomeLeader(pSyncNode, tmpbuf);
} else { } else {
syncNodeBecomeFollower(pSyncNode, "config change"); syncNodeBecomeFollower(pSyncNode, tmpbuf);
} }
} }
} }

View File

@ -1,4 +1,5 @@
add_executable(syncTest "") add_executable(syncTest "")
add_executable(syncRaftIdCheck "")
add_executable(syncEnvTest "") add_executable(syncEnvTest "")
add_executable(syncPingTimerTest "") add_executable(syncPingTimerTest "")
add_executable(syncIOTickQTest "") add_executable(syncIOTickQTest "")
@ -54,6 +55,10 @@ target_sources(syncTest
PRIVATE PRIVATE
"syncTest.cpp" "syncTest.cpp"
) )
target_sources(syncRaftIdCheck
PRIVATE
"syncRaftIdCheck.cpp"
)
target_sources(syncEnvTest target_sources(syncEnvTest
PRIVATE PRIVATE
"syncEnvTest.cpp" "syncEnvTest.cpp"
@ -257,6 +262,11 @@ target_include_directories(syncTest
"${TD_SOURCE_DIR}/include/libs/sync" "${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncRaftIdCheck
PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncEnvTest target_include_directories(syncEnvTest
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync" "${TD_SOURCE_DIR}/include/libs/sync"
@ -508,6 +518,10 @@ target_link_libraries(syncTest
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncRaftIdCheck
sync
gtest_main
)
target_link_libraries(syncEnvTest target_link_libraries(syncEnvTest
sync sync
gtest_main gtest_main

View File

@ -0,0 +1,32 @@
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncUtil.h"
void usage(char* exe) {
printf("Usage: %s host port \n", exe);
printf("Usage: %s u64 \n", exe);
}
int main(int argc, char** argv) {
if (argc == 2) {
uint64_t u64 = atoll(argv[1]);
char host[128];
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
printf("%lu -> %s:%d \n", u64, host, port);
} else if (argc == 3) {
uint64_t u64;
char* host = argv[1];
uint16_t port = atoi(argv[2]);
u64 = syncUtilAddr2U64(host, port);
printf("%s:%d -> %lu \n", host, port, u64);
} else {
usage(argv[0]);
exit(-1);
}
return 0;
}

View File

@ -153,6 +153,16 @@ void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMe
taosMemoryFree(s); taosMemoryFree(s);
} }
void LeaderTransferCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf),
"==callback== ==LeaderTransferCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, flag:%lu, term:%lu "
"currentTerm:%lu \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
cbMeta.flag, cbMeta.term, cbMeta.currentTerm);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}
SSyncFSM* createFsm() { SSyncFSM* createFsm() {
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
memset(pFsm, 0, sizeof(*pFsm)); memset(pFsm, 0, sizeof(*pFsm));
@ -172,6 +182,8 @@ SSyncFSM* createFsm() {
pFsm->FpSnapshotStopWrite = SnapshotStopWrite; pFsm->FpSnapshotStopWrite = SnapshotStopWrite;
pFsm->FpSnapshotDoWrite = SnapshotDoWrite; pFsm->FpSnapshotDoWrite = SnapshotDoWrite;
pFsm->FpLeaderTransferCb = LeaderTransferCb;
return pFsm; return pFsm;
} }
@ -277,7 +289,8 @@ void usage(char* exe) {
printf( printf(
"usage: %s replicaNum(1-5) myIndex(0-..) enableSnapshot(0/1) lastApplyIndex(>=-1) lastApplyTerm(>=0) " "usage: %s replicaNum(1-5) myIndex(0-..) enableSnapshot(0/1) lastApplyIndex(>=-1) lastApplyTerm(>=0) "
"writeRecordNum(>=0) " "writeRecordNum(>=0) "
"isStandBy(0/1) isConfigChange(0-5) iterTimes(>=0) finishLastApplyIndex(>=-1) finishLastApplyTerm(>=0) \n", "isStandBy(0/1) isConfigChange(0-5) iterTimes(>=0) finishLastApplyIndex(>=-1) finishLastApplyTerm(>=0) "
"leaderTransfer(0/1) \n",
exe); exe);
} }
@ -294,9 +307,9 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
int main(int argc, char** argv) { int main(int argc, char** argv) {
sprintf(tsTempDir, "%s", "."); sprintf(tsTempDir, "%s", ".");
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR; sDebugFlag = DEBUG_SCREEN + DEBUG_FILE + DEBUG_TRACE + DEBUG_INFO + DEBUG_ERROR + DEBUG_DEBUG;
if (argc != 12) { if (argc != 13) {
usage(argv[0]); usage(argv[0]);
exit(-1); exit(-1);
} }
@ -312,12 +325,14 @@ int main(int argc, char** argv) {
int32_t iterTimes = atoi(argv[9]); int32_t iterTimes = atoi(argv[9]);
int32_t finishLastApplyIndex = atoi(argv[10]); int32_t finishLastApplyIndex = atoi(argv[10]);
int32_t finishLastApplyTerm = atoi(argv[11]); int32_t finishLastApplyTerm = atoi(argv[11]);
int32_t leaderTransfer = atoi(argv[12]);
sTrace( sInfo(
"args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, " "args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, "
"isStandBy:%d, isConfigChange:%d, iterTimes:%d, finishLastApplyIndex:%d, finishLastApplyTerm:%d", "isStandBy:%d, isConfigChange:%d, iterTimes:%d, finishLastApplyIndex:%d, finishLastApplyTerm:%d, "
"leaderTransfer:%d",
replicaNum, myIndex, enableSnapshot, lastApplyIndex, lastApplyTerm, writeRecordNum, isStandBy, isConfigChange, replicaNum, myIndex, enableSnapshot, lastApplyIndex, lastApplyTerm, writeRecordNum, isStandBy, isConfigChange,
iterTimes, finishLastApplyIndex, finishLastApplyTerm); iterTimes, finishLastApplyIndex, finishLastApplyTerm, leaderTransfer);
// check parameter // check parameter
assert(replicaNum >= 1 && replicaNum <= 5); assert(replicaNum >= 1 && replicaNum <= 5);
@ -363,24 +378,31 @@ int main(int argc, char** argv) {
//--------------------------- //---------------------------
int32_t alreadySend = 0; int32_t alreadySend = 0;
int32_t leaderTransferWait = 0;
while (1) { while (1) {
char* simpleStr = syncNode2SimpleStr(pSyncNode); char* simpleStr = syncNode2SimpleStr(pSyncNode);
leaderTransferWait++;
if (leaderTransferWait == 7) {
sTrace("begin leader transfer ...");
int32_t ret = syncLeaderTransfer(rid);
}
if (alreadySend < writeRecordNum) { if (alreadySend < writeRecordNum) {
SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex); SRpcMsg* pRpcMsg = createRpcMsg(alreadySend, writeRecordNum, myIndex);
int32_t ret = syncPropose(rid, pRpcMsg, false); int32_t ret = syncPropose(rid, pRpcMsg, false);
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
sTrace("%s value%d write not leader", simpleStr, alreadySend); sTrace("%s value%d write not leader, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait);
} else { } else {
assert(ret == 0); assert(ret == 0);
sTrace("%s value%d write ok", simpleStr, alreadySend); sTrace("%s value%d write ok, leaderTransferWait:%d", simpleStr, alreadySend, leaderTransferWait);
} }
alreadySend++; alreadySend++;
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
taosMemoryFree(pRpcMsg); taosMemoryFree(pRpcMsg);
} else { } else {
sTrace("%s", simpleStr); sTrace("%s, leaderTransferWait:%d", simpleStr, leaderTransferWait);
} }
taosMsleep(1000); taosMsleep(1000);