Merge pull request #1675 from taosdata/feature/mpeer
[TD-148] change ip sets
This commit is contained in:
commit
31a90129a7
|
@ -465,7 +465,7 @@ extern void * tscQhandle;
|
|||
extern int tscKeepConn[];
|
||||
extern int tsInsertHeadSize;
|
||||
extern int tscNumOfThreads;
|
||||
extern SRpcIpSet tscMgmtIpList;
|
||||
extern SRpcIpSet tscMgmtIpSet;
|
||||
|
||||
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows);
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
|
||||
#define TSC_MGMT_VNODE 999
|
||||
|
||||
SRpcIpSet tscMgmtIpList;
|
||||
SRpcIpSet tscMgmtIpSet;
|
||||
SRpcIpSet tscDnodeIpSet;
|
||||
|
||||
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
|
||||
|
@ -58,30 +58,30 @@ static void tscSetDnodeIpList(SSqlObj* pSql, STableMeta* pTableMeta) {
|
|||
}
|
||||
|
||||
void tscPrintMgmtIp() {
|
||||
if (tscMgmtIpList.numOfIps <= 0) {
|
||||
tscError("invalid mgmt IP list:%d", tscMgmtIpList.numOfIps);
|
||||
if (tscMgmtIpSet.numOfIps <= 0) {
|
||||
tscError("invalid mgmt IP list:%d", tscMgmtIpSet.numOfIps);
|
||||
} else {
|
||||
for (int i = 0; i < tscMgmtIpList.numOfIps; ++i) {
|
||||
tscTrace("mgmt index:%d ip:%d", i, tscMgmtIpList.ip[i]);
|
||||
for (int i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
|
||||
tscTrace("mgmt index:%d ip:%d", i, tscMgmtIpSet.ip[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
|
||||
tscMgmtIpList.numOfIps = pIpList->numOfIps;
|
||||
tscMgmtIpList.inUse = pIpList->inUse;
|
||||
tscMgmtIpList.port = htons(pIpList->port);
|
||||
for (int32_t i = 0; i < tscMgmtIpList.numOfIps; ++i) {
|
||||
tscMgmtIpList.ip[i] = htonl(pIpList->ip[i]);
|
||||
tscMgmtIpSet.numOfIps = pIpList->numOfIps;
|
||||
tscMgmtIpSet.inUse = pIpList->inUse;
|
||||
tscMgmtIpSet.port = htons(pIpList->port);
|
||||
for (int32_t i = 0; i < tscMgmtIpSet.numOfIps; ++i) {
|
||||
tscMgmtIpSet.ip[i] = htonl(pIpList->ip[i]);
|
||||
}
|
||||
}
|
||||
|
||||
void tscSetMgmtIpListFromEdge() {
|
||||
if (tscMgmtIpList.numOfIps != 1) {
|
||||
tscMgmtIpList.numOfIps = 1;
|
||||
tscMgmtIpList.inUse = 0;
|
||||
tscMgmtIpList.port = tsMnodeShellPort;
|
||||
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
|
||||
if (tscMgmtIpSet.numOfIps != 1) {
|
||||
tscMgmtIpSet.numOfIps = 1;
|
||||
tscMgmtIpSet.inUse = 0;
|
||||
tscMgmtIpSet.port = tsMnodeShellPort;
|
||||
tscMgmtIpSet.ip[0] = inet_addr(tsMasterIp);
|
||||
tscTrace("edge mgmt IP list:");
|
||||
tscPrintMgmtIp();
|
||||
}
|
||||
|
@ -89,7 +89,7 @@ void tscSetMgmtIpListFromEdge() {
|
|||
|
||||
void tscUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
|
||||
tscTrace("mgmt IP list is changed for ufp is called");
|
||||
tscSetMgmtIpListFromCluster(pIpSet);
|
||||
tscMgmtIpSet = *pIpSet;
|
||||
}
|
||||
|
||||
void tscSetMgmtIpList(SRpcIpSet *pIpList) {
|
||||
|
@ -114,7 +114,7 @@ void tscSetMgmtIpList(SRpcIpSet *pIpList) {
|
|||
UNUSED_FUNC
|
||||
static int32_t tscGetMgmtConnMaxRetryTimes() {
|
||||
int32_t factor = 2;
|
||||
return tscMgmtIpList.numOfIps * factor;
|
||||
return tscMgmtIpSet.numOfIps * factor;
|
||||
}
|
||||
|
||||
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||
|
@ -209,7 +209,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
|||
};
|
||||
rpcSendRequest(pVnodeConn, &pSql->ipList, &rpcMsg);
|
||||
} else {
|
||||
pSql->ipList = tscMgmtIpList;
|
||||
pSql->ipList = tscMgmtIpSet;
|
||||
pSql->ipList.port = tsMnodeShellPort;
|
||||
|
||||
tscTrace("%p msg:%s is sent to server %d", pSql, taosMsg[pSql->cmd.msgType], pSql->ipList.port);
|
||||
|
@ -430,7 +430,7 @@ int tscProcessSql(SSqlObj *pSql) {
|
|||
return pSql->res.code;
|
||||
}
|
||||
} else if (pSql->cmd.command < TSDB_SQL_LOCAL) {
|
||||
pSql->ipList = tscMgmtIpList;
|
||||
pSql->ipList = tscMgmtIpSet;
|
||||
} else { // local handler
|
||||
return (*tscProcessMsgRsp[pCmd->command])(pSql);
|
||||
}
|
||||
|
|
|
@ -72,23 +72,23 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
|||
}
|
||||
|
||||
if (ip && ip[0]) {
|
||||
tscMgmtIpList.inUse = 0;
|
||||
tscMgmtIpList.port = tsMnodeShellPort;
|
||||
tscMgmtIpList.numOfIps = 1;
|
||||
tscMgmtIpList.ip[0] = inet_addr(ip);
|
||||
tscMgmtIpSet.inUse = 0;
|
||||
tscMgmtIpSet.port = tsMnodeShellPort;
|
||||
tscMgmtIpSet.numOfIps = 1;
|
||||
tscMgmtIpSet.ip[0] = inet_addr(ip);
|
||||
|
||||
if (tsMasterIp[0] && strcmp(ip, tsMasterIp) != 0) {
|
||||
tscMgmtIpList.numOfIps = 2;
|
||||
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
|
||||
tscMgmtIpSet.numOfIps = 2;
|
||||
tscMgmtIpSet.ip[1] = inet_addr(tsMasterIp);
|
||||
}
|
||||
|
||||
if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) {
|
||||
tscMgmtIpList.numOfIps = 3;
|
||||
tscMgmtIpList.ip[2] = inet_addr(tsSecondIp);
|
||||
tscMgmtIpSet.numOfIps = 3;
|
||||
tscMgmtIpSet.ip[2] = inet_addr(tsSecondIp);
|
||||
}
|
||||
}
|
||||
|
||||
tscMgmtIpList.port = port ? port : tsMnodeShellPort;
|
||||
tscMgmtIpSet.port = port ? port : tsMnodeShellPort;
|
||||
|
||||
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
|
||||
if (NULL == pObj) {
|
||||
|
|
|
@ -147,14 +147,14 @@ void taos_init_imp() {
|
|||
taosInitNote(tsNumOfLogLines / 10, 1, (char*)"tsc_note");
|
||||
}
|
||||
|
||||
tscMgmtIpList.inUse = 0;
|
||||
tscMgmtIpList.port = tsMnodeShellPort;
|
||||
tscMgmtIpList.numOfIps = 1;
|
||||
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
|
||||
tscMgmtIpSet.inUse = 0;
|
||||
tscMgmtIpSet.port = tsMnodeShellPort;
|
||||
tscMgmtIpSet.numOfIps = 1;
|
||||
tscMgmtIpSet.ip[0] = inet_addr(tsMasterIp);
|
||||
|
||||
if (tsSecondIp[0] && strcmp(tsSecondIp, tsMasterIp) != 0) {
|
||||
tscMgmtIpList.numOfIps = 2;
|
||||
tscMgmtIpList.ip[1] = inet_addr(tsSecondIp);
|
||||
tscMgmtIpSet.numOfIps = 2;
|
||||
tscMgmtIpSet.ip[1] = inet_addr(tsSecondIp);
|
||||
}
|
||||
|
||||
tscInitMsgsFp();
|
||||
|
|
|
@ -54,6 +54,11 @@ static SRpcIpSet tsMnodeIpSet = {0};
|
|||
static SDMMnodeInfos tsMnodeInfos = {0};
|
||||
static SDMDnodeCfg tsDnodeCfg = {0};
|
||||
|
||||
void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
|
||||
dTrace("mgmt IP list is changed for ufp is called");
|
||||
tsMnodeIpSet = *pIpSet;
|
||||
}
|
||||
|
||||
int32_t dnodeInitMClient() {
|
||||
dnodeReadDnodeCfg();
|
||||
tsRebootTime = taosGetTimestampSec();
|
||||
|
@ -90,6 +95,7 @@ int32_t dnodeInitMClient() {
|
|||
rpcInit.label = "DND-MC";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = dnodeProcessRspFromMnode;
|
||||
rpcInit.ufp = dnodeUpdateIpSet;
|
||||
rpcInit.sessions = 100;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 2000;
|
||||
|
|
|
@ -40,7 +40,7 @@ void * mgmtGetNextMnode(void *pNode, struct SMnodeObj **pMnode);
|
|||
void mgmtReleaseMnode(struct SMnodeObj *pMnode);
|
||||
|
||||
char * mgmtGetMnodeRoleStr();
|
||||
void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp);
|
||||
void mgmtGetMnodeIpSet(SRpcIpSet *ipSet, bool usePublicIp);
|
||||
void mgmtGetMnodeInfos(void *mnodes);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -28,8 +28,10 @@
|
|||
#include "mgmtLog.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDServer.h"
|
||||
#include "mgmtMnode.h"
|
||||
#include "mgmtProfile.h"
|
||||
#include "mgmtShell.h"
|
||||
#include "mgmtSdb.h"
|
||||
#include "mgmtTable.h"
|
||||
#include "mgmtVgroup.h"
|
||||
|
||||
|
@ -99,6 +101,18 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) {
|
|||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!sdbIsMaster()) {
|
||||
SRpcConnInfo connInfo;
|
||||
rpcGetConnInfo(rpcMsg->handle, &connInfo);
|
||||
bool usePublicIp = false;
|
||||
|
||||
SRpcIpSet ipSet = {0};
|
||||
mgmtGetMnodeIpSet(&ipSet, usePublicIp);
|
||||
mTrace("conn from dnode ip:%s redirect msg", taosIpStr(connInfo.clientIp));
|
||||
rpcSendRedirectRsp(rpcMsg->handle, &ipSet);
|
||||
return;
|
||||
}
|
||||
|
||||
if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) {
|
||||
SRpcMsg *pMsg = malloc(sizeof(SRpcMsg));
|
||||
|
|
|
@ -171,7 +171,7 @@ char *mgmtGetMnodeRoleStr(int32_t role) {
|
|||
}
|
||||
}
|
||||
|
||||
void mgmtGetMnodeIpList(SRpcIpSet *ipSet, bool usePublicIp) {
|
||||
void mgmtGetMnodeIpSet(SRpcIpSet *ipSet, bool usePublicIp) {
|
||||
void *pNode = NULL;
|
||||
while (1) {
|
||||
SMnodeObj *pMnode = NULL;
|
||||
|
|
|
@ -141,6 +141,7 @@ void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) {
|
|||
|
||||
static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
|
||||
if (rpcMsg == NULL || rpcMsg->pCont == NULL) {
|
||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -150,7 +151,7 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
|
|||
bool usePublicIp = (connInfo.serverIp == tsPublicIpInt);
|
||||
|
||||
SRpcIpSet ipSet = {0};
|
||||
mgmtGetMnodeIpList(&ipSet, usePublicIp);
|
||||
mgmtGetMnodeIpSet(&ipSet, usePublicIp);
|
||||
mTrace("conn from ip:%s user:%s redirect msg", taosIpStr(connInfo.clientIp), connInfo.user);
|
||||
rpcSendRedirectRsp(rpcMsg->handle, &ipSet);
|
||||
return;
|
||||
|
@ -337,7 +338,7 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *pMsg) {
|
|||
return;
|
||||
}
|
||||
|
||||
mgmtGetMnodeIpList(&pHBRsp->ipList, pMsg->usePublicIp);
|
||||
mgmtGetMnodeIpSet(&pHBRsp->ipList, pMsg->usePublicIp);
|
||||
|
||||
/*
|
||||
* TODO
|
||||
|
@ -424,7 +425,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
|
|||
pConnectRsp->writeAuth = pUser->writeAuth;
|
||||
pConnectRsp->superAuth = pUser->superAuth;
|
||||
|
||||
mgmtGetMnodeIpList(&pConnectRsp->ipList, pMsg->usePublicIp);
|
||||
mgmtGetMnodeIpSet(&pConnectRsp->ipList, pMsg->usePublicIp);
|
||||
|
||||
connect_over:
|
||||
rpcRsp.code = code;
|
||||
|
|
|
@ -11,7 +11,8 @@ set +e
|
|||
FILE_NAME=
|
||||
RELEASE=0
|
||||
ASYNC=0
|
||||
while getopts "f:a" arg
|
||||
VALGRIND=0
|
||||
while getopts "f:av" arg
|
||||
do
|
||||
case $arg in
|
||||
f)
|
||||
|
@ -20,6 +21,9 @@ do
|
|||
a)
|
||||
ASYNC=1
|
||||
;;
|
||||
v)
|
||||
VALGRIND=1
|
||||
;;
|
||||
?)
|
||||
echo "unknow argument"
|
||||
;;
|
||||
|
@ -96,10 +100,14 @@ ulimit -c unlimited
|
|||
#sudo sysctl -w kernel.core_pattern=$TOP_DIR/core.%p.%e
|
||||
|
||||
if [ -n "$FILE_NAME" ]; then
|
||||
echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f $FILE_NAME
|
||||
echo "------------------------------------------------------------------------"
|
||||
#valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=valgrind.log $PROGRAM -c $CFG_DIR -f $FILE_NAME
|
||||
$PROGRAM -c $CFG_DIR -f $FILE_NAME
|
||||
if [ $VALGRIND -eq 1 ]; then
|
||||
echo valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${CODE_DIR}/../script/valgrind.log $PROGRAM -c $CFG_DIR -f $FILE_NAME
|
||||
valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${CODE_DIR}/../script/valgrind.log $PROGRAM -c $CFG_DIR -f $FILE_NAME
|
||||
else
|
||||
echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f $FILE_NAME
|
||||
$PROGRAM -c $CFG_DIR -f $FILE_NAME
|
||||
fi
|
||||
else
|
||||
echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f basicSuite.sim
|
||||
echo "------------------------------------------------------------------------"
|
||||
|
|
|
@ -109,3 +109,7 @@ endi
|
|||
if $data3_3 != null then
|
||||
goto show7
|
||||
endi
|
||||
|
||||
system sh/exec_up.sh -n dnode1 -s stop
|
||||
system sh/exec_up.sh -n dnode2 -s stop
|
||||
system sh/exec_up.sh -n dnode3 -s stop
|
|
@ -77,4 +77,8 @@ if $data3_1 != master then
|
|||
endi
|
||||
if $data3_2 != slave then
|
||||
goto step5
|
||||
endi
|
||||
endi
|
||||
|
||||
system sh/exec_up.sh -n dnode1 -s stop
|
||||
system sh/exec_up.sh -n dnode2 -s stop
|
||||
system sh/exec_up.sh -n dnode3 -s stop
|
|
@ -93,3 +93,7 @@ endi
|
|||
if $dnode3Role != slave then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec_up.sh -n dnode1 -s stop
|
||||
system sh/exec_up.sh -n dnode2 -s stop
|
||||
system sh/exec_up.sh -n dnode3 -s stop
|
|
@ -123,3 +123,6 @@ if $dnode3Role != slave then
|
|||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec_up.sh -n dnode1 -s stop
|
||||
system sh/exec_up.sh -n dnode2 -s stop
|
||||
system sh/exec_up.sh -n dnode3 -s stop
|
|
@ -83,3 +83,7 @@ endi
|
|||
if $dnode3Role != null then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec_up.sh -n dnode1 -s stop
|
||||
system sh/exec_up.sh -n dnode2 -s stop
|
||||
system sh/exec_up.sh -n dnode3 -s stop
|
|
@ -38,5 +38,6 @@ if $data4_2 != 4 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
|
||||
|
||||
system sh/exec_up.sh -n dnode1 -s stop
|
||||
system sh/exec_up.sh -n dnode2 -s stop
|
||||
system sh/exec_up.sh -n dnode3 -s stop
|
|
@ -7,4 +7,3 @@ run unique/mnode/mgmt33.sim
|
|||
run unique/mnode/mgmt34.sim
|
||||
run unique/mnode/mgmtr2.sim
|
||||
run unique/mnode/secondIp.sim
|
||||
|
||||
|
|
Loading…
Reference in New Issue