refactor(sync): do leader transfer
This commit is contained in:
parent
03fe08a890
commit
129b289bdf
|
@ -273,16 +273,8 @@ int32_t syncLeaderTransfer(int64_t rid) {
|
|||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
|
||||
if (pSyncNode->peersNum == 0) {
|
||||
int32_t ret = syncNodeLeaderTransfer(pSyncNode);
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
|
||||
int32_t ret = syncLeaderTransferTo(rid, newLeader);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -293,25 +285,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
|
|||
return -1;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
int32_t ret = 0;
|
||||
|
||||
if (pSyncNode->replicaNum == 1) {
|
||||
sError("only one replica, cannot drop leader");
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
terrno = TSDB_CODE_SYN_ONE_REPLICA;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
|
||||
pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
|
||||
pMsg->newLeaderId.vgId = pSyncNode->vgId;
|
||||
pMsg->newNodeInfo = newLeader;
|
||||
ASSERT(pMsg != NULL);
|
||||
SRpcMsg rpcMsg = {0};
|
||||
syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg);
|
||||
syncLeaderTransferDestroy(pMsg);
|
||||
|
||||
ret = syncNodePropose(pSyncNode, &rpcMsg, false);
|
||||
int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return ret;
|
||||
}
|
||||
|
@ -337,6 +312,12 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
do {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort);
|
||||
syncNodeEventLog(pSyncNode, logBuf);
|
||||
} while (0);
|
||||
|
||||
SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId);
|
||||
pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort);
|
||||
pMsg->newLeaderId.vgId = pSyncNode->vgId;
|
||||
|
@ -1147,8 +1128,6 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
|||
void syncNodeClose(SSyncNode* pSyncNode) {
|
||||
syncNodeEventLog(pSyncNode, "sync close");
|
||||
|
||||
// leader transfer
|
||||
|
||||
int32_t ret;
|
||||
ASSERT(pSyncNode != NULL);
|
||||
|
||||
|
@ -2643,7 +2622,7 @@ const char* syncStr(ESyncState state) {
|
|||
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
|
||||
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
|
||||
|
||||
syncNodeEventLog(ths, "begin leader transfer");
|
||||
syncNodeEventLog(ths, "do leader transfer");
|
||||
|
||||
bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
|
||||
bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
|
||||
|
|
Loading…
Reference in New Issue