From 73fa7a7fb79e0369bb58a66c4c35ee4de8f0dcac Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 18 Oct 2021 10:46:53 +0800 Subject: [PATCH 1/5] vnode process mq msg (#8287) add vnodeprocess msg function --- include/common/taosmsg.h | 38 ++++++++++++++++++++++++- include/server/vnode/tq/tq.h | 5 ++-- source/server/dnode/src/dnodeTrans.c | 11 +++++-- source/server/vnode/inc/vnodeReadMsg.h | 2 ++ source/server/vnode/inc/vnodeWriteMsg.h | 8 +++++- source/server/vnode/src/vnodeMain.c | 36 ++++++++++++++--------- source/server/vnode/src/vnodeRead.c | 3 ++ source/server/vnode/src/vnodeWrite.c | 18 ++++++++++-- 8 files changed, 99 insertions(+), 22 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 78f91cca64..66a02f350e 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -42,7 +42,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) @@ -121,7 +123,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) #ifndef TAOS_MESSAGE_C - TSDB_MSG_TYPE_MAX // 105 + TSDB_MSG_TYPE_MAX // 147 #endif }; @@ -958,6 +960,40 @@ typedef struct { char reserved2[64]; } SStartupStep; +// mq related +typedef struct { + +} SMqConnectReq; + +typedef struct { + +} SMqConnectRsp; + +typedef struct { + +} SMqDisconnectReq; + +typedef struct { + +} SMqDisconnectRsp; + +typedef struct { + +} SMqAckReq; + +typedef struct { + +} SMqAckRsp; + +typedef struct { + +} SMqResetReq; + +typedef struct { + +} SMqResetRsp; +//mq related end + typedef struct { /* data */ } SSubmitReq; diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index ef6a34ffa3..6e56e8256f 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -26,8 +26,9 @@ typedef struct tmqMsgHead { int32_t headLen; int32_t msgVer; int64_t cgId; - int32_t topicLen; - char topic[]; + int64_t topicId; + int32_t checksum; + int32_t msgType; } tmqMsgHead; //TODO: put msgs into common diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index a4409674f1..1739283f34 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -97,6 +97,9 @@ int32_t dnodeInitServer() { tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; + /*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/ + SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = tsDnodeDnodePort; @@ -308,10 +311,12 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c } int32_t dnodeInitShell() { - tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; // the following message shall be treated as mnode write tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; diff --git a/source/server/vnode/inc/vnodeReadMsg.h b/source/server/vnode/inc/vnodeReadMsg.h index a1efb729e1..1efc74d1af 100644 --- a/source/server/vnode/inc/vnodeReadMsg.h +++ b/source/server/vnode/inc/vnodeReadMsg.h @@ -36,6 +36,8 @@ typedef struct SReadMsg { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead); +int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead); +int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead); #ifdef __cplusplus } diff --git a/source/server/vnode/inc/vnodeWriteMsg.h b/source/server/vnode/inc/vnodeWriteMsg.h index 86cdba6946..9dbc4fe490 100644 --- a/source/server/vnode/inc/vnodeWriteMsg.h +++ b/source/server/vnode/inc/vnodeWriteMsg.h @@ -27,9 +27,15 @@ int32_t vnodeProcessDropTableReq(SVnode *pVnode, SDropTableReq *pReq, SDropTable int32_t vnodeProcessAlterTableReq(SVnode *pVnode, SAlterTableReq *pReq, SAlterTableRsp *pRsp); int32_t vnodeProcessDropStableReq(SVnode *pVnode, SDropStableReq *pReq, SDropStableRsp *pRsp); int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpdateTagValRsp *pRsp); +//mq related +int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp); +int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp); +int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp); +int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp); +//mq related end #ifdef __cplusplus } #endif -#endif /*_TD_VNODE_WRITE_MSG_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_WRITE_MSG_H_*/ diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index d9c1a88d15..da1c1d7235 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -780,20 +780,30 @@ static void vnodeCleanupVnodes() { } static void vnodeInitMsgFp() { - tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; + //mq related + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; + //mq related end + tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; + //mq related + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; + //mq related end } void vnodeProcessMsg(SRpcMsg *pMsg) { diff --git a/source/server/vnode/src/vnodeRead.c b/source/server/vnode/src/vnodeRead.c index 39b6983b7d..0bf907c419 100644 --- a/source/server/vnode/src/vnodeRead.c +++ b/source/server/vnode/src/vnodeRead.c @@ -141,6 +141,9 @@ void vnodeProcessReadMsg(SRpcMsg *pMsg) { static void vnodeInitReadMsgFp() { tsVread.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; tsVread.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; + + tsVread.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessTqQueryMsg; + tsVread.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessConsumeMsg; } static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) { diff --git a/source/server/vnode/src/vnodeWrite.c b/source/server/vnode/src/vnodeWrite.c index 3c2634a2cf..c103460241 100644 --- a/source/server/vnode/src/vnodeWrite.c +++ b/source/server/vnode/src/vnodeWrite.c @@ -179,6 +179,20 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t case TSDB_MSG_TYPE_UPDATE_TAG_VAL: pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL); break; + //mq related + case TSDB_MSG_TYPE_MQ_CONNECT: + pWrite->code = vnodeProcessMqConnectReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_DISCONNECT: + pWrite->code = vnodeProcessMqDisconnectReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_ACK: + pWrite->code = vnodeProcessMqAckReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_RESET: + pWrite->code = vnodeProcessMqResetReq(pVnode, (void*)pHead->cont, NULL); + break; + //mq related end default: pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED; break; @@ -186,7 +200,7 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t if (pWrite->code < 0) return false; - // update fync + // update fsync return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT); } @@ -233,4 +247,4 @@ void vnodeCleanupWrite() { taos_queue vnodeAllocWriteQueue(SVnode *pVnode) { return tWriteWorkerAllocQueue(&tsVwrite.pool, pVnode); } -void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } \ No newline at end of file +void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } From 8be1a251df6c6d4eb979296d96936c72ebe33f5e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 18 Oct 2021 10:51:35 +0800 Subject: [PATCH 2/5] handle compile error for tq (#8288) add vnodeprocess msg for tq --- source/server/vnode/src/vnodeReadMsg.c | 9 +++++++++ source/server/vnode/src/vnodeWriteMsg.c | 15 +++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index 8a0f4b2e0f..158e550dcf 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -217,6 +217,15 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { return 0; } +//mq related +int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead){ + return 0; +} +int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) { + return 0; +} +//mq related end + int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { #if 0 void * pCont = pRead->pCont; diff --git a/source/server/vnode/src/vnodeWriteMsg.c b/source/server/vnode/src/vnodeWriteMsg.c index 0fe6fa2bc9..2e13d0035d 100644 --- a/source/server/vnode/src/vnodeWriteMsg.c +++ b/source/server/vnode/src/vnodeWriteMsg.c @@ -77,3 +77,18 @@ int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpd // TODO return 0; } + +//mq related +int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp){ + return 0; +} +int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp) { + return 0; +} +int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp) { + return 0; +} +int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp) { + return 0; +} +//mq related end From f0b31fed0f40da2ed62d563a3221c94492b3c89d Mon Sep 17 00:00:00 2001 From: lichuang Date: Sat, 23 Oct 2021 22:29:00 +0800 Subject: [PATCH 3/5] [feeature][raft]add raft interface --- include/libs/raft/raft_sync.h | 131 ++++++++++++++++++++++++++++++++ source/libs/raft/CMakeLists.txt | 8 ++ source/libs/raft/src/raft.c | 4 +- 3 files changed, 142 insertions(+), 1 deletion(-) create mode 100644 include/libs/raft/raft_sync.h diff --git a/include/libs/raft/raft_sync.h b/include/libs/raft/raft_sync.h new file mode 100644 index 0000000000..d59ab5206a --- /dev/null +++ b/include/libs/raft/raft_sync.h @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_RAFT_SYNC_H +#define TDENGINE_RAFT_SYNC_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include "taosdef.h" +#include "wal.h" + +typedef uint32_t SyncNodeId; +typedef int64_t SyncVersion; +typedef uint64_t SSyncTerm; + +typedef enum { + TAOS_SYNC_ROLE_FOLLOWER = 0, + TAOS_SYNC_ROLE_CANDIDATE = 1, + TAOS_SYNC_ROLE_LEADER = 2, +} ESyncRole; + +typedef struct { + void* data; + size_t len; +} SSyncBuffer; + +typedef struct { + SyncNodeId nodeId; // node ID assigned by TDengine + uint16_t nodePort; // node sync Port + char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN +} SNodeInfo; + +typedef struct { + int selfIndex; + int nNode; + SNodeInfo* nodeInfo; +} SSyncCluster; + +typedef struct { + int32_t selfIndex; + int nNode; + SyncNodeId* nodeId; + ESyncRole* role; +} SNodesRole; + +struct SSyncFSM; +typedef struct SSyncFSM { + void* pData; + + // apply committed log, bufs will be free by raft module + int (*applyLog)(struct SSyncFSM *fsm, SyncVersion index, const SSyncBuffer *buf, void *pData); + + // cluster commit callback + int (*onClusterChanged)(struct SSyncFSM *fsm, const SSyncCluster* cluster, void *pData); + + // fsm return snapshot in ppBuf, bufs will be free by raft module + // TODO: getSnapshot SHOULD be async? + int (*getSnapshot)(struct SSyncFSM *fsm, SSyncBuffer **ppBuf, int* objId, bool *isLast); + + // fsm apply snapshot with pBuf data + int (*applySnapshot)(struct SSyncFSM *fsm, SSyncBuffer *pBuf, int objId, bool isLast); + + // call when restore snapshot and log done + int (*onRestoreDone)(struct SSyncFSM *fsm); + + void (*onRollback)(struct SSyncFSM *fsm, SyncVersion index, const SSyncBuffer *buf); + + void (*onRoleChanged)(struct SSyncFSM *fsm, const SNodesRole* pRole); + +} SSyncFSM; + +typedef struct SSyncServerState { + SyncNodeId voteFor; + SSyncTerm term; +} SSyncServerState; + +typedef struct SStateManager { + void* pData; + + void (*saveServerState)(struct SStateManager* stateMng, const SSyncServerState* state); + + const SSyncServerState* (*readServerState)(struct SStateManager* stateMng); + + void (*saveCluster)(struct SStateManager* stateMng, const SSyncCluster* cluster); + + const SSyncCluster* (*readCluster)(struct SStateManager* stateMng); +} SStateManager; + +typedef struct { + int32_t vgId; // vgroup ID + + twalh walHandle; + + SyncVersion snapIndex; // initial version + SSyncCluster syncCfg; // configuration from mgmt + + SSyncFSM fsm; + + SStateManager stateManager; +} SSyncInfo; + +int32_t syncInit(); +void syncCleanUp(); + +SyncNodeId syncStart(const SSyncInfo *); +void syncStop(SyncNodeId); + +int32_t syncPropose(SyncNodeId nodeId, SSyncBuffer buffer, void *pData, bool isWeak); + +extern int32_t raftDebugFlag; + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_RAFT_SYNC_H diff --git a/source/libs/raft/CMakeLists.txt b/source/libs/raft/CMakeLists.txt index 127adc106e..6cdc72dc8f 100644 --- a/source/libs/raft/CMakeLists.txt +++ b/source/libs/raft/CMakeLists.txt @@ -1,5 +1,13 @@ aux_source_directory(src RAFT_SRC) add_library(raft ${RAFT_SRC}) + +target_link_libraries( + raft + PUBLIC common + PUBLIC util + PUBLIC wal +) + target_include_directories( raft PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/raft" diff --git a/source/libs/raft/src/raft.c b/source/libs/raft/src/raft.c index 6dea4a4e57..852618186f 100644 --- a/source/libs/raft/src/raft.c +++ b/source/libs/raft/src/raft.c @@ -11,4 +11,6 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "raft_sync.h" \ No newline at end of file From 048329e95e0971a2d92327531cafb2ef83dfbace Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 25 Oct 2021 09:29:37 +0800 Subject: [PATCH 4/5] [feeature][raft]refactor raft interface to sync --- include/libs/raft/raft.h | 161 ------------------ .../libs/{raft/raft_sync.h => sync/sync.h} | 11 +- source/libs/CMakeLists.txt | 2 +- source/libs/{raft => sync}/CMakeLists.txt | 10 +- source/libs/{raft => sync}/inc/raftInt.h | 0 .../libs/{raft/src/raft.c => sync/src/sync.c} | 2 +- source/libs/{raft => sync}/test/raftTests.cpp | 0 7 files changed, 13 insertions(+), 173 deletions(-) delete mode 100644 include/libs/raft/raft.h rename include/libs/{raft/raft_sync.h => sync/sync.h} (90%) rename source/libs/{raft => sync}/CMakeLists.txt (52%) rename source/libs/{raft => sync}/inc/raftInt.h (100%) rename source/libs/{raft/src/raft.c => sync/src/sync.c} (96%) rename source/libs/{raft => sync}/test/raftTests.cpp (100%) diff --git a/include/libs/raft/raft.h b/include/libs/raft/raft.h deleted file mode 100644 index 5b7f93276b..0000000000 --- a/include/libs/raft/raft.h +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TD_RAFT_H -#define TD_RAFT_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include "taosdef.h" - -typedef unsigned int RaftId; -typedef unsigned int RaftGroupId; - -// buffer holding data -typedef struct RaftBuffer { - void* data; - size_t len; -} RaftBuffer; - -// a single server information in a cluster -typedef struct RaftServer { - RaftId id; - char fqdn[TSDB_FQDN_LEN]; - uint16_t port; -} RaftServer; - -// all servers in a cluster -typedef struct RaftConfiguration { - RaftServer *servers; - int nServer; -} RaftConfiguration; - -// raft lib module -struct Raft; -typedef struct Raft Raft; - -struct RaftNode; -typedef struct RaftNode RaftNode; - -// raft state machine -struct RaftFSM; -typedef struct RaftFSM { - // statemachine user data - void *data; - - // apply buffer data, bufs will be free by raft module - int (*apply)(struct RaftFSM *fsm, const RaftBuffer *bufs[], int nBufs); - - // configuration commit callback - int (*onConfigurationCommit)(const RaftConfiguration* cluster); - - // fsm return snapshot in ppBuf, bufs will be free by raft module - // TODO: getSnapshot SHOULD be async? - int (*getSnapshot)(struct RaftFSM *fsm, RaftBuffer **ppBuf); - - // fsm restore with pBuf data - int (*restore)(struct RaftFSM *fsm, RaftBuffer *pBuf); - - // fsm send data in buf to server,buf will be free by raft module - int (*send)(struct RaftFSM* fsm, const RaftServer* server, const RaftBuffer *buf); -} RaftFSM; - -typedef struct RaftNodeOptions { - // user define state machine - RaftFSM* pFSM; - - // election timeout(in ms) - // by default: 1000 - int electionTimeoutMS; - - // heart timeout(in ms) - // by default: 100 - int heartbeatTimeoutMS; - - // install snapshot timeout(in ms) - int installSnapshotTimeoutMS; - - /** - * number of log entries before starting a new snapshot. - * by default: 1024 - */ - int snapshotThreshold; - - /** - * Number of log entries to keep in the log after a snapshot has - * been taken. - * by default: 128. - */ - int snapshotTrailing; - - /** - * Enable or disable pre-vote support. - * by default: false - */ - bool preVote; - -} RaftNodeOptions; - -// create raft lib -int RaftCreate(Raft** ppRaft); - -int RaftDestroy(Raft* pRaft); - -// start a raft node with options,node id,group id -int RaftStart(Raft* pRaft, - RaftId selfId, - RaftGroupId selfGroupId, - const RaftConfiguration* cluster, - const RaftNodeOptions* options, - RaftNode **ppNode); - -// stop a raft node -int RaftStop(RaftNode* pNode); - -// client apply a cmd in buf -typedef void (*RaftApplyFp)(const RaftBuffer *pBuf, int result); - -int RaftApply(RaftNode *pNode, - const RaftBuffer *pBuf, - RaftApplyFp applyCb); - -// recv data from other servers in cluster,buf will be free in raft -int RaftRecv(RaftNode *pNode, const RaftBuffer* pBuf); - -// change cluster servers API -typedef void (*RaftChangeFp)(const RaftServer* pServer, int result); - -int RaftAddServer(RaftNode *pNode, - const RaftServer* pServer, - RaftChangeFp changeCb); - -int RaftRemoveServer(RaftNode *pNode, - const RaftServer* pServer, - RaftChangeFp changeCb); - -// transfer leader to id -typedef void (*RaftTransferFp)(RaftId id, int result); -int RaftTransfer(RaftNode *pNode, - RaftId id, - RaftTransferFp transferCb); - -#ifdef __cplusplus -} -#endif - -#endif /* TD_RAFT_H */ \ No newline at end of file diff --git a/include/libs/raft/raft_sync.h b/include/libs/sync/sync.h similarity index 90% rename from include/libs/raft/raft_sync.h rename to include/libs/sync/sync.h index d59ab5206a..ee3ea8db9b 100644 --- a/include/libs/raft/raft_sync.h +++ b/include/libs/sync/sync.h @@ -25,7 +25,8 @@ extern "C" { #include "wal.h" typedef uint32_t SyncNodeId; -typedef int64_t SyncVersion; +typedef int32_t SyncGroupId; +typedef int64_t SyncIndex; typedef uint64_t SSyncTerm; typedef enum { @@ -63,7 +64,7 @@ typedef struct SSyncFSM { void* pData; // apply committed log, bufs will be free by raft module - int (*applyLog)(struct SSyncFSM *fsm, SyncVersion index, const SSyncBuffer *buf, void *pData); + int (*applyLog)(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData); // cluster commit callback int (*onClusterChanged)(struct SSyncFSM *fsm, const SSyncCluster* cluster, void *pData); @@ -78,7 +79,7 @@ typedef struct SSyncFSM { // call when restore snapshot and log done int (*onRestoreDone)(struct SSyncFSM *fsm); - void (*onRollback)(struct SSyncFSM *fsm, SyncVersion index, const SSyncBuffer *buf); + void (*onRollback)(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf); void (*onRoleChanged)(struct SSyncFSM *fsm, const SNodesRole* pRole); @@ -102,11 +103,11 @@ typedef struct SStateManager { } SStateManager; typedef struct { - int32_t vgId; // vgroup ID + SyncGroupId vgId; twalh walHandle; - SyncVersion snapIndex; // initial version + SyncIndex snapshotIndex; // initial version SSyncCluster syncCfg; // configuration from mgmt SSyncFSM fsm; diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt index a7ec318eaa..3a975e679b 100644 --- a/source/libs/CMakeLists.txt +++ b/source/libs/CMakeLists.txt @@ -1,5 +1,5 @@ add_subdirectory(transport) -add_subdirectory(raft) +add_subdirectory(sync) add_subdirectory(tkv) add_subdirectory(index) add_subdirectory(wal) diff --git a/source/libs/raft/CMakeLists.txt b/source/libs/sync/CMakeLists.txt similarity index 52% rename from source/libs/raft/CMakeLists.txt rename to source/libs/sync/CMakeLists.txt index 6cdc72dc8f..124f4a1fee 100644 --- a/source/libs/raft/CMakeLists.txt +++ b/source/libs/sync/CMakeLists.txt @@ -1,15 +1,15 @@ -aux_source_directory(src RAFT_SRC) -add_library(raft ${RAFT_SRC}) +aux_source_directory(src SYNC_SRC) +add_library(sync ${SYNC_SRC}) target_link_libraries( - raft + sync PUBLIC common PUBLIC util PUBLIC wal ) target_include_directories( - raft - PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/raft" + sync + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) \ No newline at end of file diff --git a/source/libs/raft/inc/raftInt.h b/source/libs/sync/inc/raftInt.h similarity index 100% rename from source/libs/raft/inc/raftInt.h rename to source/libs/sync/inc/raftInt.h diff --git a/source/libs/raft/src/raft.c b/source/libs/sync/src/sync.c similarity index 96% rename from source/libs/raft/src/raft.c rename to source/libs/sync/src/sync.c index 852618186f..35611f3da8 100644 --- a/source/libs/raft/src/raft.c +++ b/source/libs/sync/src/sync.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "raft_sync.h" \ No newline at end of file +#include "sync.h" \ No newline at end of file diff --git a/source/libs/raft/test/raftTests.cpp b/source/libs/sync/test/raftTests.cpp similarity index 100% rename from source/libs/raft/test/raftTests.cpp rename to source/libs/sync/test/raftTests.cpp From 497735f677c57a63c5d77f0d6936470cedf88302 Mon Sep 17 00:00:00 2001 From: liuyq-617 Date: Mon, 25 Oct 2021 10:30:31 +0800 Subject: [PATCH 5/5] [TD-10750]remove default checkout in jenkins --- Jenkinsfile | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 88e0cd4cf1..c93350f2f6 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,11 +1,11 @@ import hudson.model.Result +import hudson.model.*; import jenkins.model.CauseOfInterruption -properties([pipelineTriggers([githubPush()])]) node { - git url: 'https://github.com/taosdata/TDengine.git' } def skipbuild=0 +def win_stop=0 def abortPreviousBuilds() { def currentJobName = env.JOB_NAME @@ -121,6 +121,7 @@ def pre_test(){ pipeline { agent none + options { skipDefaultCheckout() } environment{ WK = '/var/lib/jenkins/workspace/TDinternal' WKC= '/var/lib/jenkins/workspace/TDinternal/community' @@ -128,6 +129,7 @@ pipeline { stages { stage('pre_build'){ agent{label 'master'} + options { skipDefaultCheckout() } when { changeRequest() } @@ -181,6 +183,7 @@ pipeline { } stage('Parallel test stage') { //only build pr + options { skipDefaultCheckout() } when { allOf{ changeRequest()