rename SRpcEpset to SEpSet

This commit is contained in:
Shengliang Guan 2021-11-02 15:24:07 +08:00
parent 72cb9c01ea
commit 84effaaba9
18 changed files with 81 additions and 79 deletions

View File

@ -901,16 +901,7 @@ typedef struct {
typedef struct { typedef struct {
char queryId[TSDB_KILL_MSG_LEN + 1]; char queryId[TSDB_KILL_MSG_LEN + 1];
} SKillQueryMsg, SKillStreamMsg, SKillConnMsg; } SKillQueryMsg, SKillConnMsg;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
uint64_t stime; // stream starting time
int32_t status;
char tableFname[TSDB_TABLE_FNAME_LEN];
} SAlterStreamMsg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];

View File

@ -22,22 +22,16 @@ extern "C" {
#include <stdbool.h> #include <stdbool.h>
#include <stdint.h> #include <stdint.h>
#include "taosdef.h" #include "taosdef.h"
#include "taosmsg.h"
#define TAOS_CONN_SERVER 0 #define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1 #define TAOS_CONN_CLIENT 1
extern int tsRpcHeadSize; extern int tsRpcHeadSize;
typedef struct SRpcEpSet {
int8_t inUse;
int8_t numOfEps;
uint16_t port[TSDB_MAX_REPLICA];
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
} SRpcEpSet;
typedef struct SRpcCorEpSet { typedef struct SRpcCorEpSet {
int32_t version; int32_t version;
SRpcEpSet epSet; SEpSet epSet;
} SRpcCorEpSet; } SRpcCorEpSet;
typedef struct SRpcConnInfo { typedef struct SRpcConnInfo {
@ -72,7 +66,7 @@ typedef struct SRpcInit {
char *ckey; // ciphering key char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app // call back to process incoming msg, code shall be ignored by server app
void (*cfp)(SRpcMsg *, SRpcEpSet *); void (*cfp)(SRpcMsg *, SEpSet *);
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
@ -85,11 +79,11 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen); void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen); void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid); void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg); void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);

View File

@ -29,7 +29,7 @@ typedef struct {
* @param epSet, the endpoint list of the dnodes. * @param epSet, the endpoint list of the dnodes.
* @param rpcMsg, message to be sent. * @param rpcMsg, message to be sent.
*/ */
void (*SendMsgToDnode)(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); void (*SendMsgToDnode)(struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
/** /**
* Send messages to mnode, such as config message. * Send messages to mnode, such as config message.

View File

@ -23,13 +23,13 @@ extern "C" {
int32_t dnodeInitDnode(); int32_t dnodeInitDnode();
void dnodeCleanupDnode(); void dnodeCleanupDnode();
void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet); void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dnodeGetDnodeId(); int32_t dnodeGetDnodeId();
int64_t dnodeGetClusterId(); int64_t dnodeGetClusterId();
void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
void dnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet); void dnodeGetMnodeEpSetForPeer(SEpSet *epSet);
void dnodeGetMnodeEpSetForShell(SRpcEpSet *epSet); void dnodeGetMnodeEpSetForShell(SEpSet *epSet);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -35,7 +35,7 @@ extern int32_t dDebugFlag;
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat; typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat;
typedef void (*MsgFp)(SRpcMsg *pMsg, SRpcEpSet *pEpSet); typedef void (*MsgFp)(SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dnodeInit(); int32_t dnodeInit();
void dnodeCleanup(); void dnodeCleanup();

View File

@ -23,9 +23,8 @@ extern "C" {
int32_t dnodeInitMnode(); int32_t dnodeInitMnode();
void dnodeCleanupMnode(); void dnodeCleanupMnode();
void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet); void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -24,7 +24,7 @@ extern "C" {
int32_t dnodeInitTrans(); int32_t dnodeInitTrans();
void dnodeCleanupTrans(); void dnodeCleanupTrans();
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -23,7 +23,7 @@ extern "C" {
int32_t dnodeInitVnodes(); int32_t dnodeInitVnodes();
void dnodeCleanupVnodes(); void dnodeCleanupVnodes();
void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet); void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet);
void dnodeGetVnodes(SVnodeLoads *pVloads); void dnodeGetVnodes(SVnodeLoads *pVloads);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -27,8 +27,8 @@ static struct {
int64_t clusterId; int64_t clusterId;
SDnodeEps *dnodeEps; SDnodeEps *dnodeEps;
SHashObj *dnodeHash; SHashObj *dnodeHash;
SRpcEpSet mnodeEpSetForShell; SEpSet mnodeEpSetForShell;
SRpcEpSet mnodeEpSetForPeer; SEpSet mnodeEpSetForPeer;
char file[PATH_MAX + 20]; char file[PATH_MAX + 20];
uint32_t rebootTime; uint32_t rebootTime;
int8_t dropped; int8_t dropped;
@ -67,13 +67,13 @@ void dnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) {
pthread_mutex_unlock(&tsDnode.mutex); pthread_mutex_unlock(&tsDnode.mutex);
} }
void dnodeGetMnodeEpSetForPeer(SRpcEpSet *pEpSet) { void dnodeGetMnodeEpSetForPeer(SEpSet *pEpSet) {
pthread_mutex_lock(&tsDnode.mutex); pthread_mutex_lock(&tsDnode.mutex);
*pEpSet = tsDnode.mnodeEpSetForPeer; *pEpSet = tsDnode.mnodeEpSetForPeer;
pthread_mutex_unlock(&tsDnode.mutex); pthread_mutex_unlock(&tsDnode.mutex);
} }
void dnodeGetMnodeEpSetForShell(SRpcEpSet *pEpSet) { void dnodeGetMnodeEpSetForShell(SEpSet *pEpSet) {
pthread_mutex_lock(&tsDnode.mutex); pthread_mutex_lock(&tsDnode.mutex);
*pEpSet = tsDnode.mnodeEpSetForShell; *pEpSet = tsDnode.mnodeEpSetForShell;
pthread_mutex_unlock(&tsDnode.mutex); pthread_mutex_unlock(&tsDnode.mutex);
@ -82,7 +82,7 @@ void dnodeGetMnodeEpSetForShell(SRpcEpSet *pEpSet) {
void dnodeSendRedirectMsg(SRpcMsg *pMsg, bool forShell) { void dnodeSendRedirectMsg(SRpcMsg *pMsg, bool forShell) {
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
SRpcEpSet epSet = {0}; SEpSet epSet = {0};
if (forShell) { if (forShell) {
dnodeGetMnodeEpSetForShell(&epSet); dnodeGetMnodeEpSetForShell(&epSet);
} else { } else {
@ -107,7 +107,7 @@ void dnodeSendRedirectMsg(SRpcMsg *pMsg, bool forShell) {
rpcSendRedirectRsp(pMsg->handle, &epSet); rpcSendRedirectRsp(pMsg->handle, &epSet);
} }
static void dnodeUpdateMnodeEpSet(SRpcEpSet *pEpSet) { static void dnodeUpdateMnodeEpSet(SEpSet *pEpSet) {
if (pEpSet == NULL || pEpSet->numOfEps <= 0) { if (pEpSet == NULL || pEpSet->numOfEps <= 0) {
dError("mnode is changed, but content is invalid, discard it"); dError("mnode is changed, but content is invalid, discard it");
return; return;
@ -528,7 +528,7 @@ void dnodeCleanupDnode() {
dInfo("dnode-dnode is cleaned up"); dInfo("dnode-dnode is cleaned up");
} }
void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { void dnodeProcessDnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
if (msgType == TSDB_MSG_TYPE_STATUS_RSP && pEpSet) { if (msgType == TSDB_MSG_TYPE_STATUS_RSP && pEpSet) {

View File

@ -61,9 +61,28 @@ void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { void dnodeProcessDropMnodeReq(SRpcMsg *pMsg) {
mnodeProcessMsg(pMsg); int32_t code = dnodeStartMnode(pMsg);
// tsDnode.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessCreateMnodeReq;
// tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = dnodeProcessDropMnodeReq; SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
}
void dnodeProcessMnodeMsg(SRpcMsg *pMsg, SEpSet *pEpSet) {
switch (pMsg->msgType) {
case TSDB_MSG_TYPE_CREATE_MNODE_IN:
dnodeProcessCreateMnodeReq(pMsg);
break;
case TSDB_MSG_TYPE_DROP_MNODE_IN:
dnodeProcessDropMnodeReq(pMsg);
break;
default:
mnodeProcessMsg(pMsg);
}
}
int32_t dnodeGetUserAuthFromMnode(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
return mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
} }

View File

@ -24,7 +24,6 @@
#include "dnodeDnode.h" #include "dnodeDnode.h"
#include "dnodeMnode.h" #include "dnodeMnode.h"
#include "dnodeVnodes.h" #include "dnodeVnodes.h"
#include "mnode.h"
static struct { static struct {
void *peerRpc; void *peerRpc;
@ -119,7 +118,7 @@ static void dnodeInitMsgFp() {
tsTrans.msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessDnodeMsg; tsTrans.msgFp[TSDB_MSG_TYPE_STATUS_RSP] = dnodeProcessDnodeMsg;
} }
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessPeerReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle}; SRpcMsg rspMsg = {.handle = pMsg->handle};
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
@ -183,7 +182,7 @@ static void dnodeCleanupPeerServer() {
} }
} }
static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SEpSet *pEpSet) {
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
@ -237,7 +236,7 @@ static void dnodeCleanupClient() {
} }
} }
static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessShellReq(SRpcMsg *pMsg, SEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle}; SRpcMsg rspMsg = {.handle = pMsg->handle};
int32_t msgType = pMsg->msgType; int32_t msgType = pMsg->msgType;
@ -274,13 +273,13 @@ static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} }
static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0}; SEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet); dnodeGetMnodeEpSetForPeer(&epSet);
rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp); rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp);
} }
static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); int32_t code = dnodeGetUserAuthFromMnode(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_APP_NOT_READY) return code; if (code != TSDB_CODE_APP_NOT_READY) return code;
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
@ -362,10 +361,10 @@ void dnodeCleanupTrans() {
dnodeCleanupClient(); dnodeCleanupClient();
} }
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); } void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); }
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
SRpcEpSet epSet = {0}; SEpSet epSet = {0};
dnodeGetMnodeEpSetForPeer(&epSet); dnodeGetMnodeEpSetForPeer(&epSet);
dnodeSendMsgToDnode(&epSet, rpcMsg); dnodeSendMsgToDnode(&epSet, rpcMsg);
} }

View File

@ -21,6 +21,6 @@ int32_t dnodeInitVnodes() { return vnodeInit(); }
void dnodeCleanupVnodes() { vnodeCleanup(); } void dnodeCleanupVnodes() { vnodeCleanup(); }
void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { vnodeProcessMsg(NULL, NULL); } void dnodeProcessVnodesMsg(SRpcMsg *pMsg, SEpSet *pEpSet) { vnodeProcessMsg(NULL, NULL); }
void dnodeGetVnodes(SVnodeLoads *pVloads) {} void dnodeGetVnodes(SVnodeLoads *pVloads) {}

View File

@ -27,7 +27,7 @@ int32_t mnodeGetDnodeId();
int64_t mnodeGetClusterId(); int64_t mnodeGetClusterId();
EMnStatus mnodeGetStatus(); EMnStatus mnodeGetStatus();
void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg); void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell); void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
void mnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); void mnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);

View File

@ -24,8 +24,8 @@ extern "C" {
int32_t mnodeInitMnode(); int32_t mnodeInitMnode();
void mnodeCleanupMnode(); void mnodeCleanupMnode();
void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect); void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect);
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect); void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -20,5 +20,5 @@
int32_t mnodeInitMnode() { return 0; } int32_t mnodeInitMnode() { return 0; }
void mnodeCleanupMnode() {} void mnodeCleanupMnode() {}
void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet, bool redirect) {} void mnodeGetMnodeEpSetForPeer(SEpSet *epSet, bool redirect) {}
void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet, bool redirect) {} void mnodeGetMnodeEpSetForShell(SEpSet *epSet, bool redirect) {}

View File

@ -297,10 +297,10 @@ static void mnodeProcessWriteReq(SMnMsg *pMsg, void *unused) {
if (!mnodeIsMaster()) { if (!mnodeIsMaster()) {
SMnRsp *rpcRsp = &pMsg->rpcRsp; SMnRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
mnodeGetMnodeEpSetForShell(epSet, true); mnodeGetMnodeEpSetForShell(epSet, true);
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SEpSet);
mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
taosMsg[msgType], epSet->numOfEps, epSet->inUse); taosMsg[msgType], epSet->numOfEps, epSet->inUse);
@ -334,14 +334,14 @@ static void mnodeProcessReadReq(SMnMsg *pMsg, void *unused) {
if (!mnodeIsMaster()) { if (!mnodeIsMaster()) {
SMnRsp *rpcRsp = &pMsg->rpcRsp; SMnRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
if (!epSet) { if (!epSet) {
code = TSDB_CODE_MND_OUT_OF_MEMORY; code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto PROCESS_READ_REQ_END; goto PROCESS_READ_REQ_END;
} }
mnodeGetMnodeEpSetForShell(epSet, true); mnodeGetMnodeEpSetForShell(epSet, true);
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SEpSet);
mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType], mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType],
epSet->numOfEps, epSet->inUse); epSet->numOfEps, epSet->inUse);
@ -375,10 +375,10 @@ static void mnodeProcessPeerReq(SMnMsg *pMsg, void *unused) {
if (!mnodeIsMaster()) { if (!mnodeIsMaster()) {
SMnRsp *rpcRsp = &pMsg->rpcRsp; SMnRsp *rpcRsp = &pMsg->rpcRsp;
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); SEpSet *epSet = rpcMallocCont(sizeof(SEpSet));
mnodeGetMnodeEpSetForPeer(epSet, true); mnodeGetMnodeEpSetForPeer(epSet, true);
rpcRsp->rsp = epSet; rpcRsp->rsp = epSet;
rpcRsp->len = sizeof(SRpcEpSet); rpcRsp->len = sizeof(SEpSet);
mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
taosMsg[msgType], epSet->numOfEps, epSet->inUse); taosMsg[msgType], epSet->numOfEps, epSet->inUse);

View File

@ -54,7 +54,7 @@ int64_t mnodeGetClusterId() { return tsMint.clusterId; }
EMnStatus mnodeGetStatus() { return tsMint.state; } EMnStatus mnodeGetStatus() { return tsMint.state; }
void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) { void mnodeSendMsgToDnode(struct SEpSet *epSet, struct SRpcMsg *rpcMsg) {
(*tsMint.fp.SendMsgToDnode)(epSet, rpcMsg); (*tsMint.fp.SendMsgToDnode)(epSet, rpcMsg);
} }

View File

@ -54,7 +54,7 @@ typedef struct {
char secret[TSDB_KEY_LEN]; // secret for the link char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key char ckey[TSDB_KEY_LEN]; // ciphering key
void (*cfp)(SRpcMsg *, SRpcEpSet *); void (*cfp)(SRpcMsg *, SEpSet *);
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t refCount; int32_t refCount;
@ -70,7 +70,7 @@ typedef struct {
typedef struct { typedef struct {
SRpcInfo *pRpc; // associated SRpcInfo SRpcInfo *pRpc; // associated SRpcInfo
SRpcEpSet epSet; // ip list provided by app SEpSet epSet; // ip list provided by app
void *ahandle; // handle provided by app void *ahandle; // handle provided by app
struct SRpcConn *pConn; // pConn allocated struct SRpcConn *pConn; // pConn allocated
char msgType; // message type char msgType; // message type
@ -84,7 +84,7 @@ typedef struct {
int64_t rid; // refId returned by taosAddRef int64_t rid; // refId returned by taosAddRef
SRpcMsg *pRsp; // for synchronous API SRpcMsg *pRsp; // for synchronous API
tsem_t *pSem; // for synchronous API tsem_t *pSem; // for synchronous API
SRpcEpSet *pSet; // for synchronous API SEpSet *pSet; // for synchronous API
char msg[0]; // RpcHead starts from here char msg[0]; // RpcHead starts from here
} SRpcReqContext; } SRpcReqContext;
@ -383,7 +383,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
} }
void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) { void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext; SRpcReqContext *pContext;
@ -486,15 +486,15 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
return; return;
} }
void rpcSendRedirectRsp(void *thandle, const SRpcEpSet *pEpSet) { void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
memset(&rpcMsg, 0, sizeof(rpcMsg)); memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.contLen = sizeof(SRpcEpSet); rpcMsg.contLen = sizeof(SEpSet);
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
if (rpcMsg.pCont == NULL) return; if (rpcMsg.pCont == NULL) return;
memcpy(rpcMsg.pCont, pEpSet, sizeof(SRpcEpSet)); memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
rpcMsg.code = TSDB_CODE_RPC_REDIRECT; rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
rpcMsg.handle = thandle; rpcMsg.handle = thandle;
@ -516,7 +516,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
return 0; return 0;
} }
void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
SRpcReqContext *pContext; SRpcReqContext *pContext;
pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext)); pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
@ -794,9 +794,9 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv) {
} }
static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) { static SRpcConn *rpcSetupConnToServer(SRpcReqContext *pContext) {
SRpcConn *pConn; SRpcConn *pConn;
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
SRpcEpSet *pEpSet = &pContext->epSet; SEpSet *pEpSet = &pContext->epSet;
pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType); pConn = rpcGetConnFromCache(pRpc->pCache, pEpSet->fqdn[pEpSet->inUse], pEpSet->port[pEpSet->inUse], pContext->connType);
if ( pConn == NULL || pConn->user[0] == 0) { if ( pConn == NULL || pConn->user[0] == 0) {
@ -926,7 +926,7 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) {
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
if (pHead->code == TSDB_CODE_RPC_REDIRECT) { if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) { if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SEpSet)) {
// if EpSet is not included in the msg, treat it as NOT_READY // if EpSet is not included in the msg, treat it as NOT_READY
pHead->code = TSDB_CODE_RPC_NOT_READY; pHead->code = TSDB_CODE_RPC_NOT_READY;
} else { } else {
@ -1126,12 +1126,12 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
pContext->pConn = NULL; pContext->pConn = NULL;
if (pContext->pRsp) { if (pContext->pRsp) {
// for synchronous API // for synchronous API
memcpy(pContext->pSet, &pContext->epSet, sizeof(SRpcEpSet)); memcpy(pContext->pSet, &pContext->epSet, sizeof(SEpSet));
memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg)); memcpy(pContext->pRsp, pMsg, sizeof(SRpcMsg));
tsem_post(pContext->pSem); tsem_post(pContext->pSem);
} else { } else {
// for asynchronous API // for asynchronous API
SRpcEpSet *pEpSet = NULL; SEpSet *pEpSet = NULL;
if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect) if (pContext->epSet.inUse != pContext->oldInUse || pContext->redirect)
pEpSet = &pContext->epSet; pEpSet = &pContext->epSet;
@ -1175,7 +1175,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
if (pHead->code == TSDB_CODE_RPC_REDIRECT) { if (pHead->code == TSDB_CODE_RPC_REDIRECT) {
pContext->numOfTry = 0; pContext->numOfTry = 0;
SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content; SEpSet *pEpSet = (SEpSet*)pHead->content;
if (pEpSet->numOfEps > 0) { if (pEpSet->numOfEps > 0) {
memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet)); memcpy(&pContext->epSet, pHead->content, sizeof(pContext->epSet));
tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps, tDebug("%s, redirect is received, numOfEps:%d inUse:%d", pConn->info, pContext->epSet.numOfEps,