Merge pull request #17803 from taosdata/fix/TD-20052
enh: refact syncMsg code
This commit is contained in:
commit
a71dbd83d2
|
@ -99,7 +99,6 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
|
|||
*/
|
||||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg);
|
||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
|
||||
int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg);
|
||||
int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg);
|
||||
void mndPostProcessQueryMsg(SRpcMsg *pMsg);
|
||||
|
||||
|
|
|
@ -220,21 +220,13 @@ const char* syncStr(ESyncState state);
|
|||
bool syncIsRestoreFinish(int64_t rid);
|
||||
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot);
|
||||
|
||||
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg);
|
||||
|
||||
// build SRpcMsg, need to call syncPropose with SRpcMsg
|
||||
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
|
||||
|
||||
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
|
||||
int32_t syncLeaderTransfer(int64_t rid);
|
||||
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);
|
||||
|
||||
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
|
||||
int32_t syncEndSnapshot(int64_t rid);
|
||||
|
||||
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
|
||||
|
||||
SSyncNode* syncNodeAcquire(int64_t rid);
|
||||
void syncNodeRelease(SSyncNode* pNode);
|
||||
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -67,24 +67,6 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void mmProcessSyncCtrlMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
pMsg->info.node = pMgmt->pMnode;
|
||||
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, get from mnode-sync-ctrl queue", pMsg);
|
||||
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
pHead->contLen = ntohl(pHead->contLen);
|
||||
pHead->vgId = ntohl(pHead->vgId);
|
||||
|
||||
int32_t code = mndProcessSyncCtrlMsg(pMsg);
|
||||
|
||||
dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
pMsg->info.node = pMgmt->pMnode;
|
||||
|
@ -252,7 +234,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
.min = 1,
|
||||
.max = 1,
|
||||
.name = "mnode-sync-ctrl",
|
||||
.fp = (FItem)mmProcessSyncCtrlMsg,
|
||||
.fp = (FItem)mmProcessSyncMsg,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->syncCtrlWorker, &scCfg) != 0) {
|
||||
|
|
|
@ -133,22 +133,6 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
|
|||
}
|
||||
}
|
||||
|
||||
static void vmProcessSyncCtrlQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-sync queue", pVnode->vgId, pMsg);
|
||||
|
||||
int32_t code = vnodeProcessSyncCtrlMsg(pVnode->pImpl, pMsg, NULL); // no response here
|
||||
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
|
@ -317,7 +301,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg);
|
||||
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
|
||||
pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncCtrlQueue);
|
||||
pVnode->pSyncCtrlQ = tWWorkerAllocQueue(&pMgmt->syncCtrlPool, pVnode, (FItems)vmProcessSyncQueue);
|
||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
|
||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||
pVnode->pStreamQ = tQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
||||
|
|
|
@ -474,128 +474,18 @@ void mndStop(SMnode *pMnode) {
|
|||
mndCleanupTimer(pMnode);
|
||||
}
|
||||
|
||||
int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
int32_t code = 0;
|
||||
|
||||
mInfo("vgId:%d, process sync ctrl msg", 1);
|
||||
|
||||
if (!syncIsInit()) {
|
||||
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
|
||||
if (pSyncNode == NULL) {
|
||||
mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
|
||||
SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
|
||||
syncHeartbeatDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
|
||||
SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
|
||||
syncHeartbeatReplyDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
int32_t code = 0;
|
||||
|
||||
if (!syncIsInit()) {
|
||||
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
|
||||
if (pSyncNode == NULL) {
|
||||
mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnTimer(pSyncNode, pSyncMsg);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_PING) {
|
||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnPing(pSyncNode, pSyncMsg);
|
||||
syncPingDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnPingReply(pSyncNode, pSyncMsg);
|
||||
syncPingReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnRequestVote(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
|
||||
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
|
||||
syncSnapshotSendDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
|
||||
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
|
||||
syncSnapshotRspDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
|
||||
SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
|
||||
syncLocalCmdDestroy(pSyncMsg);
|
||||
|
||||
} else {
|
||||
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||
code = -1;
|
||||
}
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
mGTrace("vgId:1, sync msg:%p will be processed, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||
|
||||
int32_t code = syncProcessMsg(pMgmt->sync, pMsg);
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
mGError("vgId:1, failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -83,7 +83,6 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
|||
|
||||
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
|
|
|
@ -230,142 +230,16 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t vnodeProcessSyncCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
int32_t code = 0;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
if (!syncIsInit()) {
|
||||
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||
if (pSyncNode == NULL) {
|
||||
vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId, pMsg);
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
|
||||
if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
|
||||
SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
|
||||
syncHeartbeatDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
|
||||
SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
|
||||
syncHeartbeatReplyDestroy(pSyncMsg);
|
||||
|
||||
} else {
|
||||
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
|
||||
code = -1;
|
||||
}
|
||||
|
||||
vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
||||
code);
|
||||
syncNodeRelease(pSyncNode);
|
||||
if (code != 0 && terrno == 0) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
int32_t code = 0;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
if (!syncIsInit()) {
|
||||
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId, pMsg);
|
||||
terrno = TSDB_CODE_APP_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||
if (pSyncNode == NULL) {
|
||||
vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId, pMsg);
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
|
||||
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnTimer(pSyncNode, pSyncMsg);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_PING) {
|
||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnPing(pSyncNode, pSyncMsg);
|
||||
syncPingDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnPingReply(pSyncNode, pSyncMsg);
|
||||
syncPingReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnRequestVote(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
|
||||
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
|
||||
syncSnapshotSendDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
|
||||
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
|
||||
syncSnapshotRspDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
|
||||
SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
|
||||
syncLocalCmdDestroy(pSyncMsg);
|
||||
|
||||
} else {
|
||||
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
|
||||
code = -1;
|
||||
int32_t code = syncProcessMsg(pVnode->sync, pMsg);
|
||||
if (code != 0) {
|
||||
vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg,
|
||||
TMSG_INFO(pMsg->msgType), terrstr());
|
||||
}
|
||||
|
||||
vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
||||
code);
|
||||
syncNodeRelease(pSyncNode);
|
||||
if (code != 0 && terrno == 0) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -302,7 +302,6 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
|
|||
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
|
||||
|
||||
bool syncNodeCanChange(SSyncNode* pSyncNode);
|
||||
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);
|
||||
|
||||
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
|
||||
|
|
|
@ -80,6 +80,7 @@ SSyncNode *syncNodeAcquire(int64_t rid) {
|
|||
SSyncNode *pNode = taosAcquireRef(gNodeRefId, rid);
|
||||
if (pNode == NULL) {
|
||||
sTrace("failed to acquire node from refId:%" PRId64, rid);
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
return pNode;
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "sync.h"
|
||||
#include "syncAppendEntries.h"
|
||||
#include "syncAppendEntriesReply.h"
|
||||
|
@ -34,8 +35,6 @@
|
|||
#include "syncUtil.h"
|
||||
#include "syncVoteMgr.h"
|
||||
|
||||
// ------ local funciton ---------
|
||||
// enqueue message ----
|
||||
static void syncNodeEqPingTimer(void* param, void* tmrId);
|
||||
static void syncNodeEqElectTimer(void* param, void* tmrId);
|
||||
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
|
||||
|
@ -44,162 +43,149 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
|
|||
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId);
|
||||
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg);
|
||||
|
||||
// process message ----
|
||||
int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg);
|
||||
int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
|
||||
int64_t syncOpen(SSyncInfo* pSyncInfo) {
|
||||
SSyncNode* pNode = syncNodeOpen(pSyncInfo);
|
||||
if (pNode == NULL) {
|
||||
SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo);
|
||||
if (pSyncNode == NULL) {
|
||||
sError("vgId:%d, failed to open sync node", pSyncInfo->vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pNode->rid = syncNodeAdd(pNode);
|
||||
if (pNode->rid < 0) {
|
||||
syncNodeClose(pNode);
|
||||
pSyncNode->rid = syncNodeAdd(pSyncNode);
|
||||
if (pSyncNode->rid < 0) {
|
||||
syncNodeClose(pSyncNode);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return pNode->rid;
|
||||
return pSyncNode->rid;
|
||||
}
|
||||
|
||||
void syncStart(int64_t rid) {
|
||||
SSyncNode* pNode = syncNodeAcquire(rid);
|
||||
if (pNode != NULL) {
|
||||
syncNodeStart(pNode);
|
||||
syncNodeRelease(pNode);
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode != NULL) {
|
||||
syncNodeStart(pSyncNode);
|
||||
syncNodeRelease(pSyncNode);
|
||||
}
|
||||
}
|
||||
|
||||
void syncStop(int64_t rid) {
|
||||
SSyncNode* pNode = syncNodeAcquire(rid);
|
||||
if (pNode != NULL) {
|
||||
syncNodeRelease(pNode);
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode != NULL) {
|
||||
syncNodeRelease(pSyncNode);
|
||||
syncNodeRemove(rid);
|
||||
}
|
||||
}
|
||||
|
||||
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) {
|
||||
bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
|
||||
if (!IamInNew) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pNewCfg->replicaNum > pSyncNode->replicaNum + 1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pNewCfg->replicaNum < pSyncNode->replicaNum - 1) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
int32_t ret = 0;
|
||||
|
||||
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
|
||||
syncNodeRelease(pSyncNode);
|
||||
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
|
||||
sError("invalid new config. vgId:%d", pSyncNode->vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
|
||||
pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE;
|
||||
pRpcMsg->info.noResp = 1;
|
||||
pRpcMsg->contLen = strlen(newconfig) + 1;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig);
|
||||
taosMemoryFree(newconfig);
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
return ret;
|
||||
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
|
||||
if (!syncNodeInConfig(pSyncNode, pCfg)) return false;
|
||||
return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
|
||||
}
|
||||
|
||||
int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
if (pSyncNode == NULL) return -1;
|
||||
|
||||
if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
|
||||
syncNodeRelease(pSyncNode);
|
||||
terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
|
||||
sError("invalid new config. vgId:%d", pSyncNode->vgId);
|
||||
sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
#if 0
|
||||
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
|
||||
int32_t ret = 0;
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
|
||||
rpcMsg.info.noResp = 1;
|
||||
rpcMsg.contLen = strlen(newconfig) + 1;
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig);
|
||||
taosMemoryFree(newconfig);
|
||||
ret = syncNodePropose(pSyncNode, &rpcMsg, false);
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
return ret;
|
||||
#else
|
||||
syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg);
|
||||
syncNodeDoConfigChange(pSyncNode, pNewCfg, SYNC_INDEX_INVALID);
|
||||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
|
||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||
syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]);
|
||||
syncHbTimerInit(pSyncNode, &pSyncNode->peerHeartbeatTimerArr[i], pSyncNode->replicasId[i]);
|
||||
}
|
||||
|
||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
|
||||
syncNodeReplicate(pSyncNode);
|
||||
}
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
|
||||
int32_t code = -1;
|
||||
if (!syncIsInit()) return code;
|
||||
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) return code;
|
||||
|
||||
if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
|
||||
SyncHeartbeat* pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
|
||||
syncHeartbeatDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
|
||||
SyncHeartbeatReply* pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
|
||||
syncHeartbeatReplyDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||
SyncTimeout* pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnTimer(pSyncNode, pSyncMsg);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_PING) {
|
||||
SyncPing* pSyncMsg = syncPingFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnPing(pSyncNode, pSyncMsg);
|
||||
syncPingDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||
SyncPingReply* pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnPingReply(pSyncNode, pSyncMsg);
|
||||
syncPingReplyDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest* pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote* pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnRequestVote(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||
SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||
SyncAppendEntries* pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
|
||||
SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
|
||||
syncSnapshotSendDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
|
||||
SyncSnapshotRsp* pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
|
||||
syncSnapshotRspDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
|
||||
SyncLocalCmd* pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
|
||||
syncLocalCmdDestroy(pSyncMsg);
|
||||
} else {
|
||||
sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
code = -1;
|
||||
}
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t syncLeaderTransfer(int64_t rid) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
if (pSyncNode == NULL) return -1;
|
||||
|
||||
int32_t ret = syncNodeLeaderTransfer(pSyncNode);
|
||||
syncNodeRelease(pSyncNode);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
|
||||
int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
|
||||
syncNodeRelease(pSyncNode);
|
||||
return ret;
|
||||
}
|
||||
|
||||
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
|
||||
SyncIndex minMatchIndex = SYNC_INDEX_INVALID;
|
||||
|
||||
|
|
Loading…
Reference in New Issue