sync integration
This commit is contained in:
parent
01008939f2
commit
9a7924049f
|
@ -84,6 +84,9 @@ typedef struct SSyncFSM {
|
|||
struct SSyncRaftEntry;
|
||||
typedef struct SSyncRaftEntry SSyncRaftEntry;
|
||||
|
||||
#define SYNC_INDEX_BEGIN 0
|
||||
#define SYNC_INDEX_INVALID -1
|
||||
|
||||
// abstract definition of log store in raft
|
||||
// SWal implements it
|
||||
typedef struct SSyncLogStore {
|
||||
|
|
|
@ -109,6 +109,7 @@ void vmStopWorker(SVnodesMgmt *pMgmt);
|
|||
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
||||
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
||||
|
||||
int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); // sync integration
|
||||
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
|
|
|
@ -106,6 +106,16 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
|||
pCfg->hashBegin = pCreate->hashBegin;
|
||||
pCfg->hashEnd = pCreate->hashEnd;
|
||||
pCfg->hashMethod = pCreate->hashMethod;
|
||||
|
||||
// sync integration
|
||||
pCfg->syncCfg.myIndex = pCreate->selfIndex;
|
||||
pCfg->syncCfg.replicaNum = pCreate->replica;
|
||||
memset(&(pCfg->syncCfg.nodeInfo), 0, sizeof(pCfg->syncCfg.nodeInfo));
|
||||
for (int i = 0; i < pCreate->replica; ++i) {
|
||||
(pCfg->syncCfg.nodeInfo)[i].nodePort = (pCreate->replicas)[i].port;
|
||||
snprintf((pCfg->syncCfg.nodeInfo)[i].nodeFqdn, sizeof((pCfg->syncCfg.nodeInfo)[i].nodeFqdn), "%s",
|
||||
(pCreate->replicas)[i].fqdn);
|
||||
}
|
||||
}
|
||||
|
||||
static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) {
|
||||
|
@ -157,6 +167,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||
msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; // sync integration
|
||||
msgCb.qsizeFp = vmGetQueueSize;
|
||||
|
||||
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb);
|
||||
|
@ -357,4 +368,15 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
|||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
|
||||
// sync integration
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_PING_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||
}
|
||||
|
|
|
@ -137,6 +137,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
|||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||
msgCb.queueFps[SYNC_QUEUE] = vmPutMsgToSyncQueue; // sync integration
|
||||
msgCb.qsizeFp = vmGetQueueSize;
|
||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
|
||||
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb);
|
||||
|
|
|
@ -306,6 +306,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT
|
|||
dTrace("msg:%p, will be put into vnode-merge queue", pMsg);
|
||||
taosWriteQitem(pVnode->pMergeQ, pMsg);
|
||||
break;
|
||||
case SYNC_QUEUE: // sync integration
|
||||
dTrace("msg:%p, will be put into vnode-sync queue", pMsg);
|
||||
taosWriteQitem(pVnode->pSyncQ, pMsg);
|
||||
break;
|
||||
default:
|
||||
code = -1;
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
|
@ -332,6 +336,11 @@ int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
|||
return vmPutRpcMsgToQueue(pWrapper, pRpc, MERGE_QUEUE);
|
||||
}
|
||||
|
||||
// sync integration
|
||||
int32_t vmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||
return vmPutRpcMsgToQueue(pWrapper, pRpc, SYNC_QUEUE);
|
||||
}
|
||||
|
||||
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
|
||||
int32_t size = -1;
|
||||
SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId);
|
||||
|
|
|
@ -16,6 +16,7 @@ target_sources(
|
|||
"src/vnd/vnodeWrite.c"
|
||||
"src/vnd/vnodeModule.c"
|
||||
"src/vnd/vnodeSvr.c"
|
||||
"src/vnd/vnodeSync.c"
|
||||
|
||||
# meta
|
||||
"src/meta/metaOpen.c"
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "tarray.h"
|
||||
#include "tfs.h"
|
||||
#include "wal.h"
|
||||
#include "sync.h"
|
||||
|
||||
#include "tcommon.h"
|
||||
#include "tfs.h"
|
||||
|
@ -144,6 +145,7 @@ struct SVnodeCfg {
|
|||
bool isWeak;
|
||||
STsdbCfg tsdbCfg;
|
||||
SWalCfg walCfg;
|
||||
SSyncCfg syncCfg; // sync integration
|
||||
uint32_t hashBegin;
|
||||
uint32_t hashEnd;
|
||||
int8_t hashMethod;
|
||||
|
|
|
@ -54,8 +54,8 @@ typedef struct SQWorkerMgmt SQHandle;
|
|||
|
||||
#define VNODE_META_DIR "meta"
|
||||
#define VNODE_TSDB_DIR "tsdb"
|
||||
#define VNODE_TQ_DIR "tq"
|
||||
#define VNODE_WAL_DIR "wal"
|
||||
#define VNODE_TQ_DIR "tq"
|
||||
#define VNODE_WAL_DIR "wal"
|
||||
|
||||
typedef struct {
|
||||
int8_t streamType; // sma or other
|
||||
|
@ -68,8 +68,8 @@ typedef struct {
|
|||
} SStreamSinkInfo;
|
||||
|
||||
typedef struct {
|
||||
SVnode* pVnode;
|
||||
SHashObj* pHash; // streamId -> SStreamSinkInfo
|
||||
SVnode *pVnode;
|
||||
SHashObj *pHash; // streamId -> SStreamSinkInfo
|
||||
} SSink;
|
||||
|
||||
// SVState
|
||||
|
@ -85,25 +85,26 @@ struct SVnodeInfo {
|
|||
};
|
||||
|
||||
struct SVnode {
|
||||
char* path;
|
||||
char *path;
|
||||
SVnodeCfg config;
|
||||
SVState state;
|
||||
STfs* pTfs;
|
||||
STfs *pTfs;
|
||||
SMsgCb msgCb;
|
||||
SVBufPool* pBufPool;
|
||||
SMeta* pMeta;
|
||||
STsdb* pTsdb;
|
||||
SWal* pWal;
|
||||
STQ* pTq;
|
||||
SSink* pSink;
|
||||
SVBufPool *pBufPool;
|
||||
SMeta *pMeta;
|
||||
STsdb *pTsdb;
|
||||
SWal *pWal;
|
||||
STQ *pTq;
|
||||
SSink *pSink;
|
||||
int64_t sync; // sync integration
|
||||
tsem_t canCommit;
|
||||
SQHandle* pQuery;
|
||||
SQHandle *pQuery;
|
||||
};
|
||||
|
||||
#define TD_VID(PVNODE) (PVNODE)->config.vgId
|
||||
|
||||
// sma
|
||||
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data);
|
||||
|
||||
#include "vnd.h"
|
||||
|
||||
|
@ -113,6 +114,8 @@ void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
|||
|
||||
#include "tq.h"
|
||||
|
||||
#include "vnodeSync.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_VNODE_SYNC_H_
|
||||
#define _TD_VNODE_SYNC_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
int32_t vnodeSyncOpen(SVnode *pVnode);
|
||||
int32_t vnodeSyncStart(SVnode *pVnode);
|
||||
void vnodeSyncClose(SVnode *pVnode);
|
||||
|
||||
int32_t vnodeSyncEqMsg(void *qHandle, SRpcMsg *pMsg);
|
||||
int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg);
|
||||
|
||||
void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
||||
void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
||||
void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
||||
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot);
|
||||
|
||||
SSyncFSM *syncVnodeMakeFsm();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_VNODE_SYNC_H_*/
|
|
@ -124,6 +124,12 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// sync integration
|
||||
// open sync
|
||||
if (vnodeSyncOpen(pVnode)) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (vnodeBegin() < 0) {
|
||||
goto _err;
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "sync.h"
|
||||
#include "syncTools.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq);
|
||||
|
@ -198,8 +200,86 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
|||
tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data);
|
||||
}
|
||||
|
||||
// sync integration
|
||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
/*vInfo("sync message is processed");*/
|
||||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||
assert(pSyncNode != NULL);
|
||||
|
||||
ESyncState state = syncGetMyRole(pVnode->sync);
|
||||
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
|
||||
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
|
||||
char logBuf[512];
|
||||
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
|
||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||
syncRpcMsgLog2(logBuf, pMsg);
|
||||
taosMemoryFree(syncNodeStr);
|
||||
|
||||
SRpcMsg *pRpcMsg = pMsg;
|
||||
|
||||
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
|
||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
|
||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||
syncPingDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
|
||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||
syncPingReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
|
||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
|
||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesDestroy(pSyncMsg);
|
||||
|
||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
|
||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
||||
assert(pSyncMsg != NULL);
|
||||
|
||||
syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||
|
||||
} else {
|
||||
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
||||
}
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,162 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "sync.h"
|
||||
#include "syncTools.h"
|
||||
#include "tmsgcb.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
// sync integration
|
||||
|
||||
int32_t vnodeSyncOpen(SVnode *pVnode) {
|
||||
SSyncInfo syncInfo;
|
||||
syncInfo.vgId = pVnode->config.vgId;
|
||||
SSyncCfg *pCfg = &(syncInfo.syncCfg);
|
||||
pCfg->replicaNum = pVnode->config.syncCfg.replicaNum;
|
||||
pCfg->myIndex = pVnode->config.syncCfg.myIndex;
|
||||
memcpy(pCfg->nodeInfo, pVnode->config.syncCfg.nodeInfo, sizeof(pCfg->nodeInfo));
|
||||
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pVnode->path);
|
||||
syncInfo.pWal = pVnode->pWal;
|
||||
|
||||
syncInfo.pFsm = syncVnodeMakeFsm(pVnode);
|
||||
syncInfo.rpcClient = NULL;
|
||||
syncInfo.FpSendMsg = vnodeSendMsg;
|
||||
syncInfo.queue = NULL;
|
||||
syncInfo.FpEqMsg = vnodeSyncEqMsg;
|
||||
|
||||
pVnode->sync = syncOpen(&syncInfo);
|
||||
assert(pVnode->sync > 0);
|
||||
|
||||
// for test
|
||||
setPingTimerMS(pVnode->sync, 3000);
|
||||
setElectTimerMS(pVnode->sync, 500);
|
||||
setHeartbeatTimerMS(pVnode->sync, 100);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeSyncStart(SVnode *pVnode) {
|
||||
syncStart(pVnode->sync);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeSyncClose(SVnode *pVnode) {
|
||||
// stop by ref id
|
||||
syncStop(pVnode->sync);
|
||||
}
|
||||
|
||||
void vnodeSyncSetQ(SVnode *pVnode, void *qHandle) { syncSetQ(pVnode->sync, (void *)(&(pVnode->msgCb))); }
|
||||
|
||||
void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle) { syncSetRpc(pVnode->sync, (void *)(&(pVnode->msgCb))); }
|
||||
|
||||
int32_t vnodeSyncEqMsg(void *qHandle, SRpcMsg *pMsg) {
|
||||
int32_t ret = 0;
|
||||
SMsgCb *pMsgCb = qHandle;
|
||||
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
|
||||
tmsgPutToQueue(qHandle, SYNC_QUEUE, pMsg);
|
||||
} else {
|
||||
vError("vnodeSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
int32_t ret = 0;
|
||||
SMsgCb *pMsgCb = rpcHandle;
|
||||
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
|
||||
tmsgSendReq(rpcHandle, pEpSet, pMsg);
|
||||
} else {
|
||||
vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||
pSnapshot->data = NULL;
|
||||
pSnapshot->lastApplyIndex = 0;
|
||||
pSnapshot->lastApplyTerm = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||
if (pFsm->FpGetSnapshot != NULL) {
|
||||
SSnapshot snapshot;
|
||||
pFsm->FpGetSnapshot(pFsm, &snapshot);
|
||||
beginIndex = snapshot.lastApplyIndex;
|
||||
}
|
||||
|
||||
if (cbMeta.index > beginIndex) {
|
||||
char logBuf[256];
|
||||
snprintf(
|
||||
logBuf, sizeof(logBuf),
|
||||
"==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
|
||||
SRpcMsg applyMsg;
|
||||
applyMsg = *pMsg;
|
||||
applyMsg.pCont = rpcMallocCont(applyMsg.contLen);
|
||||
assert(applyMsg.contLen == pMsg->contLen);
|
||||
memcpy(applyMsg.pCont, pMsg->pCont, applyMsg.contLen);
|
||||
|
||||
// recover handle for response
|
||||
SVnode *pVnode = (SVnode *)(pFsm->data);
|
||||
SRpcMsg saveRpcMsg;
|
||||
int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg);
|
||||
if (ret == 1) {
|
||||
applyMsg.handle = saveRpcMsg.handle;
|
||||
applyMsg.ahandle = saveRpcMsg.ahandle;
|
||||
}
|
||||
|
||||
// put to applyQ
|
||||
tmsgPutToQueue(&(pVnode->msgCb), APPLY_QUEUE, &applyMsg);
|
||||
|
||||
} else {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==CommitCb== do not execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, "
|
||||
"beginIndex :%ld\n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
||||
beginIndex);
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf),
|
||||
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
|
||||
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
|
||||
void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
char logBuf[256];
|
||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
|
||||
SSyncFSM *syncVnodeMakeFsm(SVnode *pVnode) {
|
||||
SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
|
||||
pFsm->data = pVnode;
|
||||
pFsm->FpCommitCb = vnodeSyncCommitCb;
|
||||
pFsm->FpPreCommitCb = vnodeSyncPreCommitCb;
|
||||
pFsm->FpRollBackCb = vnodeSyncRollBackCb;
|
||||
pFsm->FpGetSnapshot = vnodeSyncGetSnapshotCb;
|
||||
return pFsm;
|
||||
}
|
|
@ -27,9 +27,6 @@ extern "C" {
|
|||
#include "syncRaftEntry.h"
|
||||
#include "taosdef.h"
|
||||
|
||||
#define SYNC_INDEX_BEGIN 0
|
||||
#define SYNC_INDEX_INVALID -1
|
||||
|
||||
typedef struct SSyncLogStoreData {
|
||||
SSyncNode* pSyncNode;
|
||||
SWal* pWal;
|
||||
|
|
|
@ -112,7 +112,7 @@ SSyncNode *syncNodeInit() {
|
|||
for (int i = 0; i < replicaNum; ++i) {
|
||||
pCfg->nodeInfo[i].nodePort = ports[i];
|
||||
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
|
||||
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||
taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
|
||||
}
|
||||
|
||||
SSyncNode *pSyncNode = syncNodeOpen(&syncInfo);
|
||||
|
|
Loading…
Reference in New Issue