refactor: adjust vnode sync
This commit is contained in:
parent
f2a84eda09
commit
ff91282bee
|
@ -180,7 +180,6 @@ void vnodeClose(SVnode *pVnode) {
|
||||||
|
|
||||||
// start the sync timer after the queue is ready
|
// start the sync timer after the queue is ready
|
||||||
int32_t vnodeStart(SVnode *pVnode) {
|
int32_t vnodeStart(SVnode *pVnode) {
|
||||||
vnodeSyncSetMsgCb(pVnode);
|
|
||||||
vnodeSyncStart(pVnode);
|
vnodeSyncStart(pVnode);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,71 +13,62 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
|
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
|
||||||
|
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
|
||||||
|
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode);
|
||||||
|
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
||||||
|
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
||||||
|
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
||||||
|
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
|
||||||
|
|
||||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||||
SSyncInfo syncInfo;
|
SSyncInfo syncInfo = {
|
||||||
syncInfo.vgId = pVnode->config.vgId;
|
.vgId = pVnode->config.vgId,
|
||||||
SSyncCfg *pCfg = &(syncInfo.syncCfg);
|
.syncCfg = pVnode->config.syncCfg,
|
||||||
pCfg->replicaNum = pVnode->config.syncCfg.replicaNum;
|
.pWal = pVnode->pWal,
|
||||||
pCfg->myIndex = pVnode->config.syncCfg.myIndex;
|
.msgcb = NULL,
|
||||||
memcpy(pCfg->nodeInfo, pVnode->config.syncCfg.nodeInfo, sizeof(pCfg->nodeInfo));
|
.FpSendMsg = vnodeSyncSendMsg,
|
||||||
|
.FpEqMsg = vnodeSyncEqMsg,
|
||||||
|
};
|
||||||
|
|
||||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", path);
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
|
||||||
syncInfo.pWal = pVnode->pWal;
|
syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
|
||||||
|
|
||||||
syncInfo.pFsm = syncVnodeMakeFsm(pVnode);
|
|
||||||
syncInfo.msgcb = NULL;
|
|
||||||
syncInfo.FpSendMsg = vnodeSyncSendMsg;
|
|
||||||
syncInfo.FpEqMsg = vnodeSyncEqMsg;
|
|
||||||
|
|
||||||
pVnode->sync = syncOpen(&syncInfo);
|
pVnode->sync = syncOpen(&syncInfo);
|
||||||
assert(pVnode->sync > 0);
|
if (pVnode->sync <= 0) {
|
||||||
|
vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
// for test
|
|
||||||
setPingTimerMS(pVnode->sync, 3000);
|
setPingTimerMS(pVnode->sync, 3000);
|
||||||
setElectTimerMS(pVnode->sync, 500);
|
setElectTimerMS(pVnode->sync, 500);
|
||||||
setHeartbeatTimerMS(pVnode->sync, 100);
|
setHeartbeatTimerMS(pVnode->sync, 100);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeSyncStart(SVnode *pVnode) {
|
void vnodeSyncStart(SVnode *pVnode) {
|
||||||
|
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
|
||||||
syncStart(pVnode->sync);
|
syncStart(pVnode->sync);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncClose(SVnode *pVnode) {
|
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||||
// stop by ref id
|
|
||||||
syncStop(pVnode->sync);
|
|
||||||
}
|
|
||||||
|
|
||||||
void vnodeSyncSetMsgCb(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); }
|
|
||||||
|
|
||||||
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
|
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
|
||||||
|
|
||||||
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||||
pMsg->info.noResp = 1;
|
|
||||||
return tmsgSendReq(pEpSet, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
|
||||||
SVnode *pVnode = (SVnode *)(pFsm->data);
|
|
||||||
vnodeGetSnapshot(pVnode, pSnapshot);
|
|
||||||
|
|
||||||
/*
|
|
||||||
pSnapshot->data = NULL;
|
|
||||||
pSnapshot->lastApplyIndex = 0;
|
|
||||||
pSnapshot->lastApplyTerm = 0;
|
|
||||||
*/
|
|
||||||
|
|
||||||
|
int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||||
|
vnodeGetSnapshot(pFsm->data, pSnapshot);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||||
if (pFsm->FpGetSnapshot != NULL) {
|
if (pFsm->FpGetSnapshot != NULL) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot = {0};
|
||||||
pFsm->FpGetSnapshot(pFsm, &snapshot);
|
pFsm->FpGetSnapshot(pFsm, &snapshot);
|
||||||
beginIndex = snapshot.lastApplyIndex;
|
beginIndex = snapshot.lastApplyIndex;
|
||||||
}
|
}
|
||||||
|
@ -128,7 +119,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf),
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
|
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
|
||||||
|
@ -136,19 +127,19 @@ void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
||||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncFSM *syncVnodeMakeFsm(SVnode *pVnode) {
|
SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
|
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||||
pFsm->data = pVnode;
|
pFsm->data = pVnode;
|
||||||
pFsm->FpCommitCb = vnodeSyncCommitCb;
|
pFsm->FpCommitCb = vnodeSyncCommitMsg;
|
||||||
pFsm->FpPreCommitCb = vnodeSyncPreCommitCb;
|
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
|
||||||
pFsm->FpRollBackCb = vnodeSyncRollBackCb;
|
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
|
||||||
pFsm->FpGetSnapshot = vnodeSyncGetSnapshotCb;
|
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
|
||||||
return pFsm;
|
return pFsm;
|
||||||
}
|
}
|
Loading…
Reference in New Issue