refactor(sync): add debug log
This commit is contained in:
parent
6a4cdadef6
commit
aad288ab17
|
@ -47,8 +47,9 @@ typedef enum {
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TAOS_SYNC_PROPOSE_SUCCESS = 0,
|
TAOS_SYNC_PROPOSE_SUCCESS = 0,
|
||||||
TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
|
TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
|
||||||
TAOS_SYNC_PROPOSE_OTHER_ERROR = 2,
|
TAOS_SYNC_ONLY_ONE_REPLICA = 2,
|
||||||
TAOS_SYNC_ONLY_ONE_REPLICA = 3,
|
TAOS_SYNC_NOT_IN_NEW_CONFIG = 3,
|
||||||
|
TAOS_SYNC_OTHER_ERROR = 100,
|
||||||
} ESyncProposeCode;
|
} ESyncProposeCode;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -199,15 +200,13 @@ bool syncIsRestoreFinish(int64_t rid);
|
||||||
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
|
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
|
||||||
|
|
||||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
|
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
|
||||||
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
|
|
||||||
|
// build SRpcMsg, need to call syncPropose with SRpcMsg
|
||||||
|
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
|
||||||
|
|
||||||
int32_t syncLeaderTransfer(int64_t rid);
|
int32_t syncLeaderTransfer(int64_t rid);
|
||||||
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);
|
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);
|
||||||
|
|
||||||
// to be moved to static
|
|
||||||
void syncStartNormal(int64_t rid);
|
|
||||||
void syncStartStandBy(int64_t rid);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -382,22 +382,22 @@ void mndStop(SMnode *pMnode) {
|
||||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
int32_t code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
int32_t code = TAOS_SYNC_OTHER_ERROR;
|
||||||
|
|
||||||
if (!syncEnvIsStart()) {
|
if (!syncEnvIsStart()) {
|
||||||
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
|
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
return TAOS_SYNC_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
|
SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
|
mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
return TAOS_SYNC_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndAcquireSyncRef(pMnode) != 0) {
|
if (mndAcquireSyncRef(pMnode) != 0) {
|
||||||
mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
|
mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
|
||||||
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
return TAOS_SYNC_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
char logBuf[512] = {0};
|
char logBuf[512] = {0};
|
||||||
|
@ -457,7 +457,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
code = TAOS_SYNC_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -495,7 +495,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
} else {
|
} else {
|
||||||
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
code = TAOS_SYNC_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -236,7 +236,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
tsem_wait(&pMgmt->syncSem);
|
tsem_wait(&pMgmt->syncSem);
|
||||||
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
|
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
|
||||||
terrno = TSDB_CODE_APP_NOT_READY;
|
terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
} else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
|
} else if (code == TAOS_SYNC_OTHER_ERROR) {
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_APP_ERROR;
|
terrno = TSDB_CODE_APP_ERROR;
|
||||||
|
@ -254,13 +254,16 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
void mndSyncStart(SMnode *pMnode) {
|
void mndSyncStart(SMnode *pMnode) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
|
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
|
||||||
|
syncStart(pMgmt->sync);
|
||||||
|
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
|
||||||
|
|
||||||
|
/*
|
||||||
if (pMgmt->standby) {
|
if (pMgmt->standby) {
|
||||||
syncStartStandBy(pMgmt->sync);
|
syncStartStandBy(pMgmt->sync);
|
||||||
} else {
|
} else {
|
||||||
syncStart(pMgmt->sync);
|
syncStart(pMgmt->sync);
|
||||||
}
|
}
|
||||||
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndSyncStop(SMnode *pMnode) {}
|
void mndSyncStop(SMnode *pMnode) {}
|
||||||
|
|
|
@ -291,7 +291,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
int32_t ret = TAOS_SYNC_OTHER_ERROR;
|
||||||
|
|
||||||
if (syncEnvIsStart()) {
|
if (syncEnvIsStart()) {
|
||||||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||||
|
@ -372,13 +372,13 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
||||||
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
ret = TAOS_SYNC_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
syncNodeRelease(pSyncNode);
|
||||||
} else {
|
} else {
|
||||||
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
||||||
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
ret = TAOS_SYNC_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|
|
@ -290,11 +290,14 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||||
|
|
||||||
void vnodeSyncStart(SVnode *pVnode) {
|
void vnodeSyncStart(SVnode *pVnode) {
|
||||||
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
|
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
|
||||||
|
syncStart(pVnode->sync);
|
||||||
|
/*
|
||||||
if (pVnode->config.standby) {
|
if (pVnode->config.standby) {
|
||||||
syncStartStandBy(pVnode->sync);
|
syncStartStandBy(pVnode->sync);
|
||||||
} else {
|
} else {
|
||||||
syncStart(pVnode->sync);
|
syncStart(pVnode->sync);
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||||
|
|
|
@ -168,6 +168,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
|
||||||
void syncNodeStart(SSyncNode* pSyncNode);
|
void syncNodeStart(SSyncNode* pSyncNode);
|
||||||
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
||||||
void syncNodeClose(SSyncNode* pSyncNode);
|
void syncNodeClose(SSyncNode* pSyncNode);
|
||||||
|
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak);
|
||||||
|
|
||||||
// option
|
// option
|
||||||
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||||
|
@ -232,6 +233,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
||||||
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
|
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
|
||||||
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
|
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
|
||||||
|
|
||||||
|
void syncStartNormal(int64_t rid);
|
||||||
|
void syncStartStandBy(int64_t rid);
|
||||||
|
|
||||||
// for debug --------------
|
// for debug --------------
|
||||||
void syncNodePrint(SSyncNode* pObj);
|
void syncNodePrint(SSyncNode* pObj);
|
||||||
void syncNodePrint2(char* s, SSyncNode* pObj);
|
void syncNodePrint2(char* s, SSyncNode* pObj);
|
||||||
|
|
|
@ -995,7 +995,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
ths->commitIndex = snapshot.lastApplyIndex;
|
ths->commitIndex = snapshot.lastApplyIndex;
|
||||||
|
|
||||||
sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin,
|
sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, commitBegin,
|
||||||
commitEnd, syncUtilState2String(ths->state));
|
commitEnd, syncUtilState2String(ths->state));
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncIndex beginIndex = ths->commitIndex + 1;
|
SyncIndex beginIndex = ths->commitIndex + 1;
|
||||||
|
|
|
@ -57,7 +57,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
pSyncNode->commitIndex = snapshot.lastApplyIndex;
|
pSyncNode->commitIndex = snapshot.lastApplyIndex;
|
||||||
|
|
||||||
sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId,
|
sDebug("vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId,
|
||||||
pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state));
|
pSyncNode->commitIndex, snapshot.lastApplyIndex, syncUtilState2String(pSyncNode->state));
|
||||||
}
|
}
|
||||||
|
|
||||||
// update commit index
|
// update commit index
|
||||||
|
|
|
@ -149,12 +149,12 @@ void syncStop(int64_t rid) {
|
||||||
int32_t syncSetStandby(int64_t rid) {
|
int32_t syncSetStandby(int64_t rid) {
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
return -1;
|
return TAOS_SYNC_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return -1;
|
return TAOS_SYNC_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
// state change
|
// state change
|
||||||
|
@ -173,14 +173,68 @@ int32_t syncSetStandby(int64_t rid) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
|
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
|
||||||
int32_t ret = 0;
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
char* newconfig = syncCfg2Str((SSyncCfg*)pSyncCfg);
|
if (pSyncNode == NULL) {
|
||||||
|
return TAOS_SYNC_OTHER_ERROR;
|
||||||
|
}
|
||||||
|
ASSERT(rid == pSyncNode->rid);
|
||||||
|
|
||||||
|
int32_t ret = 0;
|
||||||
|
bool IamInNew = false;
|
||||||
|
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
|
||||||
|
SRaftId newId;
|
||||||
|
newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
|
||||||
|
newId.vgId = pSyncNode->vgId;
|
||||||
|
if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) {
|
||||||
|
IamInNew = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!IamInNew) {
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return TAOS_SYNC_NOT_IN_NEW_CONFIG;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
|
||||||
|
pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE;
|
||||||
|
pRpcMsg->info.noResp = 1;
|
||||||
|
pRpcMsg->contLen = strlen(newconfig) + 1;
|
||||||
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
|
snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig);
|
||||||
|
taosMemoryFree(newconfig);
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
return TAOS_SYNC_OTHER_ERROR;
|
||||||
|
}
|
||||||
|
ASSERT(rid == pSyncNode->rid);
|
||||||
|
|
||||||
|
bool IamInNew = false;
|
||||||
|
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
|
||||||
|
SRaftId newId;
|
||||||
|
newId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
|
||||||
|
newId.vgId = pSyncNode->vgId;
|
||||||
|
if (syncUtilSameId(&(pSyncNode->myRaftId), &newId)) {
|
||||||
|
IamInNew = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!IamInNew) {
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return TAOS_SYNC_NOT_IN_NEW_CONFIG;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
|
||||||
if (gRaftDetailLog) {
|
if (gRaftDetailLog) {
|
||||||
sInfo("==syncReconfig== newconfig:%s", newconfig);
|
sInfo("==syncReconfig== newconfig:%s", newconfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ret = 0;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
|
rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
|
||||||
rpcMsg.info.noResp = 1;
|
rpcMsg.info.noResp = 1;
|
||||||
|
@ -188,27 +242,42 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
|
||||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||||
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig);
|
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig);
|
||||||
taosMemoryFree(newconfig);
|
taosMemoryFree(newconfig);
|
||||||
ret = syncPropose(rid, &rpcMsg, false);
|
ret = syncNodePropose(pSyncNode, &rpcMsg, false);
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLeaderTransfer(int64_t rid) {
|
int32_t syncLeaderTransfer(int64_t rid) {
|
||||||
int32_t ret = 0;
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
return TAOS_SYNC_OTHER_ERROR;
|
||||||
|
}
|
||||||
|
ASSERT(rid == pSyncNode->rid);
|
||||||
|
|
||||||
|
if (pSyncNode->peersNum > 0) {
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return TAOS_SYNC_OTHER_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
|
||||||
|
int32_t ret = syncLeaderTransferTo(rid, newLeader);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
|
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
return false;
|
return TAOS_SYNC_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
assert(rid == pSyncNode->rid);
|
ASSERT(rid == pSyncNode->rid);
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
if (pSyncNode->replicaNum == 1) {
|
if (pSyncNode->replicaNum == 1) {
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
|
||||||
sError("only one replica, cannot drop leader");
|
sError("only one replica, cannot drop leader");
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return TAOS_SYNC_ONLY_ONE_REPLICA;
|
return TAOS_SYNC_ONLY_ONE_REPLICA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,26 +289,11 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
|
||||||
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
|
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
|
||||||
syncLeaderTransferDestroy(pMsg);
|
syncLeaderTransferDestroy(pMsg);
|
||||||
|
|
||||||
ret = syncPropose(rid, &rpcMsg, false);
|
ret = syncNodePropose(pSyncNode, &rpcMsg, false);
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
|
|
||||||
int32_t ret = 0;
|
|
||||||
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
|
|
||||||
|
|
||||||
pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE;
|
|
||||||
pRpcMsg->info.noResp = 1;
|
|
||||||
pRpcMsg->contLen = strlen(newconfig) + 1;
|
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
|
||||||
snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig);
|
|
||||||
taosMemoryFree(newconfig);
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool syncCanLeaderTransfer(int64_t rid) {
|
bool syncCanLeaderTransfer(int64_t rid) {
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
@ -272,8 +326,6 @@ bool syncCanLeaderTransfer(int64_t rid) {
|
||||||
return matchOK;
|
return matchOK;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncGiveUpLeader(int64_t rid) { return 0; }
|
|
||||||
|
|
||||||
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
||||||
int32_t ret = syncPropose(rid, pMsg, isWeak);
|
int32_t ret = syncPropose(rid, pMsg, isWeak);
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -467,10 +519,19 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
||||||
|
|
||||||
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
return TAOS_SYNC_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
assert(rid == pSyncNode->rid);
|
assert(rid == pSyncNode->rid);
|
||||||
sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
|
sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
|
||||||
|
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
|
||||||
|
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
|
||||||
|
sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
SRespStub stub;
|
SRespStub stub;
|
||||||
|
@ -485,15 +546,14 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
||||||
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
|
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
|
||||||
ret = TAOS_SYNC_PROPOSE_SUCCESS;
|
ret = TAOS_SYNC_PROPOSE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
sTrace("syncPropose pSyncNode->FpEqMsg is NULL");
|
sError("syncPropose pSyncNode->FpEqMsg is NULL");
|
||||||
}
|
}
|
||||||
syncClientRequestDestroy(pSyncMsg);
|
syncClientRequestDestroy(pSyncMsg);
|
||||||
} else {
|
} else {
|
||||||
sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
|
sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
|
||||||
ret = TAOS_SYNC_PROPOSE_NOT_LEADER;
|
ret = TAOS_SYNC_PROPOSE_NOT_LEADER;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1241,7 +1301,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
|
||||||
|
|
||||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
sDebug("vgId:%d sync event become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy,
|
sDebug("vgId:%d sync event become follower, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy,
|
||||||
debugStr);
|
debugStr);
|
||||||
|
|
||||||
// maybe clear leader cache
|
// maybe clear leader cache
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
@ -1276,7 +1336,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
//
|
//
|
||||||
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
sDebug("vgId:%d sync event become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy,
|
sDebug("vgId:%d sync event become leader, isStandBy:%d, %s", pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy,
|
||||||
debugStr);
|
debugStr);
|
||||||
|
|
||||||
// state change
|
// state change
|
||||||
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
||||||
|
@ -1885,7 +1945,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
ESyncState state = flag;
|
ESyncState state = flag;
|
||||||
sDebug("vgId:%d sync event commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex,
|
sDebug("vgId:%d sync event commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, beginIndex,
|
||||||
endIndex, syncUtilState2String(state));
|
endIndex, syncUtilState2String(state));
|
||||||
|
|
||||||
// execute fsm
|
// execute fsm
|
||||||
if (ths->pFsm != NULL) {
|
if (ths->pFsm != NULL) {
|
||||||
|
|
Loading…
Reference in New Issue