From 0f9cd43027e3e0d7a433abfbafe303117c3d0302 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 20 May 2022 10:23:48 +0800 Subject: [PATCH 1/3] refactor: adjust sync header file --- include/libs/sync/sync.h | 68 +++++---------- include/libs/sync/syncTools.h | 6 -- source/libs/sync/inc/syncInt.h | 150 ++++++--------------------------- 3 files changed, 49 insertions(+), 175 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 831063c606..9b6593e4b5 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -20,17 +20,23 @@ extern "C" { #endif -#include "os.h" - #include "cJSON.h" #include "tdef.h" #include "tmsgcb.h" +#define SYNC_INDEX_BEGIN 0 +#define SYNC_INDEX_INVALID -1 + typedef uint64_t SyncNodeId; typedef int32_t SyncGroupId; typedef int64_t SyncIndex; typedef uint64_t SyncTerm; +typedef struct SSyncNode SSyncNode; +typedef struct SSyncBuffer SSyncBuffer; +typedef struct SWal SWal; +typedef struct SSyncRaftEntry SSyncRaftEntry; + typedef enum { TAOS_SYNC_STATE_FOLLOWER = 100, TAOS_SYNC_STATE_CANDIDATE = 101, @@ -38,6 +44,17 @@ typedef enum { TAOS_SYNC_STATE_ERROR = 103, } ESyncState; +typedef enum { + TAOS_SYNC_PROPOSE_SUCCESS = 0, + TAOS_SYNC_PROPOSE_NOT_LEADER = 1, + TAOS_SYNC_PROPOSE_OTHER_ERROR = 2, +} ESyncProposeCode; + +typedef enum { + TAOS_SYNC_FSM_CB_SUCCESS = 0, + TAOS_SYNC_FSM_CB_OTHER_ERROR = 1, +} ESyncFsmCbCode; + typedef struct SNodeInfo { uint16_t nodePort; char nodeFqdn[TSDB_FQDN_LEN]; @@ -55,11 +72,6 @@ typedef struct SSnapshot { SyncTerm lastApplyTerm; } SSnapshot; -typedef enum { - TAOS_SYNC_FSM_CB_SUCCESS = 0, - TAOS_SYNC_FSM_CB_OTHER_ERROR, -} ESyncFsmCbCode; - typedef struct SFsmCbMeta { SyncIndex index; bool isWeak; @@ -68,27 +80,15 @@ typedef struct SFsmCbMeta { uint64_t seqNum; } SFsmCbMeta; -struct SRpcMsg; -typedef struct SRpcMsg SRpcMsg; - typedef struct SSyncFSM { void* data; - void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); - int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); - } SSyncFSM; -struct SSyncRaftEntry; -typedef struct SSyncRaftEntry SSyncRaftEntry; - -#define SYNC_INDEX_BEGIN 0 -#define SYNC_INDEX_INVALID -1 - // abstract definition of log store in raft // SWal implements it typedef struct SSyncLogStore { @@ -117,11 +117,6 @@ typedef struct SSyncLogStore { } SSyncLogStore; -struct SWal; -typedef struct SWal SWal; - -struct SEpSet; -typedef struct SEpSet SEpSet; typedef struct SSyncInfo { SyncGroupId vgId; @@ -130,10 +125,8 @@ typedef struct SSyncInfo { SWal* pWal; SSyncFSM* pFsm; SMsgCb* msgcb; - int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); - } SSyncInfo; int32_t syncInit(); @@ -148,27 +141,8 @@ const char* syncGetMyRoleStr(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); int32_t syncGetVgId(int64_t rid); - -typedef enum { - TAOS_SYNC_PROPOSE_SUCCESS = 0, - TAOS_SYNC_PROPOSE_NOT_LEADER, - TAOS_SYNC_PROPOSE_OTHER_ERROR, -} ESyncProposeCode; - -int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); - -bool syncEnvIsStart(); - -extern int32_t sDebugFlag; - -//----------------------------------------- -struct SSyncNode; -typedef struct SSyncNode SSyncNode; - -struct SSyncBuffer; -typedef struct SSyncBuffer SSyncBuffer; -//----------------------------------------- - +int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); +bool syncEnvIsStart(); const char* syncStr(ESyncState state); #ifdef __cplusplus diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 01c25b93cc..4b160c9e61 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -20,9 +20,6 @@ extern "C" { #endif -#include "os.h" - -#include "cJSON.h" #include "trpc.h" // ------------------ ds ------------------- @@ -32,9 +29,6 @@ typedef struct SRaftId { } SRaftId; // ------------------ control ------------------- -struct SSyncNode; -typedef struct SSyncNode SSyncNode; - SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 36f22db05f..768e1c1cf1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -20,135 +20,41 @@ extern "C" { #endif -#include -#include -#include -#include "cJSON.h" #include "sync.h" #include "syncTools.h" -#include "taosdef.h" -#include "tglobal.h" #include "tlog.h" #include "ttimer.h" -#define sFatal(...) \ - { \ - if (sDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ - } \ - } -#define sError(...) \ - { \ - if (sDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ - } \ - } -#define sWarn(...) \ - { \ - if (sDebugFlag & DEBUG_WARN) { \ - taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \ - } \ - } -#define sInfo(...) \ - { \ - if (sDebugFlag & DEBUG_INFO) { \ - taosPrintLog("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \ - } \ - } -#define sDebug(...) \ - { \ - if (sDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \ - } \ - } -#define sTrace(...) \ - { \ - if (sDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \ - } \ - } +// clang-format off +#define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define sError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define sWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define sInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define sDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0) +#define sTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0) +#define sFatalLong(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define sErrorLong(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define sWarnLong(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define sInfoLong(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLongString("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define sDebugLong(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0) +#define sTraceLong(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0) +// clang-format on -#define sFatalLong(...) \ - { \ - if (sDebugFlag & DEBUG_FATAL) { \ - taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ - } \ - } -#define sErrorLong(...) \ - { \ - if (sDebugFlag & DEBUG_ERROR) { \ - taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ - } \ - } -#define sWarnLong(...) \ - { \ - if (sDebugFlag & DEBUG_WARN) { \ - taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \ - } \ - } -#define sInfoLong(...) \ - { \ - if (sDebugFlag & DEBUG_INFO) { \ - taosPrintLongString("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \ - } \ - } -#define sDebugLong(...) \ - { \ - if (sDebugFlag & DEBUG_DEBUG) { \ - taosPrintLongString("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \ - } \ - } -#define sTraceLong(...) \ - { \ - if (sDebugFlag & DEBUG_TRACE) { \ - taosPrintLongString("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \ - } \ - } - -struct SyncTimeout; -typedef struct SyncTimeout SyncTimeout; - -struct SyncClientRequest; -typedef struct SyncClientRequest SyncClientRequest; - -struct SyncPing; -typedef struct SyncPing SyncPing; - -struct SyncPingReply; -typedef struct SyncPingReply SyncPingReply; - -struct SyncRequestVote; -typedef struct SyncRequestVote SyncRequestVote; - -struct SyncRequestVoteReply; -typedef struct SyncRequestVoteReply SyncRequestVoteReply; - -struct SyncAppendEntries; -typedef struct SyncAppendEntries SyncAppendEntries; - -struct SyncAppendEntriesReply; +typedef struct SyncTimeout SyncTimeout; +typedef struct SyncClientRequest SyncClientRequest; +typedef struct SyncPing SyncPing; +typedef struct SyncPingReply SyncPingReply; +typedef struct SyncRequestVote SyncRequestVote; +typedef struct SyncRequestVoteReply SyncRequestVoteReply; +typedef struct SyncAppendEntries SyncAppendEntries; typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; - -struct SSyncEnv; -typedef struct SSyncEnv SSyncEnv; - -struct SRaftStore; -typedef struct SRaftStore SRaftStore; - -struct SVotesGranted; -typedef struct SVotesGranted SVotesGranted; - -struct SVotesRespond; -typedef struct SVotesRespond SVotesRespond; - -struct SSyncIndexMgr; -typedef struct SSyncIndexMgr SSyncIndexMgr; - -struct SRaftCfg; -typedef struct SRaftCfg SRaftCfg; - -struct SSyncRespMgr; -typedef struct SSyncRespMgr SSyncRespMgr; +typedef struct SSyncEnv SSyncEnv; +typedef struct SRaftStore SRaftStore; +typedef struct SVotesGranted SVotesGranted; +typedef struct SVotesRespond SVotesRespond; +typedef struct SSyncIndexMgr SSyncIndexMgr; +typedef struct SRaftCfg SRaftCfg; +typedef struct SSyncRespMgr SSyncRespMgr; typedef struct SSyncNode { // init by SSyncInfo From f2a84eda09e315ee8f8c7bc2f2500d0f649d6bd4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 20 May 2022 10:30:48 +0800 Subject: [PATCH 2/3] refactor: adjust vnode header file --- source/dnode/vnode/inc/vnode.h | 34 ++++++------ source/dnode/vnode/src/inc/vnd.h | 59 +++++++++------------ source/libs/sync/src/syncMain.c | 4 +- source/libs/sync/test/syncIOSendMsgTest.cpp | 3 +- 4 files changed, 46 insertions(+), 54 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index b48a8775ce..68d4216bae 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -45,20 +45,20 @@ typedef struct SVnodeCfg SVnodeCfg; extern const SVnodeCfg vnodeCfgDefault; -int vnodeInit(int nthreads); +int32_t vnodeInit(int32_t nthreads); void vnodeCleanup(); -int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); +int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodeClose(SVnode *pVnode); -int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); -int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); -int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); -int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); +int32_t vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); +int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); +int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); +int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); -int vnodeValidateTableHash(SVnode *pVnode, char *tableFName); +int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName); int32_t vnodeStart(SVnode *pVnode); void vnodeStop(SVnode *pVnode); @@ -74,8 +74,8 @@ typedef struct SMetaEntry SMetaEntry; void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); void metaReaderClear(SMetaReader *pReader); -int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); -int metaReadNext(SMetaReader *pReader); +int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); +int32_t metaReadNext(SMetaReader *pReader); const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid); #if 1 // refact APIs below (TODO) @@ -86,7 +86,7 @@ typedef struct SMTbCursor SMTbCursor; SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); -int metaTbCursorNext(SMTbCursor *pTbCur); +int32_t metaTbCursorNext(SMTbCursor *pTbCur); #endif // tsdb @@ -124,8 +124,8 @@ typedef struct STqReadHandle STqReadHandle; STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList); -int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); -int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); +int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); +int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids); @@ -207,15 +207,15 @@ struct SMetaReader { SDecoder coder; SMetaEntry me; void *pBuf; - int szBuf; + int32_t szBuf; }; struct SMTbCursor { TBC *pDbc; void *pKey; void *pVal; - int kLen; - int vLen; + int32_t kLen; + int32_t vLen; SMetaReader mr; }; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index a034833a57..eb3382ac4c 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -24,7 +24,6 @@ extern "C" { #endif -// vnodeDebug ==================== // clang-format off #define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) @@ -34,17 +33,17 @@ extern "C" { #define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -// vnodeCfg ==================== +// vnodeCfg.c extern const SVnodeCfg vnodeCfgDefault; -int vnodeCheckCfg(const SVnodeCfg*); -int vnodeEncodeConfig(const void* pObj, SJson* pJson); -int vnodeDecodeConfig(const SJson* pJson, void* pObj); +int32_t vnodeCheckCfg(const SVnodeCfg*); +int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson); +int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj); -// vnodeModule ==================== -int vnodeScheduleTask(int (*execute)(void*), void* arg); +// vnodeModule.c +int32_t vnodeScheduleTask(int32_t (*execute)(void*), void* arg); -// vnodeBufPool ==================== +// vnodeBufPool.c typedef struct SVBufPoolNode SVBufPoolNode; struct SVBufPoolNode { SVBufPoolNode* prev; @@ -62,37 +61,29 @@ struct SVBufPool { SVBufPoolNode node; }; -int vnodeOpenBufPool(SVnode* pVnode, int64_t size); -int vnodeCloseBufPool(SVnode* pVnode); -void vnodeBufPoolReset(SVBufPool* pPool); +int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size); +int32_t vnodeCloseBufPool(SVnode* pVnode); +void vnodeBufPoolReset(SVBufPool* pPool); -// vnodeQuery ==================== -int vnodeQueryOpen(SVnode* pVnode); -void vnodeQueryClose(SVnode* pVnode); -int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); +// vnodeQuery.c +int32_t vnodeQueryOpen(SVnode* pVnode); +void vnodeQueryClose(SVnode* pVnode); +int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); -// vnodeCommit ==================== -int vnodeBegin(SVnode* pVnode); -int vnodeShouldCommit(SVnode* pVnode); -int vnodeCommit(SVnode* pVnode); -int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); -int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); -int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); -int vnodeSyncCommit(SVnode* pVnode); -int vnodeAsyncCommit(SVnode* pVnode); +// vnodeCommit.c +int32_t vnodeBegin(SVnode* pVnode); +int32_t vnodeShouldCommit(SVnode* pVnode); +int32_t vnodeCommit(SVnode* pVnode); +int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); +int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); +int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); +int32_t vnodeSyncCommit(SVnode* pVnode); +int32_t vnodeAsyncCommit(SVnode* pVnode); -// vnodeCommit ==================== +// vnodeSync.c int32_t vnodeSyncOpen(SVnode* pVnode, char* path); -int32_t vnodeSyncStart(SVnode* pVnode); +void vnodeSyncStart(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode); -void vnodeSyncSetMsgCb(SVnode* pVnode); -int32_t vnodeSyncEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg); -int32_t vnodeSyncSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg); -void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); -void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); -void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); -int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); -SSyncFSM* syncVnodeMakeFsm(); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 56389de88a..d9ff60bbe2 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -674,10 +674,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); if (pSyncNode->FpSendMsg != NULL) { - pMsg->info.noResp = 1; // htonl syncUtilMsgHtoN(pMsg->pCont); + pMsg->info.noResp = 1; pSyncNode->FpSendMsg(&epSet, pMsg); } else { sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL"); @@ -689,10 +689,10 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S SEpSet epSet; syncUtilnodeInfo2EpSet(nodeInfo, &epSet); if (pSyncNode->FpSendMsg != NULL) { - pMsg->info.noResp = 1; // htonl syncUtilMsgHtoN(pMsg->pCont); + pMsg->info.noResp = 1; pSyncNode->FpSendMsg(&epSet, pMsg); } else { sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL"); diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index b8a9bec108..630d96054b 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -97,11 +97,12 @@ int main(int argc, char** argv) { for (int i = 0; i < 10; ++i) { SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncIOSendMsgTest"); - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); SEpSet epSet; syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); + rpcMsg.info.noResp = 1; pSyncNode->FpSendMsg(&epSet, &rpcMsg); taosMsleep(1000); From ff91282bee95865907a8cd71f635c3a1f92fae9a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 20 May 2022 10:51:53 +0800 Subject: [PATCH 3/3] refactor: adjust vnode sync --- source/dnode/vnode/src/vnd/vnodeOpen.c | 1 - source/dnode/vnode/src/vnd/vnodeSync.c | 89 ++++++++++++-------------- 2 files changed, 40 insertions(+), 50 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index ef86ac86e4..f0af677641 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -180,7 +180,6 @@ void vnodeClose(SVnode *pVnode) { // start the sync timer after the queue is ready int32_t vnodeStart(SVnode *pVnode) { - vnodeSyncSetMsgCb(pVnode); vnodeSyncStart(pVnode); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index bcef95baff..8659c41807 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -13,71 +13,62 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "vnd.h" +static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg); +static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg); +static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode); +static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); +static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); +static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); +static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); + int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { - SSyncInfo syncInfo; - syncInfo.vgId = pVnode->config.vgId; - SSyncCfg *pCfg = &(syncInfo.syncCfg); - pCfg->replicaNum = pVnode->config.syncCfg.replicaNum; - pCfg->myIndex = pVnode->config.syncCfg.myIndex; - memcpy(pCfg->nodeInfo, pVnode->config.syncCfg.nodeInfo, sizeof(pCfg->nodeInfo)); + SSyncInfo syncInfo = { + .vgId = pVnode->config.vgId, + .syncCfg = pVnode->config.syncCfg, + .pWal = pVnode->pWal, + .msgcb = NULL, + .FpSendMsg = vnodeSyncSendMsg, + .FpEqMsg = vnodeSyncEqMsg, + }; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", path); - syncInfo.pWal = pVnode->pWal; - - syncInfo.pFsm = syncVnodeMakeFsm(pVnode); - syncInfo.msgcb = NULL; - syncInfo.FpSendMsg = vnodeSyncSendMsg; - syncInfo.FpEqMsg = vnodeSyncEqMsg; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); + syncInfo.pFsm = vnodeSyncMakeFsm(pVnode); pVnode->sync = syncOpen(&syncInfo); - assert(pVnode->sync > 0); + if (pVnode->sync <= 0) { + vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr()); + return -1; + } - // for test setPingTimerMS(pVnode->sync, 3000); setElectTimerMS(pVnode->sync, 500); setHeartbeatTimerMS(pVnode->sync, 100); - return 0; } -int32_t vnodeSyncStart(SVnode *pVnode) { +void vnodeSyncStart(SVnode *pVnode) { + syncSetMsgCb(pVnode->sync, &pVnode->msgCb); syncStart(pVnode->sync); - return 0; } -void vnodeSyncClose(SVnode *pVnode) { - // stop by ref id - syncStop(pVnode->sync); -} - -void vnodeSyncSetMsgCb(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); } +void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } -int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { - pMsg->info.noResp = 1; - return tmsgSendReq(pEpSet, pMsg); -} - -int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { - SVnode *pVnode = (SVnode *)(pFsm->data); - vnodeGetSnapshot(pVnode, pSnapshot); - - /* - pSnapshot->data = NULL; - pSnapshot->lastApplyIndex = 0; - pSnapshot->lastApplyTerm = 0; - */ +int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } +int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { + vnodeGetSnapshot(pFsm->data, pSnapshot); return 0; } -void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; if (pFsm->FpGetSnapshot != NULL) { - SSnapshot snapshot; + SSnapshot snapshot = {0}; pFsm->FpGetSnapshot(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -128,7 +119,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb } } -void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, @@ -136,19 +127,19 @@ void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } -void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } -SSyncFSM *syncVnodeMakeFsm(SVnode *pVnode) { - SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); +SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { + SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pVnode; - pFsm->FpCommitCb = vnodeSyncCommitCb; - pFsm->FpPreCommitCb = vnodeSyncPreCommitCb; - pFsm->FpRollBackCb = vnodeSyncRollBackCb; - pFsm->FpGetSnapshot = vnodeSyncGetSnapshotCb; + pFsm->FpCommitCb = vnodeSyncCommitMsg; + pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; + pFsm->FpRollBackCb = vnodeSyncRollBackMsg; + pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; return pFsm; -} +} \ No newline at end of file