From d5510988c27dc2a37dd3a039db1f5e1b6bfcd644 Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 26 Oct 2021 10:06:37 +0800 Subject: [PATCH 1/2] [feeature][raft]refactor sync interface --- include/libs/sync/sync.h | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index ee3ea8db9b..4a932d403e 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -24,7 +24,7 @@ extern "C" { #include "taosdef.h" #include "wal.h" -typedef uint32_t SyncNodeId; +typedef int64_t SyncNodeId; typedef int32_t SyncGroupId; typedef int64_t SyncIndex; typedef uint64_t SSyncTerm; @@ -41,7 +41,6 @@ typedef struct { } SSyncBuffer; typedef struct { - SyncNodeId nodeId; // node ID assigned by TDengine uint16_t nodePort; // node sync Port char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN } SNodeInfo; @@ -55,7 +54,7 @@ typedef struct { typedef struct { int32_t selfIndex; int nNode; - SyncNodeId* nodeId; + SNodeInfo* node; ESyncRole* role; } SNodesRole; @@ -86,7 +85,7 @@ typedef struct SSyncFSM { } SSyncFSM; typedef struct SSyncServerState { - SyncNodeId voteFor; + SNodeInfo voteFor; SSyncTerm term; } SSyncServerState; @@ -107,8 +106,8 @@ typedef struct { twalh walHandle; - SyncIndex snapshotIndex; // initial version - SSyncCluster syncCfg; // configuration from mgmt + SyncIndex snapshotIndex; + SSyncCluster syncCfg; SSyncFSM fsm; @@ -123,7 +122,11 @@ void syncStop(SyncNodeId); int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void *pData, bool isWeak); -extern int32_t raftDebugFlag; +int32_t syncAddNode(SyncNodeId nodeId, const SNodeInfo *pNode); + +int32_t syncRemoveNode(SyncNodeId nodeId, const SNodeInfo *pNode); + +extern int32_t syncDebugFlag; #ifdef __cplusplus } From cdfa8fc31c27ef25bba487253d0c9ebbbf45704d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 26 Oct 2021 10:09:07 +0800 Subject: [PATCH 2/2] solve 3.0 conflict (#8428) add tq data structure --- source/server/dnode/src/dnodeTrans.c | 17 +++++++++++------ source/server/vnode/src/vnodeMain.c | 6 ++---- source/server/vnode/src/vnodeReadMsg.c | 1 + 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index 6d734932ff..6f4cb2d6eb 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -21,11 +21,11 @@ #define _DEFAULT_SOURCE #include "dnodeTrans.h" -#include "dnodeEps.h" -#include "dnodeMsg.h" +#include "dnodeMain.h" +#include "dnodeMnodeEps.h" +#include "dnodeStatus.h" #include "mnode.h" #include "vnode.h" -#include "mnode.h" typedef void (*RpcMsgFp)(SRpcMsg *pMsg); @@ -97,8 +97,12 @@ static int32_t dnodeInitServer() { tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -139,11 +143,12 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { - dnodeUpdateMnodeEps(pEpSet); + dnodeUpdateMnodeFromPeer(pEpSet); } RpcMsgFp fp = tsTrans.peerMsgFp[msgType]; if (fp != NULL) { + dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]); (*fp)(pMsg); } else { dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]); diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index da1c1d7235..f1834ffdbe 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -797,13 +797,11 @@ static void vnodeInitMsgFp() { tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; - //mq related end - tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; - //mq related tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; //mq related end + tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; } void vnodeProcessMsg(SRpcMsg *pMsg) { diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index b4070546c7..d4701ec4ce 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -236,6 +236,7 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { //fetch or register context tqFetchMsg(pHandle, pRead); //judge mode, tail read or catch up read + /*int64_t lastVer = walLastVer(pVnode->wal);*/ //launch new query return 0; }