diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 5b387852fa..c6f4a4f5c0 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -137,7 +137,9 @@ typedef struct SSyncInfo { SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; SSyncFSM* pFsm; - int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); + + void* rpcClient; + int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); } SSyncInfo; diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index c4735d4481..7e60822a28 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -34,17 +34,18 @@ typedef struct SSyncEnv { tmr_h pEnvTickTimer; tmr_h pTimerManager; char name[128]; + } SSyncEnv; -extern SSyncEnv *gSyncEnv; +extern SSyncEnv* gSyncEnv; int32_t syncEnvStart(); int32_t syncEnvStop(); -tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void *param); +tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param); -void syncEnvStopTimer(tmr_h *pTimer); +void syncEnvStopTimer(tmr_h* pTimer); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index ec887aa855..6723fa66c5 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -30,10 +30,10 @@ extern "C" { #include "trpc.h" typedef struct SSyncIO { - void *serverRpc; - void *clientRpc; + void * serverRpc; + void * clientRpc; STaosQueue *pMsgQ; - STaosQset *pQset; + STaosQset * pQset; pthread_t tid; int8_t isStart; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e02e2d7374..b2922a8676 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -146,7 +146,8 @@ typedef struct SSyncNode { int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); // passed from outside - int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); + void* rpcClient; + int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); } SSyncNode; diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h new file mode 100644 index 0000000000..f3d4c2ed07 --- /dev/null +++ b/source/libs/sync/inc/syncUtil.h @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_UTIL_H +#define _TD_LIBS_SYNC_UTIL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "syncInt.h" +#include "syncMessage.h" +#include "taosdef.h" + +void nodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); + +void raftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); + +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_UTIL_H*/ diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7cfc470f60..9b7d068ada 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -18,10 +18,14 @@ #include "syncEnv.h" #include "syncInt.h" #include "syncRaft.h" +#include "syncUtil.h" static int32_t tsNodeRefId = -1; // ------ local funciton --------- +static int32_t doSyncNodeSendMsgById(SRaftId* destRaftId, struct SSyncNode* pSyncNode, SRpcMsg* pMsg); +static int32_t doSyncNodeSendMsgByInfo(SNodeInfo* nodeInfo, struct SSyncNode* pSyncNode, SRpcMsg* pMsg); + static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg); static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg); static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg); @@ -68,7 +72,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->FpPingTimer = syncNodePingTimerCb; pSyncNode->pingTimerCounter = 0; + pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; + pSyncNode->FpOnPing = onSyncNodePing; pSyncNode->FpOnPingReply = onSyncNodePingReply; pSyncNode->FpOnRequestVote = onSyncNodeRequestVote; @@ -84,7 +90,11 @@ void syncNodeClose(SSyncNode* pSyncNode) { free(pSyncNode); } -void syncNodePingAll(SSyncNode* pSyncNode) { sTrace("syncNodePingAll %p ", pSyncNode); } +void syncNodePingAll(SSyncNode* pSyncNode) { + sTrace("syncNodePingAll %p ", pSyncNode); + SyncPing msg; + doSyncNodePing(pSyncNode, &msg); +} void syncNodePingPeers(SSyncNode* pSyncNode) {} @@ -110,8 +120,30 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { } // ------ local funciton --------- + +static int32_t doSyncNodeSendMsgById(SRaftId* destRaftId, struct SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + raftId2EpSet(destRaftId, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +static int32_t doSyncNodeSendMsgByInfo(SNodeInfo* nodeInfo, struct SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + nodeInfo2EpSet(nodeInfo, &epSet); + + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) { - int32_t ret = 0; + int32_t ret; + for (int i = 0; i < ths->syncCfg.replicaNum; ++i) { + SRpcMsg* rpcMsg; + syncPing2RpcMsg(pMsg, rpcMsg); + doSyncNodeSendMsgByInfo(&ths->syncCfg.nodeInfo[i], ths, rpcMsg); + } + return ret; } @@ -167,6 +199,6 @@ static void syncNodePingTimerCb(void* param, void* tmrId) { taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); - syncNodePingSelf(pSyncNode); + syncNodePingAll(pSyncNode); } } \ No newline at end of file diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c new file mode 100644 index 0000000000..d68b1def4d --- /dev/null +++ b/source/libs/sync/src/syncUtil.c @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncUtil.h" + +void nodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet) {} + +void raftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {} + +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {} \ No newline at end of file diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index bf0d342ca4..43196bdd1f 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -18,6 +18,7 @@ SSyncNode* doSync() { SSyncInfo syncInfo; syncInfo.vgId = 1; + syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");