refactor(sync): optimize proposing when there is only one replica
This commit is contained in:
parent
77b365f0cc
commit
a6f33ba01c
|
@ -524,7 +524,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
|
|||
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
||||
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
|
||||
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
|
||||
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
|
||||
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||
|
@ -541,7 +541,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
|||
// -----------------------------------------
|
||||
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
|
||||
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SyncClientRequest* pMsg);
|
||||
typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
|
||||
typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||
typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||
typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||
|
|
|
@ -442,7 +442,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
|||
syncPingReplyDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||
|
@ -491,7 +491,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
|||
syncPingReplyDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||
|
@ -555,10 +555,10 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
|||
// Validates that a request message actually carries a payload.
// Returns 0 when pMsg is not a request (per IsReq) or when it has both a non-zero
// contLen and a non-NULL pCont. Otherwise logs an error, sets terrno to
// TSDB_CODE_INVALID_MSG_LEN and returns -1.
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
|
||||
// Non-request messages are not checked.
if (!IsReq(pMsg)) return 0;
|
||||
// A request is accepted only if it has a non-empty, non-NULL body.
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
|
||||
|
||||
|
||||
// NOTE(review): `trace` is not referenced by name in the visible lines; presumably the
// mGError macro picks it up implicitly — confirm against the macro definition.
const STraceId *trace = &pMsg->info.traceId;
|
||||
mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
|
||||
// NOTE(review): the argument line below appears twice in this view; this looks like the
// removed and added sides of a whitespace-only diff hunk rather than two statements.
pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
// Report the failure through terrno; callers only see the -1 return value.
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||
return -1;
|
||||
}
|
||||
|
@ -723,7 +723,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
|||
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
SMonStbDesc desc = {0};
|
||||
SMonStbDesc desc = {0};
|
||||
|
||||
SName name1 = {0};
|
||||
tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
|
|
|
@ -137,6 +137,26 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
vError("vgId:%d, failed to pre-process msg:%p since %s", vgId, pMsg, terrstr());
|
||||
} else {
|
||||
code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
|
||||
if (code == 1) {
|
||||
do {
|
||||
static int32_t cnt = 0;
|
||||
if (cnt++ % 1000 == 1) {
|
||||
vInfo("vgId:%d, msg:%p apply right now, apply index:%ld, msgtype:%s,%d", vgId, pMsg,
|
||||
pMsg->info.conn.applyIndex, TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
||||
}
|
||||
} while (0);
|
||||
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||
rsp.code = terrno;
|
||||
vInfo("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
|
||||
}
|
||||
|
||||
if (rsp.info.handle != NULL) {
|
||||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
code = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -260,7 +280,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||
|
@ -359,34 +379,18 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
|||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||
char logBuf[256] = {0};
|
||||
|
||||
if (pFsm->FpGetSnapshotInfo != NULL) {
|
||||
(*pFsm->FpGetSnapshotInfo)(pFsm, &snapshot);
|
||||
beginIndex = snapshot.lastApplyIndex;
|
||||
}
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
||||
beginIndex);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
|
||||
if (cbMeta.index > beginIndex) {
|
||||
snprintf(
|
||||
logBuf, sizeof(logBuf),
|
||||
"==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
|
||||
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||
|
||||
} else {
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==CommitCb== do not execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, "
|
||||
"beginIndex :%ld\n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
||||
beginIndex);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
|
||||
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||
}
|
||||
|
||||
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
|
|
|
@ -50,7 +50,7 @@ typedef struct SSyncIO {
|
|||
void *pSyncNode;
|
||||
int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg);
|
||||
int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg);
|
||||
int32_t (*FpOnSyncClientRequest)(SSyncNode *pSyncNode, SyncClientRequest *pMsg);
|
||||
int32_t (*FpOnSyncClientRequest)(SSyncNode *pSyncNode, SyncClientRequest *pMsg, SyncIndex* pRetIndex);
|
||||
int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg);
|
||||
int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg);
|
||||
int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg);
|
||||
|
|
|
@ -169,7 +169,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
|
|||
void syncNodeStart(SSyncNode* pSyncNode);
|
||||
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
||||
void syncNodeClose(SSyncNode* pSyncNode);
|
||||
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak);
|
||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
|
||||
|
||||
// option
|
||||
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||
|
@ -233,6 +233,7 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index);
|
|||
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index);
|
||||
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
|
||||
|
||||
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
|
||||
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
|
||||
|
||||
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
|
||||
|
|
|
@ -102,6 +102,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
}
|
||||
}
|
||||
|
||||
// maybe execute fsm
|
||||
if (newCommitIndex > pSyncNode->commitIndex) {
|
||||
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
|
||||
SyncIndex endIndex = newCommitIndex;
|
||||
|
|
|
@ -281,7 +281,7 @@ static void *syncIOConsumerFunc(void *param) {
|
|||
if (io->FpOnSyncClientRequest != NULL) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg);
|
||||
io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg, NULL);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
|
|||
// process message ----
|
||||
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
||||
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
|
||||
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
|
||||
|
||||
// life cycle
|
||||
static void syncFreeNode(void* param);
|
||||
|
@ -627,7 +627,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
|
||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||
int32_t ret = 0;
|
||||
|
||||
char eventLog[128];
|
||||
|
@ -664,13 +664,34 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
|
|||
SRpcMsg rpcMsg;
|
||||
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
|
||||
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
|
||||
ret = 0;
|
||||
// optimized one replica
|
||||
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
|
||||
SyncIndex retIndex;
|
||||
int32_t code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, &retIndex);
|
||||
if (code == 0) {
|
||||
pMsg->info.conn.applyIndex = retIndex;
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
||||
ret = 1;
|
||||
sDebug("vgId:%d optimized index:%ld success, msgtype:%s,%d", pSyncNode->vgId, retIndex,
|
||||
TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
||||
} else {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("vgId:%d optimized index:%ld error, msgtype:%s,%d", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType),
|
||||
pMsg->msgType);
|
||||
}
|
||||
|
||||
} else {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("syncPropose pSyncNode->FpEqMsg is NULL");
|
||||
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
|
||||
ret = 0;
|
||||
} else {
|
||||
ret = -1;
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
sError("enqueue msg error, FpEqMsg is NULL");
|
||||
}
|
||||
}
|
||||
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
goto _END;
|
||||
|
||||
|
@ -2377,7 +2398,7 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
|
|||
// /\ UNCHANGED <<messages, serverVars, candidateVars,
|
||||
// leaderVars, commitIndex>>
|
||||
//
|
||||
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
|
||||
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) {
|
||||
int32_t ret = 0;
|
||||
syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);
|
||||
|
||||
|
@ -2436,6 +2457,14 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
|
|||
rpcFreeCont(rpcMsg.pCont);
|
||||
}
|
||||
|
||||
if (pRetIndex != NULL) {
|
||||
if (ret == 0 && pEntry != NULL) {
|
||||
*pRetIndex = pEntry->index;
|
||||
} else {
|
||||
*pRetIndex = SYNC_INDEX_INVALID;
|
||||
}
|
||||
}
|
||||
|
||||
syncEntryDestory(pEntry);
|
||||
return ret;
|
||||
}
|
||||
|
@ -2600,6 +2629,10 @@ static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFin
|
|||
return 0;
|
||||
}
|
||||
|
||||
// Reports whether a message on this node qualifies for the "optimized one replica" path.
// Returns true only when all three conditions hold:
//   - ths->replicaNum == 1,
//   - syncUtilUserCommit(pMsg->msgType) is true,
//   - ths->vgId != 1.
// Neither ths nor pMsg is checked for NULL before being dereferenced.
// NOTE(review): vgId 1 is excluded without explanation; presumably it identifies a
// reserved group (e.g. the mnode) that must keep the regular path — confirm.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
|
||||
return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
|
||||
}
|
||||
|
||||
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
|
||||
int32_t code = 0;
|
||||
ESyncState state = flag;
|
||||
|
@ -2621,7 +2654,13 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
|||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
// user commit
|
||||
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||
bool internalExecute = (ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType);
|
||||
if (ths->replicaNum == 1) {
|
||||
internalExecute = syncNodeIsOptimizedOneReplica(ths, &rpcMsg) && !(ths->restoreFinish);
|
||||
}
|
||||
|
||||
// execute fsm in apply thread, or execute outside syncPropose
|
||||
if (internalExecute) {
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||
|
|
Loading…
Reference in New Issue