diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h
index 831063c606..9b6593e4b5 100644
--- a/include/libs/sync/sync.h
+++ b/include/libs/sync/sync.h
@@ -20,17 +20,23 @@
extern "C" {
#endif
-#include "os.h"
-
#include "cJSON.h"
#include "tdef.h"
#include "tmsgcb.h"
+#define SYNC_INDEX_BEGIN 0
+#define SYNC_INDEX_INVALID -1
+
typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId;
typedef int64_t SyncIndex;
typedef uint64_t SyncTerm;
+typedef struct SSyncNode SSyncNode;
+typedef struct SSyncBuffer SSyncBuffer;
+typedef struct SWal SWal;
+typedef struct SSyncRaftEntry SSyncRaftEntry;
+
typedef enum {
TAOS_SYNC_STATE_FOLLOWER = 100,
TAOS_SYNC_STATE_CANDIDATE = 101,
@@ -38,6 +44,17 @@ typedef enum {
TAOS_SYNC_STATE_ERROR = 103,
} ESyncState;
+typedef enum {
+ TAOS_SYNC_PROPOSE_SUCCESS = 0,
+ TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
+ TAOS_SYNC_PROPOSE_OTHER_ERROR = 2,
+} ESyncProposeCode;
+
+typedef enum {
+ TAOS_SYNC_FSM_CB_SUCCESS = 0,
+ TAOS_SYNC_FSM_CB_OTHER_ERROR = 1,
+} ESyncFsmCbCode;
+
typedef struct SNodeInfo {
uint16_t nodePort;
char nodeFqdn[TSDB_FQDN_LEN];
@@ -55,11 +72,6 @@ typedef struct SSnapshot {
SyncTerm lastApplyTerm;
} SSnapshot;
-typedef enum {
- TAOS_SYNC_FSM_CB_SUCCESS = 0,
- TAOS_SYNC_FSM_CB_OTHER_ERROR,
-} ESyncFsmCbCode;
-
typedef struct SFsmCbMeta {
SyncIndex index;
bool isWeak;
@@ -68,27 +80,15 @@ typedef struct SFsmCbMeta {
uint64_t seqNum;
} SFsmCbMeta;
-struct SRpcMsg;
-typedef struct SRpcMsg SRpcMsg;
-
typedef struct SSyncFSM {
void* data;
-
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
-
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
-
} SSyncFSM;
-struct SSyncRaftEntry;
-typedef struct SSyncRaftEntry SSyncRaftEntry;
-
-#define SYNC_INDEX_BEGIN 0
-#define SYNC_INDEX_INVALID -1
-
// abstract definition of log store in raft
// SWal implements it
typedef struct SSyncLogStore {
@@ -117,11 +117,6 @@ typedef struct SSyncLogStore {
} SSyncLogStore;
-struct SWal;
-typedef struct SWal SWal;
-
-struct SEpSet;
-typedef struct SEpSet SEpSet;
typedef struct SSyncInfo {
SyncGroupId vgId;
@@ -130,10 +125,8 @@ typedef struct SSyncInfo {
SWal* pWal;
SSyncFSM* pFsm;
SMsgCb* msgcb;
-
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
-
} SSyncInfo;
int32_t syncInit();
@@ -148,27 +141,8 @@ const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncGetVgId(int64_t rid);
-
-typedef enum {
- TAOS_SYNC_PROPOSE_SUCCESS = 0,
- TAOS_SYNC_PROPOSE_NOT_LEADER,
- TAOS_SYNC_PROPOSE_OTHER_ERROR,
-} ESyncProposeCode;
-
-int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
-
-bool syncEnvIsStart();
-
-extern int32_t sDebugFlag;
-
-//-----------------------------------------
-struct SSyncNode;
-typedef struct SSyncNode SSyncNode;
-
-struct SSyncBuffer;
-typedef struct SSyncBuffer SSyncBuffer;
-//-----------------------------------------
-
+int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
+bool syncEnvIsStart();
const char* syncStr(ESyncState state);
#ifdef __cplusplus
diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h
index 01c25b93cc..4b160c9e61 100644
--- a/include/libs/sync/syncTools.h
+++ b/include/libs/sync/syncTools.h
@@ -20,9 +20,6 @@
extern "C" {
#endif
-#include "os.h"
-
-#include "cJSON.h"
#include "trpc.h"
// ------------------ ds -------------------
@@ -32,9 +29,6 @@ typedef struct SRaftId {
} SRaftId;
// ------------------ control -------------------
-struct SSyncNode;
-typedef struct SSyncNode SSyncNode;
-
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index b48a8775ce..68d4216bae 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -45,20 +45,20 @@ typedef struct SVnodeCfg SVnodeCfg;
extern const SVnodeCfg vnodeCfgDefault;
-int vnodeInit(int nthreads);
+int32_t vnodeInit(int32_t nthreads);
void vnodeCleanup();
-int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
+int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodeClose(SVnode *pVnode);
-int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
-int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
-int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
-int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
-int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
-int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
+int32_t vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
+int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
+int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
+int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
+int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
+int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
-int vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
+int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode);
@@ -74,8 +74,8 @@ typedef struct SMetaEntry SMetaEntry;
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderClear(SMetaReader *pReader);
-int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
-int metaReadNext(SMetaReader *pReader);
+int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
+int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid);
#if 1 // refact APIs below (TODO)
@@ -86,7 +86,7 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
-int metaTbCursorNext(SMTbCursor *pTbCur);
+int32_t metaTbCursorNext(SMTbCursor *pTbCur);
#endif
// tsdb
@@ -124,8 +124,8 @@ typedef struct STqReadHandle STqReadHandle;
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
-int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
-int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
+int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
+int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
@@ -207,15 +207,15 @@ struct SMetaReader {
SDecoder coder;
SMetaEntry me;
void *pBuf;
- int szBuf;
+ int32_t szBuf;
};
struct SMTbCursor {
TBC *pDbc;
void *pKey;
void *pVal;
- int kLen;
- int vLen;
+ int32_t kLen;
+ int32_t vLen;
SMetaReader mr;
};
diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h
index a034833a57..eb3382ac4c 100644
--- a/source/dnode/vnode/src/inc/vnd.h
+++ b/source/dnode/vnode/src/inc/vnd.h
@@ -24,7 +24,6 @@
extern "C" {
#endif
-// vnodeDebug ====================
// clang-format off
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
@@ -34,17 +33,17 @@ extern "C" {
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
-// vnodeCfg ====================
+// vnodeCfg.c
extern const SVnodeCfg vnodeCfgDefault;
-int vnodeCheckCfg(const SVnodeCfg*);
-int vnodeEncodeConfig(const void* pObj, SJson* pJson);
-int vnodeDecodeConfig(const SJson* pJson, void* pObj);
+int32_t vnodeCheckCfg(const SVnodeCfg*);
+int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson);
+int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj);
-// vnodeModule ====================
-int vnodeScheduleTask(int (*execute)(void*), void* arg);
+// vnodeModule.c
+int32_t vnodeScheduleTask(int32_t (*execute)(void*), void* arg);
-// vnodeBufPool ====================
+// vnodeBufPool.c
typedef struct SVBufPoolNode SVBufPoolNode;
struct SVBufPoolNode {
SVBufPoolNode* prev;
@@ -62,37 +61,29 @@ struct SVBufPool {
SVBufPoolNode node;
};
-int vnodeOpenBufPool(SVnode* pVnode, int64_t size);
-int vnodeCloseBufPool(SVnode* pVnode);
-void vnodeBufPoolReset(SVBufPool* pPool);
+int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size);
+int32_t vnodeCloseBufPool(SVnode* pVnode);
+void vnodeBufPoolReset(SVBufPool* pPool);
-// vnodeQuery ====================
-int vnodeQueryOpen(SVnode* pVnode);
-void vnodeQueryClose(SVnode* pVnode);
-int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
+// vnodeQuery.c
+int32_t vnodeQueryOpen(SVnode* pVnode);
+void vnodeQueryClose(SVnode* pVnode);
+int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
-// vnodeCommit ====================
-int vnodeBegin(SVnode* pVnode);
-int vnodeShouldCommit(SVnode* pVnode);
-int vnodeCommit(SVnode* pVnode);
-int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
-int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
-int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
-int vnodeSyncCommit(SVnode* pVnode);
-int vnodeAsyncCommit(SVnode* pVnode);
+// vnodeCommit.c
+int32_t vnodeBegin(SVnode* pVnode);
+int32_t vnodeShouldCommit(SVnode* pVnode);
+int32_t vnodeCommit(SVnode* pVnode);
+int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
+int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
+int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
+int32_t vnodeSyncCommit(SVnode* pVnode);
+int32_t vnodeAsyncCommit(SVnode* pVnode);
-// vnodeCommit ====================
+// vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
-int32_t vnodeSyncStart(SVnode* pVnode);
+void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
-void vnodeSyncSetMsgCb(SVnode* pVnode);
-int32_t vnodeSyncEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg);
-int32_t vnodeSyncSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg);
-void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
-void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
-void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
-int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
-SSyncFSM* syncVnodeMakeFsm();
#ifdef __cplusplus
}
diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c
index ef86ac86e4..f0af677641 100644
--- a/source/dnode/vnode/src/vnd/vnodeOpen.c
+++ b/source/dnode/vnode/src/vnd/vnodeOpen.c
@@ -180,7 +180,6 @@ void vnodeClose(SVnode *pVnode) {
// start the sync timer after the queue is ready
int32_t vnodeStart(SVnode *pVnode) {
- vnodeSyncSetMsgCb(pVnode);
vnodeSyncStart(pVnode);
return 0;
}
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index bcef95baff..8659c41807 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -13,71 +13,62 @@
* along with this program. If not, see .
*/
+#define _DEFAULT_SOURCE
#include "vnd.h"
+static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
+static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
+static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode);
+static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
+static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
+static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
+static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
+
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
- SSyncInfo syncInfo;
- syncInfo.vgId = pVnode->config.vgId;
- SSyncCfg *pCfg = &(syncInfo.syncCfg);
- pCfg->replicaNum = pVnode->config.syncCfg.replicaNum;
- pCfg->myIndex = pVnode->config.syncCfg.myIndex;
- memcpy(pCfg->nodeInfo, pVnode->config.syncCfg.nodeInfo, sizeof(pCfg->nodeInfo));
+ SSyncInfo syncInfo = {
+ .vgId = pVnode->config.vgId,
+ .syncCfg = pVnode->config.syncCfg,
+ .pWal = pVnode->pWal,
+ .msgcb = NULL,
+ .FpSendMsg = vnodeSyncSendMsg,
+ .FpEqMsg = vnodeSyncEqMsg,
+ };
- snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", path);
- syncInfo.pWal = pVnode->pWal;
-
- syncInfo.pFsm = syncVnodeMakeFsm(pVnode);
- syncInfo.msgcb = NULL;
- syncInfo.FpSendMsg = vnodeSyncSendMsg;
- syncInfo.FpEqMsg = vnodeSyncEqMsg;
+ snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
+ syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
pVnode->sync = syncOpen(&syncInfo);
- assert(pVnode->sync > 0);
+ if (pVnode->sync <= 0) {
+ vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
+ return -1;
+ }
- // for test
setPingTimerMS(pVnode->sync, 3000);
setElectTimerMS(pVnode->sync, 500);
setHeartbeatTimerMS(pVnode->sync, 100);
-
return 0;
}
-int32_t vnodeSyncStart(SVnode *pVnode) {
+void vnodeSyncStart(SVnode *pVnode) {
+ syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync);
- return 0;
}
-void vnodeSyncClose(SVnode *pVnode) {
- // stop by ref id
- syncStop(pVnode->sync);
-}
-
-void vnodeSyncSetMsgCb(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); }
+void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
-int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
- pMsg->info.noResp = 1;
- return tmsgSendReq(pEpSet, pMsg);
-}
-
-int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
- SVnode *pVnode = (SVnode *)(pFsm->data);
- vnodeGetSnapshot(pVnode, pSnapshot);
-
- /*
- pSnapshot->data = NULL;
- pSnapshot->lastApplyIndex = 0;
- pSnapshot->lastApplyTerm = 0;
- */
+int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
+int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
+ vnodeGetSnapshot(pFsm->data, pSnapshot);
return 0;
}
-void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
+void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) {
- SSnapshot snapshot;
+ SSnapshot snapshot = {0};
pFsm->FpGetSnapshot(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex;
}
@@ -128,7 +119,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
}
}
-void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
+void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
@@ -136,19 +127,19 @@ void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}
-void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
+void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}
-SSyncFSM *syncVnodeMakeFsm(SVnode *pVnode) {
- SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
+SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
+ SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode;
- pFsm->FpCommitCb = vnodeSyncCommitCb;
- pFsm->FpPreCommitCb = vnodeSyncPreCommitCb;
- pFsm->FpRollBackCb = vnodeSyncRollBackCb;
- pFsm->FpGetSnapshot = vnodeSyncGetSnapshotCb;
+ pFsm->FpCommitCb = vnodeSyncCommitMsg;
+ pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
+ pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
+ pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
return pFsm;
-}
+}
\ No newline at end of file
diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h
index 36f22db05f..768e1c1cf1 100644
--- a/source/libs/sync/inc/syncInt.h
+++ b/source/libs/sync/inc/syncInt.h
@@ -20,135 +20,41 @@
extern "C" {
#endif
-#include
-#include
-#include
-#include "cJSON.h"
#include "sync.h"
#include "syncTools.h"
-#include "taosdef.h"
-#include "tglobal.h"
#include "tlog.h"
#include "ttimer.h"
-#define sFatal(...) \
- { \
- if (sDebugFlag & DEBUG_FATAL) { \
- taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
- } \
- }
-#define sError(...) \
- { \
- if (sDebugFlag & DEBUG_ERROR) { \
- taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
- } \
- }
-#define sWarn(...) \
- { \
- if (sDebugFlag & DEBUG_WARN) { \
- taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
- } \
- }
-#define sInfo(...) \
- { \
- if (sDebugFlag & DEBUG_INFO) { \
- taosPrintLog("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \
- } \
- }
-#define sDebug(...) \
- { \
- if (sDebugFlag & DEBUG_DEBUG) { \
- taosPrintLog("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
- } \
- }
-#define sTrace(...) \
- { \
- if (sDebugFlag & DEBUG_TRACE) { \
- taosPrintLog("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
- } \
- }
+// clang-format off
+#define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
+#define sError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
+#define sWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
+#define sInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
+#define sDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0)
+#define sTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0)
+#define sFatalLong(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
+#define sErrorLong(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
+#define sWarnLong(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
+#define sInfoLong(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLongString("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
+#define sDebugLong(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0)
+#define sTraceLong(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0)
+// clang-format on
-#define sFatalLong(...) \
- { \
- if (sDebugFlag & DEBUG_FATAL) { \
- taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
- } \
- }
-#define sErrorLong(...) \
- { \
- if (sDebugFlag & DEBUG_ERROR) { \
- taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
- } \
- }
-#define sWarnLong(...) \
- { \
- if (sDebugFlag & DEBUG_WARN) { \
- taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
- } \
- }
-#define sInfoLong(...) \
- { \
- if (sDebugFlag & DEBUG_INFO) { \
- taosPrintLongString("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \
- } \
- }
-#define sDebugLong(...) \
- { \
- if (sDebugFlag & DEBUG_DEBUG) { \
- taosPrintLongString("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
- } \
- }
-#define sTraceLong(...) \
- { \
- if (sDebugFlag & DEBUG_TRACE) { \
- taosPrintLongString("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
- } \
- }
-
-struct SyncTimeout;
-typedef struct SyncTimeout SyncTimeout;
-
-struct SyncClientRequest;
-typedef struct SyncClientRequest SyncClientRequest;
-
-struct SyncPing;
-typedef struct SyncPing SyncPing;
-
-struct SyncPingReply;
-typedef struct SyncPingReply SyncPingReply;
-
-struct SyncRequestVote;
-typedef struct SyncRequestVote SyncRequestVote;
-
-struct SyncRequestVoteReply;
-typedef struct SyncRequestVoteReply SyncRequestVoteReply;
-
-struct SyncAppendEntries;
-typedef struct SyncAppendEntries SyncAppendEntries;
-
-struct SyncAppendEntriesReply;
+typedef struct SyncTimeout SyncTimeout;
+typedef struct SyncClientRequest SyncClientRequest;
+typedef struct SyncPing SyncPing;
+typedef struct SyncPingReply SyncPingReply;
+typedef struct SyncRequestVote SyncRequestVote;
+typedef struct SyncRequestVoteReply SyncRequestVoteReply;
+typedef struct SyncAppendEntries SyncAppendEntries;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
-
-struct SSyncEnv;
-typedef struct SSyncEnv SSyncEnv;
-
-struct SRaftStore;
-typedef struct SRaftStore SRaftStore;
-
-struct SVotesGranted;
-typedef struct SVotesGranted SVotesGranted;
-
-struct SVotesRespond;
-typedef struct SVotesRespond SVotesRespond;
-
-struct SSyncIndexMgr;
-typedef struct SSyncIndexMgr SSyncIndexMgr;
-
-struct SRaftCfg;
-typedef struct SRaftCfg SRaftCfg;
-
-struct SSyncRespMgr;
-typedef struct SSyncRespMgr SSyncRespMgr;
+typedef struct SSyncEnv SSyncEnv;
+typedef struct SRaftStore SRaftStore;
+typedef struct SVotesGranted SVotesGranted;
+typedef struct SVotesRespond SVotesRespond;
+typedef struct SSyncIndexMgr SSyncIndexMgr;
+typedef struct SRaftCfg SRaftCfg;
+typedef struct SSyncRespMgr SSyncRespMgr;
typedef struct SSyncNode {
// init by SSyncInfo
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index 56389de88a..d9ff60bbe2 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -674,10 +674,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
if (pSyncNode->FpSendMsg != NULL) {
- pMsg->info.noResp = 1;
// htonl
syncUtilMsgHtoN(pMsg->pCont);
+ pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg);
} else {
sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
@@ -689,10 +689,10 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
if (pSyncNode->FpSendMsg != NULL) {
- pMsg->info.noResp = 1;
// htonl
syncUtilMsgHtoN(pMsg->pCont);
+ pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg);
} else {
sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp
index b8a9bec108..630d96054b 100644
--- a/source/libs/sync/test/syncIOSendMsgTest.cpp
+++ b/source/libs/sync/test/syncIOSendMsgTest.cpp
@@ -97,11 +97,12 @@ int main(int argc, char** argv) {
for (int i = 0; i < 10; ++i) {
SyncPingReply* pSyncMsg =
syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncIOSendMsgTest");
- SRpcMsg rpcMsg;
+ SRpcMsg rpcMsg = {0};
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
SEpSet epSet;
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
+ rpcMsg.info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, &rpcMsg);
taosMsleep(1000);