From 84effaaba904024da6545d197cf94de0911d68d8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 2 Nov 2021 15:24:07 +0800 Subject: [PATCH] rename SRpcEpset to SEpSet --- include/common/taosmsg.h | 11 +--------- include/libs/transport/trpc.h | 18 ++++++---------- include/server/mnode/mnode.h | 2 +- source/dnode/mgmt/inc/dnodeDnode.h | 6 +++--- source/dnode/mgmt/inc/dnodeInt.h | 2 +- source/dnode/mgmt/inc/dnodeMnode.h | 5 ++--- source/dnode/mgmt/inc/dnodeTransport.h | 2 +- source/dnode/mgmt/inc/dnodeVnodes.h | 2 +- source/dnode/mgmt/src/dnodeDnode.c | 14 ++++++------ source/dnode/mgmt/src/dnodeMnode.c | 27 +++++++++++++++++++---- source/dnode/mgmt/src/dnodeTransport.c | 15 ++++++------- source/dnode/mgmt/src/dnodeVnodes.c | 2 +- source/dnode/mnode/inc/mnodeInt.h | 2 +- source/dnode/mnode/inc/mnodeMnode.h | 4 ++-- source/dnode/mnode/src/mnodeMnode.c | 4 ++-- source/dnode/mnode/src/mnodeWorker.c | 12 +++++------ source/dnode/mnode/src/mondeInt.c | 2 +- source/libs/transport/src/rpcMain.c | 30 +++++++++++++------------- 18 files changed, 81 insertions(+), 79 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index f9ad39fe78..9dc1551970 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -901,16 +901,7 @@ typedef struct { typedef struct { char queryId[TSDB_KILL_MSG_LEN + 1]; -} SKillQueryMsg, SKillStreamMsg, SKillConnMsg; - -typedef struct { - int32_t vnode; - int32_t sid; - uint64_t uid; - uint64_t stime; // stream starting time - int32_t status; - char tableFname[TSDB_TABLE_FNAME_LEN]; -} SAlterStreamMsg; +} SKillQueryMsg, SKillConnMsg; typedef struct { char user[TSDB_USER_LEN]; diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 0ce2e3da14..5460ae5401 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -22,22 +22,16 @@ extern "C" { #include #include #include "taosdef.h" +#include "taosmsg.h" #define TAOS_CONN_SERVER 0 #define TAOS_CONN_CLIENT 1 extern int tsRpcHeadSize; -typedef struct SRpcEpSet { - int8_t inUse; - int8_t numOfEps; - uint16_t port[TSDB_MAX_REPLICA]; - char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN]; -} SRpcEpSet; - typedef struct SRpcCorEpSet { int32_t version; - SRpcEpSet epSet; + SEpSet epSet; } SRpcCorEpSet; typedef struct SRpcConnInfo { @@ -72,7 +66,7 @@ typedef struct SRpcInit { char *ckey; // ciphering key // call back to process incoming msg, code shall be ignored by server app - void (*cfp)(SRpcMsg *, SRpcEpSet *); + void (*cfp)(SRpcMsg *, SEpSet *); // call back to retrieve the client auth info, for server app only int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); @@ -85,11 +79,11 @@ void rpcClose(void *); void *rpcMallocCont(int contLen); void rpcFreeCont(void *pCont); void *rpcReallocCont(void *ptr, int contLen); -void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); +void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendResponse(const SRpcMsg *pMsg); -void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); +void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); -void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); diff --git a/include/server/mnode/mnode.h b/include/server/mnode/mnode.h index 9bbc2e4b10..e41d157057 100644 --- a/include/server/mnode/mnode.h +++ b/include/server/mnode/mnode.h @@ -29,7 +29,7 @@ typedef struct { * @param epSet, the endpoint list of the dnodes. * @param rpcMsg, message to be sent. */ - void (*SendMsgToDnode)(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); + void (*SendMsgToDnode)(struct SEpSet *epSet, struct SRpcMsg *rpcMsg); /** * Send messages to mnode, such as config message. diff --git a/source/dnode/mgmt/inc/dnodeDnode.h b/source/dnode/mgmt/inc/dnodeDnode.h index 9a2b564bd4..2ca1368e63 100644 --- a/source/dnode/mgmt/inc/dnodeDnode.h +++ b/source/dnode/mgmt/inc/dnodeDnode.h @@ -23,13 +23,13 @@ extern "C" { int32_t dnodeInitDnode(); void dnodeCleanupDnode(); -void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet); +void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dnodeGetDnodeId(); int64_t dnodeGetClusterId(); void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); -void dnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet); -void dnodeGetMnodeEpSetForShell(SRpcEpSet *epSet); +void dnodeGetMnodeEpSetForPeer(SEpSet *epSet); +void dnodeGetMnodeEpSetForShell(SEpSet *epSet); void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); #ifdef __cplusplus diff --git a/source/dnode/mgmt/inc/dnodeInt.h b/source/dnode/mgmt/inc/dnodeInt.h index e3306ecfa4..472c467ad6 100644 --- a/source/dnode/mgmt/inc/dnodeInt.h +++ b/source/dnode/mgmt/inc/dnodeInt.h @@ -35,7 +35,7 @@ extern int32_t dDebugFlag; #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat; -typedef void (*MsgFp)(SRpcMsg *pMsg, SRpcEpSet *pEpSet); +typedef void (*MsgFp)(SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dnodeInit(); void dnodeCleanup(); diff --git a/source/dnode/mgmt/inc/dnodeMnode.h b/source/dnode/mgmt/inc/dnodeMnode.h index e696acf425..28702dcb84 100644 --- a/source/dnode/mgmt/inc/dnodeMnode.h +++ b/source/dnode/mgmt/inc/dnodeMnode.h @@ -23,9 +23,8 @@ extern "C" { int32_t dnodeInitMnode(); void dnodeCleanupMnode(); -void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet); - -void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg); +void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet); +int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/dnodeTransport.h b/source/dnode/mgmt/inc/dnodeTransport.h index e8223f4c06..95ca1b81e5 100644 --- a/source/dnode/mgmt/inc/dnodeTransport.h +++ b/source/dnode/mgmt/inc/dnodeTransport.h @@ -24,7 +24,7 @@ extern "C" { int32_t dnodeInitTrans(); void dnodeCleanupTrans(); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); -void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); +void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/dnodeVnodes.h b/source/dnode/mgmt/inc/dnodeVnodes.h index 46dcf549d4..6e9bce9ae5 100644 --- a/source/dnode/mgmt/inc/dnodeVnodes.h +++ b/source/dnode/mgmt/inc/dnodeVnodes.h @@ -23,7 +23,7 @@ extern "C" { int32_t dnodeInitVnodes(); void dnodeCleanupVnodes(); -void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet); +void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet); void dnodeGetVnodes(SVnodeLoads *pVloads); #ifdef __cplusplus diff --git a/source/dnode/mgmt/src/dnodeDnode.c b/source/dnode/mgmt/src/dnodeDnode.c index 701a190841..37dc9f05a1 100644 --- a/source/dnode/mgmt/src/dnodeDnode.c +++ b/source/dnode/mgmt/src/dnodeDnode.c @@ -27,8 +27,8 @@ static struct { int64_t clusterId; SDnodeEps *dnodeEps; SHashObj *dnodeHash; - SRpcEpSet mnodeEpSetForShell; - SRpcEpSet mnodeEpSetForPeer; + SEpSet mnodeEpSetForShell; + SEpSet mnodeEpSetForPeer; char file[PATH_MAX + 20]; uint32_t rebootTime; int8_t dropped; @@ -67,13 +67,13 @@ void dnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) { pthread_mutex_unlock(&tsDnode.mutex); } -void dnodeGetMnodeEpSetForPeer(SRpcEpSet *pEpSet) { +void dnodeGetMnodeEpSetForPeer(SEpSet *pEpSet) { pthread_mutex_lock(&tsDnode.mutex); *pEpSet = tsDnode.mnodeEpSetForPeer; pthread_mutex_unlock(&tsDnode.mutex); } -void dnodeGetMnodeEpSetForShell(SRpcEpSet *pEpSet) { +void dnodeGetMnodeEpSetForShell(SEpSet *pEpSet) { pthread_mutex_lock(&tsDnode.mutex); *pEpSet = tsDnode.mnodeEpSetForShell; pthread_mutex_unlock(&tsDnode.mutex); @@ -82,7 +82,7 @@ void dnodeGetMnodeEpSetForShell(SRpcEpSet *pEpSet) { void dnodeSendRedirectMsg(SRpcMsg *pMsg, bool forShell) { int32_t msgType = pMsg->msgType; - SRpcEpSet epSet = {0}; + SEpSet epSet = {0}; if (forShell) { dnodeGetMnodeEpSetForShell(&epSet); } else { @@ -107,7 +107,7 @@ void dnodeSendRedirectMsg(SRpcMsg *pMsg, bool forShell) { rpcSendRedirectRsp(pMsg->handle, &epSet); } -static void dnodeUpdateMnodeEpSet(SRpcEpSet *pEpSet) { +static void dnodeUpdateMnodeEpSet(SEpSet *pEpSet) { if (pEpSet == NULL || pEpSet->numOfEps <= 0) { dError("mnode is changed, but content is invalid, discard it"); return; @@ -528,7 +528,7 @@ void dnodeCleanupDnode() { dInfo("dnode-dnode is cleaned up"); } -void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { +void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { int32_t msgType = pMsg->msgType; if (msgType == TSDB_MSG_TYPE_STATUS_RSP && pEpSet) { diff --git a/source/dnode/mgmt/src/dnodeMnode.c b/source/dnode/mgmt/src/dnodeMnode.c index 6346b3386f..54fa847790 100644 --- a/source/dnode/mgmt/src/dnodeMnode.c +++ b/source/dnode/mgmt/src/dnodeMnode.c @@ -61,9 +61,28 @@ void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); } -void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - mnodeProcessMsg(pMsg); - // tsDnode.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessCreateMnodeReq; +void dnodeProcessDropMnodeReq(SRpcMsg *pMsg) { + int32_t code = dnodeStartMnode(pMsg); - // tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessDropMnodeReq; + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; + + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); +} + +void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { + switch (pMsg->msgType) { + case TSDB_MSG_TYPE_CREATE_MNODE_IN: + dnodeProcessCreateMnodeReq(pMsg); + break; + case TSDB_MSG_TYPE_DROP_MNODE_IN: + dnodeProcessDropMnodeReq(pMsg); + break; + default: + mnodeProcessMsg(pMsg); + } +} + +int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey) { + return mnodeRetriveAuth(user, spi, encrypt, secret, ckey); } \ No newline at end of file diff --git a/source/dnode/mgmt/src/dnodeTransport.c b/source/dnode/mgmt/src/dnodeTransport.c index 8170c8c21d..18e97e2b9f 100644 --- a/source/dnode/mgmt/src/dnodeTransport.c +++ b/source/dnode/mgmt/src/dnodeTransport.c @@ -24,7 +24,6 @@ #include "dnodeDnode.h" #include "dnodeMnode.h" #include "dnodeVnodes.h" -#include "mnode.h" static struct { void *peerRpc; @@ -119,7 +118,7 @@ static void dnodeInitMsgFp() { tsTrans.msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessDnodeMsg; } -static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { +static void dnodeProcessPeerReq(SRpcMsg *pMsg, SEpSet *pEpSet) { SRpcMsg rspMsg = {.handle = pMsg->handle}; int32_t msgType = pMsg->msgType; @@ -183,7 +182,7 @@ static void dnodeCleanupPeerServer() { } } -static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { +static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SEpSet *pEpSet) { int32_t msgType = pMsg->msgType; if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { @@ -237,7 +236,7 @@ static void dnodeCleanupClient() { } } -static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { +static void dnodeProcessShellReq(SRpcMsg *pMsg, SEpSet *pEpSet) { SRpcMsg rspMsg = {.handle = pMsg->handle}; int32_t msgType = pMsg->msgType; @@ -274,13 +273,13 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { - SRpcEpSet epSet = {0}; + SEpSet epSet = {0}; dnodeGetMnodeEpSetForPeer(&epSet); rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp); } static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { - int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); + int32_t code = dnodeGetUserAuthFromMnode(user, spi, encrypt, secret, ckey); if (code != TSDB_CODE_APP_NOT_READY) return code; SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); @@ -362,10 +361,10 @@ void dnodeCleanupTrans() { dnodeCleanupClient(); } -void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); } +void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); } void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { - SRpcEpSet epSet = {0}; + SEpSet epSet = {0}; dnodeGetMnodeEpSetForPeer(&epSet); dnodeSendMsgToDnode(&epSet, rpcMsg); } \ No newline at end of file diff --git a/source/dnode/mgmt/src/dnodeVnodes.c b/source/dnode/mgmt/src/dnodeVnodes.c index f9032b419e..59f5cf0fd0 100644 --- a/source/dnode/mgmt/src/dnodeVnodes.c +++ b/source/dnode/mgmt/src/dnodeVnodes.c @@ -21,6 +21,6 @@ int32_t dnodeInitVnodes() { return vnodeInit(); } void dnodeCleanupVnodes() { vnodeCleanup(); } -void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { vnodeProcessMsg(NULL, NULL); } +void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { vnodeProcessMsg(NULL, NULL); } void dnodeGetVnodes(SVnodeLoads *pVloads) {} \ No newline at end of file diff --git a/source/dnode/mnode/inc/mnodeInt.h b/source/dnode/mnode/inc/mnodeInt.h index 0ce47cbe36..5b552e5c51 100644 --- a/source/dnode/mnode/inc/mnodeInt.h +++ b/source/dnode/mnode/inc/mnodeInt.h @@ -27,7 +27,7 @@ int32_t mnodeGetDnodeId(); int64_t mnodeGetClusterId(); EMnStatus mnodeGetStatus(); -void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); +void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg); void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg); void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell); void mnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); diff --git a/source/dnode/mnode/inc/mnodeMnode.h b/source/dnode/mnode/inc/mnodeMnode.h index cee96c7bf6..0c8069a917 100644 --- a/source/dnode/mnode/inc/mnodeMnode.h +++ b/source/dnode/mnode/inc/mnodeMnode.h @@ -24,8 +24,8 @@ extern "C" { int32_t mnodeInitMnode(); void mnodeCleanupMnode(); -void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect); -void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect); +void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect); +void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect); #ifdef __cplusplus } diff --git a/source/dnode/mnode/src/mnodeMnode.c b/source/dnode/mnode/src/mnodeMnode.c index d2bcd25fc7..60e1627ad4 100644 --- a/source/dnode/mnode/src/mnodeMnode.c +++ b/source/dnode/mnode/src/mnodeMnode.c @@ -20,5 +20,5 @@ int32_t mnodeInitMnode() { return 0; } void mnodeCleanupMnode() {} -void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect) {} -void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect) {} \ No newline at end of file +void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect) {} +void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect) {} \ No newline at end of file diff --git a/source/dnode/mnode/src/mnodeWorker.c b/source/dnode/mnode/src/mnodeWorker.c index 5ac44a3a06..e4b4c47ddf 100644 --- a/source/dnode/mnode/src/mnodeWorker.c +++ b/source/dnode/mnode/src/mnodeWorker.c @@ -297,10 +297,10 @@ static void mnodeProcessWriteReq(SMnMsg *pMsg, void *unused) { if (!mnodeIsMaster()) { SMnRsp *rpcRsp = &pMsg->rpcRsp; - SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); + SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); mnodeGetMnodeEpSetForShell(epSet, true); rpcRsp->rsp = epSet; - rpcRsp->len = sizeof(SRpcEpSet); + rpcRsp->len = sizeof(SEpSet); mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType], epSet->numOfEps, epSet->inUse); @@ -334,14 +334,14 @@ static void mnodeProcessReadReq(SMnMsg *pMsg, void *unused) { if (!mnodeIsMaster()) { SMnRsp *rpcRsp = &pMsg->rpcRsp; - SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); + SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); if (!epSet) { code = TSDB_CODE_MND_OUT_OF_MEMORY; goto PROCESS_READ_REQ_END; } mnodeGetMnodeEpSetForShell(epSet, true); rpcRsp->rsp = epSet; - rpcRsp->len = sizeof(SRpcEpSet); + rpcRsp->len = sizeof(SEpSet); mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType], epSet->numOfEps, epSet->inUse); @@ -375,10 +375,10 @@ static void mnodeProcessPeerReq(SMnMsg *pMsg, void *unused) { if (!mnodeIsMaster()) { SMnRsp *rpcRsp = &pMsg->rpcRsp; - SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); + SEpSet *epSet = rpcMallocCont(sizeof(SEpSet)); mnodeGetMnodeEpSetForPeer(epSet, true); rpcRsp->rsp = epSet; - rpcRsp->len = sizeof(SRpcEpSet); + rpcRsp->len = sizeof(SEpSet); mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType], epSet->numOfEps, epSet->inUse); diff --git a/source/dnode/mnode/src/mondeInt.c b/source/dnode/mnode/src/mondeInt.c index 343384ba67..2b17da2475 100644 --- a/source/dnode/mnode/src/mondeInt.c +++ b/source/dnode/mnode/src/mondeInt.c @@ -54,7 +54,7 @@ int64_t mnodeGetClusterId() { return tsMint.clusterId; } EMnStatus mnodeGetStatus() { return tsMint.state; } -void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) { +void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg) { (*tsMint.fp.SendMsgToDnode)(epSet, rpcMsg); } diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index e047964b94..fa49c20c77 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -54,7 +54,7 @@ typedef struct { char secret[TSDB_KEY_LEN]; // secret for the link char ckey[TSDB_KEY_LEN]; // ciphering key - void (*cfp)(SRpcMsg *, SRpcEpSet *); + void (*cfp)(SRpcMsg *, SEpSet *); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t refCount; @@ -70,7 +70,7 @@ typedef struct { typedef struct { SRpcInfo *pRpc; // associated SRpcInfo - SRpcEpSet epSet; // ip list provided by app + SEpSet epSet; // ip list provided by app void *ahandle; // handle provided by app struct SRpcConn *pConn; // pConn allocated char msgType; // message type @@ -84,7 +84,7 @@ typedef struct { int64_t rid; // refId returned by taosAddRef SRpcMsg *pRsp; // for synchronous API tsem_t *pSem; // for synchronous API - SRpcEpSet *pSet; // for synchronous API + SEpSet *pSet; // for synchronous API char msg[0]; // RpcHead starts from here } SRpcReqContext; @@ -383,7 +383,7 @@ void *rpcReallocCont(void *ptr, int contLen) { return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); } -void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { +void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcReqContext *pContext; @@ -486,15 +486,15 @@ void rpcSendResponse(const SRpcMsg *pRsp) { return; } -void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) { +void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) { SRpcMsg rpcMsg; memset(&rpcMsg, 0, sizeof(rpcMsg)); - rpcMsg.contLen = sizeof(SRpcEpSet); + rpcMsg.contLen = sizeof(SEpSet); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); if (rpcMsg.pCont == NULL) return; - memcpy(rpcMsg.pCont, pEpSet, sizeof(SRpcEpSet)); + memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet)); rpcMsg.code = TSDB_CODE_RPC_REDIRECT; rpcMsg.handle = thandle; @@ -516,7 +516,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { return 0; } -void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { +void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SRpcReqContext *pContext; pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); @@ -794,9 +794,9 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) { } static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { - SRpcConn *pConn; - SRpcInfo *pRpc = pContext->pRpc; - SRpcEpSet *pEpSet = &pContext->epSet; + SRpcConn *pConn; + SRpcInfo *pRpc = pContext->pRpc; + SEpSet *pEpSet = &pContext->epSet; pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); if ( pConn == NULL || pConn->user[0] == 0) { @@ -926,7 +926,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { SRpcReqContext *pContext = pConn->pContext; if (pHead->code == TSDB_CODE_RPC_REDIRECT) { - if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) { + if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SEpSet)) { // if EpSet is not included in the msg, treat it as NOT_READY pHead->code = TSDB_CODE_RPC_NOT_READY; } else { @@ -1126,12 +1126,12 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { pContext->pConn = NULL; if (pContext->pRsp) { // for synchronous API - memcpy(pContext->pSet, &pContext->epSet, sizeof(SRpcEpSet)); + memcpy(pContext->pSet, &pContext->epSet, sizeof(SEpSet)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); tsem_post(pContext->pSem); } else { // for asynchronous API - SRpcEpSet *pEpSet = NULL; + SEpSet *pEpSet = NULL; if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) pEpSet = &pContext->epSet; @@ -1175,7 +1175,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte if (pHead->code == TSDB_CODE_RPC_REDIRECT) { pContext->numOfTry = 0; - SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content; + SEpSet *pEpSet = (SEpSet*)pHead->content; if (pEpSet->numOfEps > 0) { memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,