tune the code
This commit is contained in:
parent
08b087ec91
commit
a0dd566489
|
@ -43,8 +43,7 @@ void * tsDnodeTmr = NULL;
|
|||
static void * tsStatusTimer = NULL;
|
||||
static uint32_t tsRebootTime;
|
||||
|
||||
static SRpcIpSet tsDMnodeIpSetForPeer = {0};
|
||||
static SRpcIpSet tsDMnodeIpSetForShell = {0};
|
||||
static SRpcIpSet tsDMnodeIpSet = {0};
|
||||
static SDMMnodeInfos tsDMnodeInfos = {0};
|
||||
static SDMDnodeCfg tsDnodeCfg = {0};
|
||||
static taos_qset tsMgmtQset = NULL;
|
||||
|
@ -80,40 +79,21 @@ int32_t dnodeInitMgmt() {
|
|||
tsRebootTime = taosGetTimestampSec();
|
||||
|
||||
if (!dnodeReadMnodeInfos()) {
|
||||
memset(&tsDMnodeIpSetForPeer, 0, sizeof(SRpcIpSet));
|
||||
memset(&tsDMnodeIpSetForShell, 0, sizeof(SRpcIpSet));
|
||||
memset(&tsDMnodeIpSet, 0, sizeof(SRpcIpSet));
|
||||
memset(&tsDMnodeInfos, 0, sizeof(SDMMnodeInfos));
|
||||
|
||||
tsDMnodeIpSetForPeer.numOfIps = 1;
|
||||
taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSetForPeer.fqdn[0], &tsDMnodeIpSetForPeer.port[0]);
|
||||
tsDMnodeIpSetForPeer.port[0] += TSDB_PORT_DNODEDNODE;
|
||||
|
||||
tsDMnodeIpSetForShell.numOfIps = 1;
|
||||
taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSetForShell.fqdn[0], &tsDMnodeIpSetForShell.port[0]);
|
||||
tsDMnodeIpSetForShell.port[0] += TSDB_PORT_DNODESHELL;
|
||||
tsDMnodeIpSet.numOfIps = 1;
|
||||
taosGetFqdnPortFromEp(tsFirst, tsDMnodeIpSet.fqdn[0], &tsDMnodeIpSet.port[0]);
|
||||
|
||||
if (strcmp(tsSecond, tsFirst) != 0) {
|
||||
tsDMnodeIpSetForPeer.numOfIps = 2;
|
||||
taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSetForPeer.fqdn[1], &tsDMnodeIpSetForPeer.port[1]);
|
||||
tsDMnodeIpSetForPeer.port[1] += TSDB_PORT_DNODEDNODE;
|
||||
|
||||
tsDMnodeIpSetForShell.numOfIps = 2;
|
||||
taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSetForShell.fqdn[1], &tsDMnodeIpSetForShell.port[1]);
|
||||
tsDMnodeIpSetForShell.port[1] += TSDB_PORT_DNODESHELL;
|
||||
tsDMnodeIpSet.numOfIps = 2;
|
||||
taosGetFqdnPortFromEp(tsSecond, tsDMnodeIpSet.fqdn[1], &tsDMnodeIpSet.port[1]);
|
||||
}
|
||||
} else {
|
||||
tsDMnodeIpSetForPeer.inUse = tsDMnodeInfos.inUse;
|
||||
tsDMnodeIpSetForPeer.numOfIps = tsDMnodeInfos.nodeNum;
|
||||
tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse;
|
||||
tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum;
|
||||
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
|
||||
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForPeer.fqdn[i], &tsDMnodeIpSetForPeer.port[i]);
|
||||
tsDMnodeIpSetForPeer.port[i] += TSDB_PORT_DNODEDNODE;
|
||||
}
|
||||
|
||||
tsDMnodeIpSetForShell.inUse = tsDMnodeInfos.inUse;
|
||||
tsDMnodeIpSetForShell.numOfIps = tsDMnodeInfos.nodeNum;
|
||||
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
|
||||
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForShell.fqdn[i], &tsDMnodeIpSetForShell.port[i]);
|
||||
tsDMnodeIpSetForShell.port[i] += TSDB_PORT_DNODESHELL;
|
||||
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -193,9 +173,17 @@ void dnodeDispatchToMgmtQueue(SRpcMsg *pMsg) {
|
|||
void *item;
|
||||
|
||||
item = taosAllocateQitem(sizeof(SRpcMsg));
|
||||
memcpy(item, pMsg, sizeof(SRpcMsg));
|
||||
|
||||
taosWriteQitem(tsMgmtQueue, 1, item);
|
||||
if (item) {
|
||||
memcpy(item, pMsg, sizeof(SRpcMsg));
|
||||
taosWriteQitem(tsMgmtQueue, 1, item);
|
||||
} else {
|
||||
SRpcMsg rsp;
|
||||
rsp.handle = pMsg->handle;
|
||||
rsp.pCont = NULL;
|
||||
rsp.code = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
rpcSendResponse(&rsp);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
}
|
||||
|
||||
static void *dnodeProcessMgmtQueue(void *param) {
|
||||
|
@ -352,22 +340,26 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
void dnodeUpdateMnodeIpSetForPeer(SRpcIpSet *pIpSet) {
|
||||
dPrint("mnode IP list for peer is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse);
|
||||
dPrint("mnode IP list for is changed, numOfIps:%d inUse:%d", pIpSet->numOfIps, pIpSet->inUse);
|
||||
for (int i = 0; i < pIpSet->numOfIps; ++i) {
|
||||
pIpSet->port[i] -= TSDB_PORT_DNODEDNODE;
|
||||
dPrint("mnode index:%d %s:%u", i, pIpSet->fqdn[i], pIpSet->port[i])
|
||||
}
|
||||
|
||||
tsDMnodeIpSetForPeer = *pIpSet;
|
||||
tsDMnodeIpSet = *pIpSet;
|
||||
}
|
||||
|
||||
void dnodeGetMnodeIpSetForPeer(void *ipSetRaw) {
|
||||
SRpcIpSet *ipSet = ipSetRaw;
|
||||
*ipSet = tsDMnodeIpSetForPeer;
|
||||
*ipSet = tsDMnodeIpSet;
|
||||
|
||||
for (int i=0; i<ipSet->numOfIps; ++i)
|
||||
ipSet->port[i] += TSDB_PORT_DNODEDNODE;
|
||||
}
|
||||
|
||||
void dnodeGetMnodeIpSetForShell(void *ipSetRaw) {
|
||||
SRpcIpSet *ipSet = ipSetRaw;
|
||||
*ipSet = tsDMnodeIpSetForShell;
|
||||
*ipSet = tsDMnodeIpSet;
|
||||
}
|
||||
|
||||
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
|
||||
|
@ -417,19 +409,11 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
|
|||
dPrint("mnode index:%d, %s", tsDMnodeInfos.nodeInfos[i].nodeId, tsDMnodeInfos.nodeInfos[i].nodeEp);
|
||||
}
|
||||
|
||||
tsDMnodeIpSetForPeer.inUse = tsDMnodeInfos.inUse;
|
||||
tsDMnodeIpSetForPeer.numOfIps = tsDMnodeInfos.nodeNum;
|
||||
tsDMnodeIpSet.inUse = tsDMnodeInfos.inUse;
|
||||
tsDMnodeIpSet.numOfIps = tsDMnodeInfos.nodeNum;
|
||||
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
|
||||
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForPeer.fqdn[i], &tsDMnodeIpSetForPeer.port[i]);
|
||||
tsDMnodeIpSetForPeer.port[i] += TSDB_PORT_DNODEDNODE;
|
||||
dPrint("mnode index:%d, for peer %s %d", i, tsDMnodeIpSetForPeer.fqdn[i], tsDMnodeIpSetForPeer.port[i]);
|
||||
}
|
||||
|
||||
tsDMnodeIpSetForShell.inUse = tsDMnodeInfos.inUse;
|
||||
tsDMnodeIpSetForShell.numOfIps = tsDMnodeInfos.nodeNum;
|
||||
for (int32_t i = 0; i < tsDMnodeInfos.nodeNum; i++) {
|
||||
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSetForShell.fqdn[i], &tsDMnodeIpSetForShell.port[i]);
|
||||
dPrint("mnode index:%d, for shell %s %d", i, tsDMnodeIpSetForShell.fqdn[i], tsDMnodeIpSetForShell.port[i]);
|
||||
taosGetFqdnPortFromEp(tsDMnodeInfos.nodeInfos[i].nodeEp, tsDMnodeIpSet.fqdn[i], &tsDMnodeIpSet.port[i]);
|
||||
dPrint("mnode index:%d, for peer %s %d", i, tsDMnodeIpSet.fqdn[i], tsDMnodeIpSet.port[i]);
|
||||
}
|
||||
|
||||
dnodeSaveMnodeInfos();
|
||||
|
@ -555,7 +539,7 @@ static void dnodeSaveMnodeInfos() {
|
|||
}
|
||||
|
||||
char *dnodeGetMnodeMasterEp() {
|
||||
return tsDMnodeInfos.nodeInfos[tsDMnodeIpSetForPeer.inUse].nodeEp;
|
||||
return tsDMnodeInfos.nodeInfos[tsDMnodeIpSet.inUse].nodeEp;
|
||||
}
|
||||
|
||||
void* dnodeGetMnodeInfos() {
|
||||
|
@ -602,7 +586,9 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
|
|||
.msgType = TSDB_MSG_TYPE_DM_STATUS
|
||||
};
|
||||
|
||||
dnodeSendMsgToDnode(&tsDMnodeIpSetForPeer, &rpcMsg);
|
||||
SRpcIpSet ipSet;
|
||||
dnodeGetMnodeIpSetForPeer(&ipSet);
|
||||
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
|
||||
}
|
||||
|
||||
static bool dnodeReadDnodeCfg() {
|
||||
|
|
Loading…
Reference in New Issue