From fca35ceb29ee8556474812abf7ce25c4e5c16a19 Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 2 Nov 2021 10:49:23 +0800 Subject: [PATCH] [TD-10645][raft]add sync rpc client and server --- source/libs/sync/CMakeLists.txt | 1 + source/libs/sync/inc/syncInt.h | 11 +++- source/libs/sync/src/sync.c | 106 +++++++++++++++++++++++++++++++- 3 files changed, 113 insertions(+), 5 deletions(-) diff --git a/source/libs/sync/CMakeLists.txt b/source/libs/sync/CMakeLists.txt index 124f4a1fee..37ee5194c8 100644 --- a/source/libs/sync/CMakeLists.txt +++ b/source/libs/sync/CMakeLists.txt @@ -4,6 +4,7 @@ add_library(sync ${SYNC_SRC}) target_link_libraries( sync PUBLIC common + PUBLIC transport PUBLIC util PUBLIC wal ) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 81cb686781..73015e87a1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -39,12 +39,17 @@ struct SSyncNode { typedef struct SSyncManager { pthread_mutex_t mutex; + // sync server rpc + void* serverRpc; + // rpc server hash table base on FQDN:port key + SHashObj* rpcServerTable; + + // sync client rpc + void* clientRpc; + // worker threads SSyncWorker worker[TAOS_SYNC_MAX_WORKER]; - // sync net worker - SSyncWorker netWorker; - // vgroup hash table SHashObj* vgroupTable; diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c index a9df02f818..e3d0606c08 100644 --- a/source/libs/sync/src/sync.c +++ b/source/libs/sync/src/sync.c @@ -14,12 +14,20 @@ */ #include "syncInt.h" +#include "trpc.h" #include "ttimer.h" SSyncManager* gSyncManager = NULL; #define SYNC_TICK_TIMER 50 +#define SYNC_ACTIVITY_TIMER 5 +#define SYNC_SERVER_WORKER 2 +static void syncProcessRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet); +static void syncProcessReqMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet); + +static int syncInitRpcServer(SSyncManager* syncManager, const SSyncCluster* pSyncCfg); +static int syncInitRpcClient(SSyncManager* syncManager); static int syncOpenWorkerPool(SSyncManager* syncManager); static int syncCloseWorkerPool(SSyncManager* syncManager); static void *syncWorkerMain(void *argv); @@ -30,7 +38,7 @@ int32_t syncInit() { return 0; } - gSyncManager = (SSyncManager*)malloc(sizeof(SSyncManager)); + gSyncManager = (SSyncManager*)calloc(sizeof(SSyncManager), 0); if (gSyncManager == NULL) { syncError("malloc SSyncManager fail"); return -1; @@ -38,6 +46,12 @@ int32_t syncInit() { pthread_mutex_init(&gSyncManager->mutex, NULL); + // init client rpc + if (syncInitRpcClient(gSyncManager) != 0) { + syncCleanUp(); + return -1; + } + // init sync timer manager gSyncManager->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); if (gSyncManager->syncTimerManager == NULL) { @@ -68,7 +82,13 @@ void syncCleanUp() { if (gSyncManager->vgroupTable) { taosHashCleanup(gSyncManager->vgroupTable); } - taosTmrCleanUp(gSyncManager->syncTimerManager); + if (gSyncManager->clientRpc) { + rpcClose(gSyncManager->clientRpc); + syncInfo("sync inter-sync rpc client is closed"); + } + if (gSyncManager->syncTimerManager) { + taosTmrCleanUp(gSyncManager->syncTimerManager); + } syncCloseWorkerPool(gSyncManager); pthread_mutex_unlock(&gSyncManager->mutex); pthread_mutex_destroy(&gSyncManager->mutex); @@ -86,6 +106,12 @@ SSyncNode* syncStart(const SSyncInfo* pInfo) { return *ppNode; } + // init rpc server + if (syncInitRpcServer(gSyncManager, &pInfo->syncCfg) != 0) { + pthread_mutex_unlock(&gSyncManager->mutex); + return NULL; + } + SSyncNode *pNode = (SSyncNode*)malloc(sizeof(SSyncNode)); if (pNode == NULL) { syncError("malloc vgroup %d node fail", pInfo->vgId); @@ -141,6 +167,82 @@ int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, b void syncReconfig(const SSyncNode* pNode, const SSyncCluster* pCfg) {} +// process rpc rsp message from other sync server +static void syncProcessRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + +} + +// process rpc message from other sync server +static void syncProcessReqMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { + +} + +static int syncInitRpcServer(SSyncManager* syncManager, const SSyncCluster* pSyncCfg) { + if (gSyncManager->rpcServerTable == NULL) { + gSyncManager->rpcServerTable = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (gSyncManager->rpcServerTable == NULL) { + syncError("init sync rpc server hash table error"); + return -1; + } + } + assert(pSyncCfg->selfIndex < pSyncCfg->replica && pSyncCfg->selfIndex >= 0); + const SNodeInfo* pNode = &(pSyncCfg->nodeInfo[pSyncCfg->replica]); + char buffer[20] = {'\0'}; + snprintf(buffer, sizeof(buffer), "%s:%d", &(pNode->nodeFqdn[0]), pNode->nodePort); + size_t len = strlen(buffer); + void** ppRpcServer = taosHashGet(gSyncManager->rpcServerTable, buffer, len); + if (ppRpcServer != NULL) { + // already inited + syncInfo("sync rpc server for %s already exist", buffer); + return 0; + } + + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = pNode->nodePort; + rpcInit.label = "sync-server"; + rpcInit.numOfThreads = SYNC_SERVER_WORKER; + rpcInit.cfp = syncProcessReqMsg; + rpcInit.sessions = TSDB_MAX_VNODES << 4; + rpcInit.connType = TAOS_CONN_SERVER; + rpcInit.idleTime = SYNC_ACTIVITY_TIMER * 1000; + + void* rpcServer = rpcOpen(&rpcInit); + if (rpcServer == NULL) { + syncInfo("rpcOpen for sync rpc server for %s fail", buffer); + return -1; + } + + taosHashPut(gSyncManager->rpcServerTable, buffer, strlen(buffer), rpcServer, len); + syncInfo("sync rpc server for %s init success", buffer); + + return 0; +} + +static int syncInitRpcClient(SSyncManager* syncManager) { + char secret[TSDB_KEY_LEN] = "secret"; + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.label = "sync-client"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = syncProcessRsp; + rpcInit.sessions = TSDB_MAX_VNODES << 4; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.idleTime = SYNC_ACTIVITY_TIMER * 1000; + rpcInit.user = "t"; + rpcInit.ckey = "key"; + rpcInit.secret = secret; + + syncManager->clientRpc = rpcOpen(&rpcInit); + if (syncManager->clientRpc == NULL) { + syncError("failed to init sync rpc client"); + return -1; + } + + syncInfo("sync inter-sync rpc client is initialized"); + return 0; +} + static int syncOpenWorkerPool(SSyncManager* syncManager) { int i; pthread_attr_t thattr;