diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index db9979bf12..dbe8a347da 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -193,9 +193,13 @@ typedef struct SSyncInfo { SWal* pWal; SSyncFSM* pFsm; SMsgCb* msgcb; - int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); - int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); - int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); + int32_t pingMs; + int32_t electMs; + int32_t heartbeatMs; + + int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg); + int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); + int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); } SSyncInfo; int32_t syncInit(); @@ -228,6 +232,8 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); +const char* syncUtilState2String(ESyncState state); + #ifdef __cplusplus } #endif diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index eb3c99fee7..9586d1febb 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -28,20 +28,11 @@ typedef struct SRaftId { SyncGroupId vgId; } SRaftId; -void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); char* sync2SimpleStr(int64_t rid); -// set timer ms -void setPingTimerMS(int64_t rid, int32_t pingTimerMS); -void setElectTimerMS(int64_t rid, int32_t electTimerMS); -void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS); - // for compatibility, the same as syncPropose int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak); -// utils -const char* syncUtilState2String(ESyncState state); - // ------------------ for debug ------------------- void syncRpcMsgPrint(SRpcMsg* pMsg); void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index dc14a28d6f..a0f3c98f83 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -20,7 +20,6 @@ #include "sdb.h" #include "sync.h" -#include "syncTools.h" #include "tcache.h" #include "tdatablock.h" #include "tglobal.h" diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 1cd97b73e6..cbd7d16c40 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -202,9 +202,12 @@ int32_t mndInitSync(SMnode *pMnode) { .vgId = 1, .pWal = pMnode->pWal, .msgcb = NULL, - .FpSendMsg = mndSyncSendMsg, - .FpEqMsg = mndSyncEqMsg, - .FpEqCtrlMsg = NULL, + .syncSendMSg = mndSyncSendMsg, + .syncEqMsg = mndSyncEqMsg, + .syncEqCtrlMsg = NULL, + .pingMs = 5000, + .electMs = 3000, + .heartbeatMs = 500, }; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); @@ -228,11 +231,6 @@ int32_t mndInitSync(SMnode *pMnode) { return -1; } - // decrease election timer - setPingTimerMS(pMgmt->sync, 5000); - setElectTimerMS(pMgmt->sync, 3000); - setHeartbeatTimerMS(pMgmt->sync, 500); - mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); return 0; } @@ -303,7 +301,6 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { void mndSyncStart(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); syncStart(pMgmt->sync); mInfo("vgId:1, sync started, id:%" PRId64, pMgmt->sync); } diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 988ecc5dd3..d5ad500fdb 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -17,7 +17,6 @@ #define _TD_VND_H_ #include "sync.h" -#include "syncTools.h" #include "ttrace.h" #include "vnodeInt.h" diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 1d88be42d8..a93086de1f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -503,9 +503,12 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { .syncCfg = pVnode->config.syncCfg, .pWal = pVnode->pWal, .msgcb = NULL, - .FpSendMsg = vnodeSyncSendMsg, - .FpEqMsg = vnodeSyncEqMsg, - .FpEqCtrlMsg = vnodeSyncEqCtrlMsg, + .syncSendMSg = vnodeSyncSendMsg, + .syncEqMsg = vnodeSyncEqMsg, + .syncEqCtrlMsg = vnodeSyncEqCtrlMsg, + .pingMs = 5000, + .electMs = 4000, + .heartbeatMs = 700, }; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); @@ -524,15 +527,11 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { return -1; } - setPingTimerMS(pVnode->sync, 5000); - setElectTimerMS(pVnode->sync, 4000); - setHeartbeatTimerMS(pVnode->sync, 700); return 0; } void vnodeSyncStart(SVnode *pVnode) { vDebug("vgId:%d, start sync", pVnode->config.vgId); - syncSetMsgCb(pVnode->sync, &pVnode->msgCb); syncStart(pVnode->sync); } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 5e8041e54a..63aa6d81c9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -107,9 +107,9 @@ typedef struct SSyncNode { // sync io SWal* pWal; const SMsgCb* msgcb; - int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); - int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); - int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); + int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg); + int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); + int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); // init internal SNodeInfo myNodeInfo; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b3edc2ca73..4ccf14e6d4 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -56,6 +56,12 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) { return -1; } + pSyncNode->pingBaseLine = pSyncInfo->pingMs; + pSyncNode->pingTimerMS = pSyncInfo->pingMs; + pSyncNode->electBaseLine = pSyncInfo->electMs; + pSyncNode->hbBaseLine = pSyncInfo->heartbeatMs; + pSyncNode->heartbeatTimerMS = pSyncInfo->heartbeatMs; + pSyncNode->msgcb = pSyncInfo->msgcb; return pSyncNode->rid; } @@ -747,18 +753,6 @@ static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandl sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle); } -void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - sTrace("syncSetQ get pSyncNode is NULL, rid:%" PRId64, rid); - return; - } - ASSERT(rid == pSyncNode->rid); - pSyncNode->msgcb = msgcb; - - syncNodeRelease(pSyncNode); -} - char* sync2SimpleStr(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { @@ -772,41 +766,6 @@ char* sync2SimpleStr(int64_t rid) { return s; } -void setPingTimerMS(int64_t rid, int32_t pingTimerMS) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return; - } - ASSERT(rid == pSyncNode->rid); - pSyncNode->pingBaseLine = pingTimerMS; - pSyncNode->pingTimerMS = pingTimerMS; - - syncNodeRelease(pSyncNode); -} - -void setElectTimerMS(int64_t rid, int32_t electTimerMS) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return; - } - ASSERT(rid == pSyncNode->rid); - pSyncNode->electBaseLine = electTimerMS; - - syncNodeRelease(pSyncNode); -} - -void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return; - } - ASSERT(rid == pSyncNode->rid); - pSyncNode->hbBaseLine = hbTimerMS; - pSyncNode->heartbeatTimerMS = hbTimerMS; - - syncNodeRelease(pSyncNode); -} - int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { @@ -903,7 +862,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { } } else { - if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { + if (pSyncNode->syncEqMsg != NULL && (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { ret = 0; } else { ret = -1; @@ -1034,9 +993,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->msgcb = pSyncInfo->msgcb; - pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; - pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; - pSyncNode->FpEqCtrlMsg = pSyncInfo->FpEqCtrlMsg; + pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg; + pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg; + pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg; // init raft config pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); @@ -1552,12 +1511,12 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); - if (pSyncNode->FpSendMsg != NULL) { + if (pSyncNode->syncSendMSg != NULL) { // htonl syncUtilMsgHtoN(pMsg->pCont); pMsg->info.noResp = 1; - pSyncNode->FpSendMsg(&epSet, pMsg); + pSyncNode->syncSendMSg(&epSet, pMsg); } else { sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId); return -1; @@ -1569,12 +1528,12 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilnodeInfo2EpSet(nodeInfo, &epSet); - if (pSyncNode->FpSendMsg != NULL) { + if (pSyncNode->syncSendMSg != NULL) { // htonl syncUtilMsgHtoN(pMsg->pCont); pMsg->info.noResp = 1; - pSyncNode->FpSendMsg(&epSet, pMsg); + pSyncNode->syncSendMSg(&epSet, pMsg); } else { sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId); } @@ -1598,13 +1557,13 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg); - cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncSendMSg); + cJSON_AddStringToObject(pRoot, "syncSendMSg", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); cJSON_AddStringToObject(pRoot, "queue", u64buf); - snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg); - cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncEqMsg); + cJSON_AddStringToObject(pRoot, "syncEqMsg", u64buf); // init internal cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo); @@ -2617,8 +2576,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg); - if (pSyncNode->FpEqMsg != NULL) { - int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + if (pSyncNode->syncEqMsg != NULL) { + int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); @@ -2626,7 +2585,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { return; } } else { - sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"); + sTrace("syncNodeEqPingTimer pSyncNode->syncEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); @@ -2651,8 +2610,8 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { pSyncNode->vgId, pSyncNode); SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); - if (pSyncNode->FpEqMsg != NULL) { - int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + if (pSyncNode->syncEqMsg != NULL) { + int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); @@ -2668,7 +2627,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } while (0); } else { - sTrace("syncNodeEqElectTimer FpEqMsg is NULL"); + sTrace("syncNodeEqElectTimer syncEqMsg is NULL"); } syncTimeoutDestroy(pSyncMsg); @@ -2700,8 +2659,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { SRpcMsg rpcMsg; syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg); - if (pSyncNode->FpEqMsg != NULL) { - int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + if (pSyncNode->syncEqMsg != NULL) { + int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); @@ -2709,7 +2668,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { return; } } else { - sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); + sError("vgId:%d, enqueue msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId); } syncTimeoutDestroy(pSyncMsg); @@ -2756,8 +2715,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { // eq msg #if 0 - if (pSyncNode->FpEqCtrlMsg != NULL) { - int32_t code = pSyncNode->FpEqCtrlMsg(pSyncNode->msgcb, &rpcMsg); + if (pSyncNode->syncEqCtrlMsg != NULL) { + int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); @@ -2765,7 +2724,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { return; } } else { - sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); + sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId); } #endif @@ -2805,10 +2764,10 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { SRpcMsg rpcMsg = {0}; syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); - if (ths->FpEqMsg != NULL) { - ths->FpEqMsg(ths->msgcb, &rpcMsg); + if (ths->syncEqMsg != NULL) { + ths->syncEqMsg(ths->msgcb, &rpcMsg); } else { - sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL"); + sTrace("syncNodeEqNoop pSyncNode->syncEqMsg is NULL"); } syncEntryDestory(pEntry); @@ -2919,8 +2878,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { SRpcMsg rpcMsgLocalCmd; syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd); - if (ths->FpEqMsg != NULL && ths->msgcb != NULL) { - int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsgLocalCmd); + if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { + int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); if (code != 0) { sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); rpcFreeCont(rpcMsgLocalCmd.pCont); diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 88af5746d4..35c831b52f 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -145,16 +145,17 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { taosArrayPush(delIndexArray, pSeqNum); cnt++; - SFsmCbMeta cbMeta = {0}; - cbMeta.index = SYNC_INDEX_INVALID; - cbMeta.lastConfigIndex = SYNC_INDEX_INVALID; - cbMeta.isWeak = false; - cbMeta.code = TSDB_CODE_SYN_TIMEOUT; - cbMeta.state = pSyncNode->state; - cbMeta.seqNum = *pSeqNum; - cbMeta.term = SYNC_TERM_INVALID; - cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; - cbMeta.flag = 0; + SFsmCbMeta cbMeta = { + cbMeta.index = SYNC_INDEX_INVALID, + cbMeta.lastConfigIndex = SYNC_INDEX_INVALID, + cbMeta.isWeak = false, + cbMeta.code = TSDB_CODE_SYN_TIMEOUT, + cbMeta.state = pSyncNode->state, + cbMeta.seqNum = *pSeqNum, + cbMeta.term = SYNC_TERM_INVALID, + cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm, + cbMeta.flag = 0, + }; pStub->rpcMsg.pCont = NULL; pStub->rpcMsg.contLen = 0; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index f152201901..1750dce5ec 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -177,18 +177,6 @@ char* syncUtilRaftId2Str(const SRaftId* p) { } const char* syncUtilState2String(ESyncState state) { - /* - if (state == TAOS_SYNC_STATE_FOLLOWER) { - return "TAOS_SYNC_STATE_FOLLOWER"; - } else if (state == TAOS_SYNC_STATE_CANDIDATE) { - return "TAOS_SYNC_STATE_CANDIDATE"; - } else if (state == TAOS_SYNC_STATE_LEADER) { - return "TAOS_SYNC_STATE_LEADER"; - } else { - return "TAOS_SYNC_STATE_UNKNOWN"; - } - */ - if (state == TAOS_SYNC_STATE_FOLLOWER) { return "follower"; } else if (state == TAOS_SYNC_STATE_CANDIDATE) { diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 8f16be27e7..96e0b6c483 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -195,8 +195,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncInfo syncInfo; syncInfo.vgId = vgId; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index d1244546c9..bf01f76607 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -120,8 +120,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncInfo syncInfo; syncInfo.vgId = vgId; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; diff --git a/source/libs/sync/test/syncElectTest.cpp b/source/libs/sync/test/syncElectTest.cpp index d09879a699..c290368c7f 100644 --- a/source/libs/sync/test/syncElectTest.cpp +++ b/source/libs/sync/test/syncElectTest.cpp @@ -45,8 +45,8 @@ SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWa SSyncInfo syncInfo; syncInfo.vgId = vgId; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = NULL; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index fdb5cf7ac8..35dc7e4398 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -32,8 +32,8 @@ SSyncNode *pSyncNode; SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index 191e245b1e..d43789c91e 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -25,8 +25,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); @@ -97,7 +97,7 @@ int main(int argc, char** argv) { SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest"); SRpcMsg rpcMsg; syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); - pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); + pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp index 4a457136c2..055f869130 100644 --- a/source/libs/sync/test/syncIOSendMsgTest.cpp +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -26,8 +26,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); @@ -103,7 +103,7 @@ int main(int argc, char** argv) { SEpSet epSet; syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); rpcMsg.info.noResp = 1; - pSyncNode->FpSendMsg(&epSet, &rpcMsg); + pSyncNode->syncSendMSg(&epSet, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index d654ad06fe..4333127405 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -26,8 +26,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test"); diff --git a/source/libs/sync/test/syncPingSelfTest.cpp b/source/libs/sync/test/syncPingSelfTest.cpp index f44cbb04d5..7d8ed73ac7 100644 --- a/source/libs/sync/test/syncPingSelfTest.cpp +++ b/source/libs/sync/test/syncPingSelfTest.cpp @@ -26,8 +26,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncPingTimerTest.cpp b/source/libs/sync/test/syncPingTimerTest.cpp index fd6342aa84..c074103f38 100644 --- a/source/libs/sync/test/syncPingTimerTest.cpp +++ b/source/libs/sync/test/syncPingTimerTest.cpp @@ -26,8 +26,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncPingTimerTest2.cpp b/source/libs/sync/test/syncPingTimerTest2.cpp index 295003dff3..2683f48487 100644 --- a/source/libs/sync/test/syncPingTimerTest2.cpp +++ b/source/libs/sync/test/syncPingTimerTest2.cpp @@ -26,8 +26,8 @@ SSyncFSM* pFsm; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index adb3deb22d..7552fc7ae3 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -100,8 +100,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncInfo syncInfo; syncInfo.vgId = vgId; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 50771ac476..2aff0aad93 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -87,8 +87,8 @@ void initFsm() { SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); @@ -204,7 +204,7 @@ int main(int argc, char **argv) { SyncClientRequest *pSyncClientRequest = pMsg1; SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); - gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); + gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index bdb4d7d2d8..a36f3af450 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -217,8 +217,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* SSyncInfo syncInfo; syncInfo.vgId = vgId; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index e2e8748697..bbf14c604e 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -28,8 +28,8 @@ SSyncNode* pSyncNode; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 881a5331b1..adebfe1be2 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -28,8 +28,8 @@ SSyncNode* pSyncNode; SSyncNode* syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index fee98ddd52..0547f39bee 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -65,8 +65,8 @@ void initFsm() { SSyncNode *syncNodeInit() { syncInfo.vgId = 1234; syncInfo.msgcb = &gSyncIO->msgcb; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.syncSendMSg = syncIOSendMsg; + syncInfo.syncEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); @@ -179,7 +179,7 @@ int main(int argc, char **argv) { SyncClientRequest *pSyncClientRequest = pMsg1; SRpcMsg rpcMsg; syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); - gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); + gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg); taosMsleep(1000); }