refactor(sync): add vnodeSnapshotStopRead into callback
This commit is contained in:
parent
78efbaabe1
commit
432e1a39b7
|
@ -283,6 +283,12 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
||||||
syncClientRequestDestroy(pSyncMsg);
|
syncClientRequestDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
|
||||||
|
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pRpcMsg);
|
||||||
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
ret = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
|
||||||
|
syncClientRequestBatchDestroyDeep(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||||
ASSERT(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
@ -317,7 +323,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
ret = -1;
|
ret = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_WAL_FIRST) {
|
||||||
// use wal first strategy
|
// use wal first strategy
|
||||||
|
|
||||||
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||||
|
@ -387,7 +393,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
syncNodeRelease(pSyncNode);
|
||||||
} else {
|
} else {
|
||||||
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
vError("sync env is stop");
|
||||||
ret = -1;
|
ret = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -438,16 +444,11 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
SSnapshot snapshot = {0};
|
|
||||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
|
||||||
char logBuf[256] = {0};
|
|
||||||
|
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", pFsm,
|
||||||
"==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n",
|
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pMsg->msgType,
|
||||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
TMSG_INFO(pMsg->msgType));
|
||||||
beginIndex);
|
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||||
|
@ -458,25 +459,36 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
char logBuf[256] = {0};
|
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", pFsm,
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pMsg->msgType,
|
||||||
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
|
TMSG_INFO(pMsg->msgType));
|
||||||
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
char logBuf[256] = {0};
|
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", pFsm,
|
||||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pMsg->msgType,
|
||||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
TMSG_INFO(pMsg->msgType));
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { return 0; }
|
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
||||||
|
SVnode *pVnode = pFsm->data;
|
||||||
|
SSnapshotParam *pSnapshotParam = pParam;
|
||||||
|
int32_t code =
|
||||||
|
vnodeSnapshotReaderOpen(pVnode, (SVSnapshotReader **)ppReader, pSnapshotParam->start, pSnapshotParam->end);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
|
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
|
||||||
|
SVnode *pVnode = pFsm->data;
|
||||||
|
int32_t code = vnodeSnapshotReaderClose(pReader);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
|
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
|
||||||
|
SVnode *pVnode = pFsm->data;
|
||||||
|
int32_t code = vnodeSnapshotRead(pReader, (const void **)ppBuf, len);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { return 0; }
|
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { return 0; }
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue