change lock type in dndDnode
This commit is contained in:
parent
c049678ade
commit
c3a25985a7
|
@ -18,32 +18,27 @@
|
||||||
#include "dndTransport.h"
|
#include "dndTransport.h"
|
||||||
#include "dndVnodes.h"
|
#include "dndVnodes.h"
|
||||||
|
|
||||||
static inline void dndRLockDnode(SDnode *pDnode) { taosRLockLatch(&pDnode->dmgmt.latch); }
|
|
||||||
|
|
||||||
static inline void dndRUnLockDnode(SDnode *pDnode) { taosRUnLockLatch(&pDnode->dmgmt.latch); }
|
|
||||||
|
|
||||||
static inline void dndWLockDnode(SDnode *pDnode) { taosWLockLatch(&pDnode->dmgmt.latch); }
|
|
||||||
|
|
||||||
static inline void dndWUnLockDnode(SDnode *pDnode) { taosWUnLockLatch(&pDnode->dmgmt.latch); }
|
|
||||||
|
|
||||||
int32_t dndGetDnodeId(SDnode *pDnode) {
|
int32_t dndGetDnodeId(SDnode *pDnode) {
|
||||||
dndRLockDnode(pDnode);
|
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||||
int32_t dnodeId = pDnode->dmgmt.dnodeId;
|
taosRLockLatch(&pMgmt->latch);
|
||||||
dndRUnLockDnode(pDnode);
|
int32_t dnodeId = pMgmt->dnodeId;
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
return dnodeId;
|
return dnodeId;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t dndGetClusterId(SDnode *pDnode) {
|
int64_t dndGetClusterId(SDnode *pDnode) {
|
||||||
dndRLockDnode(pDnode);
|
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||||
int64_t clusterId = pDnode->dmgmt.clusterId;
|
taosRLockLatch(&pMgmt->latch);
|
||||||
dndRUnLockDnode(pDnode);
|
int64_t clusterId = pMgmt->clusterId;
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
return clusterId;
|
return clusterId;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
|
void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) {
|
||||||
dndRLockDnode(pDnode);
|
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
SDnodeEp *pDnodeEp = taosHashGet(pDnode->dmgmt.dnodeHash, &dnodeId, sizeof(int32_t));
|
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t));
|
||||||
if (pDnodeEp != NULL) {
|
if (pDnodeEp != NULL) {
|
||||||
if (pPort != NULL) {
|
if (pPort != NULL) {
|
||||||
*pPort = pDnodeEp->port;
|
*pPort = pDnodeEp->port;
|
||||||
|
@ -56,13 +51,14 @@ void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dndRUnLockDnode(pDnode);
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
|
void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
|
||||||
dndRLockDnode(pDnode);
|
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||||
*pEpSet = pDnode->dmgmt.mnodeEpSet;
|
taosRLockLatch(&pMgmt->latch);
|
||||||
dndRUnLockDnode(pDnode);
|
*pEpSet = pMgmt->mnodeEpSet;
|
||||||
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) {
|
void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
|
@ -87,14 +83,15 @@ void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
|
static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
|
||||||
dInfo("mnode is changed, num:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse);
|
dInfo("mnode is changed, num:%d inUse:%d", pEpSet->numOfEps, pEpSet->inUse);
|
||||||
|
|
||||||
dndWLockDnode(pDnode);
|
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||||
|
taosWLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
pDnode->dmgmt.mnodeEpSet = *pEpSet;
|
pMgmt->mnodeEpSet = *pEpSet;
|
||||||
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
|
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
|
||||||
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
|
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
dndWUnLockDnode(pDnode);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndPrintDnodes(SDnode *pDnode) {
|
static void dndPrintDnodes(SDnode *pDnode) {
|
||||||
|
@ -145,16 +142,18 @@ static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) {
|
||||||
|
|
||||||
static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) {
|
static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) {
|
||||||
bool changed = false;
|
bool changed = false;
|
||||||
dndRLockDnode(pDnode);
|
|
||||||
|
|
||||||
SDnodeEp *pDnodeEp = taosHashGet(pDnode->dmgmt.dnodeHash, &dnodeId, sizeof(int32_t));
|
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
|
SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t));
|
||||||
if (pDnodeEp != NULL) {
|
if (pDnodeEp != NULL) {
|
||||||
char epstr[TSDB_EP_LEN + 1];
|
char epstr[TSDB_EP_LEN + 1];
|
||||||
snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->fqdn, pDnodeEp->port);
|
snprintf(epstr, TSDB_EP_LEN, "%s:%u", pDnodeEp->fqdn, pDnodeEp->port);
|
||||||
changed = strcmp(pEp, epstr) != 0;
|
changed = strcmp(pEp, epstr) != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
dndRUnLockDnode(pDnode);
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
return changed;
|
return changed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -342,11 +341,14 @@ static void dndSendStatusMsg(SDnode *pDnode) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
dndRLockDnode(pDnode);
|
bool changed = false;
|
||||||
|
|
||||||
|
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||||
|
taosRLockLatch(&pMgmt->latch);
|
||||||
pStatus->sversion = htonl(pDnode->opt.sver);
|
pStatus->sversion = htonl(pDnode->opt.sver);
|
||||||
pStatus->dnodeId = htonl(pDnode->dmgmt.dnodeId);
|
pStatus->dnodeId = htonl(pMgmt->dnodeId);
|
||||||
pStatus->clusterId = htobe64(pDnode->dmgmt.clusterId);
|
pStatus->clusterId = htobe64(pMgmt->clusterId);
|
||||||
pStatus->rebootTime = htonl(pDnode->dmgmt.rebootTime);
|
pStatus->rebootTime = htonl(pMgmt->rebootTime);
|
||||||
pStatus->numOfCores = htonl(pDnode->opt.numOfCores);
|
pStatus->numOfCores = htonl(pDnode->opt.numOfCores);
|
||||||
tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN);
|
tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN);
|
||||||
pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval);
|
pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval);
|
||||||
|
@ -356,7 +358,7 @@ static void dndSendStatusMsg(SDnode *pDnode) {
|
||||||
pStatus->clusterCfg.checkTime = 0;
|
pStatus->clusterCfg.checkTime = 0;
|
||||||
char timestr[32] = "1970-01-01 00:00:00.00";
|
char timestr[32] = "1970-01-01 00:00:00.00";
|
||||||
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
|
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
|
||||||
dndRUnLockDnode(pDnode);
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads);
|
dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads);
|
||||||
contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad);
|
contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad);
|
||||||
|
@ -370,32 +372,33 @@ static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
|
||||||
if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) {
|
if (pMgmt->dnodeId == 0 || pMgmt->dropped != pCfg->dropped) {
|
||||||
dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped);
|
dInfo("set dnodeId:%d clusterId:%" PRId64 " dropped:%d", pCfg->dnodeId, pCfg->clusterId, pCfg->dropped);
|
||||||
|
|
||||||
dndWLockDnode(pDnode);
|
taosWLockLatch(&pMgmt->latch);
|
||||||
pMgmt->dnodeId = pCfg->dnodeId;
|
pMgmt->dnodeId = pCfg->dnodeId;
|
||||||
pMgmt->clusterId = pCfg->clusterId;
|
pMgmt->clusterId = pCfg->clusterId;
|
||||||
pMgmt->dropped = pCfg->dropped;
|
pMgmt->dropped = pCfg->dropped;
|
||||||
(void)dndWriteDnodes(pDnode);
|
(void)dndWriteDnodes(pDnode);
|
||||||
dndWUnLockDnode(pDnode);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) {
|
static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) {
|
||||||
if (pDnodeEps == NULL || pDnodeEps->num <= 0) return;
|
if (pDnodeEps == NULL || pDnodeEps->num <= 0) return;
|
||||||
|
|
||||||
dndWLockDnode(pDnode);
|
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||||
|
taosWLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
if (pDnodeEps->num != pDnode->dmgmt.dnodeEps->num) {
|
if (pDnodeEps->num != pMgmt->dnodeEps->num) {
|
||||||
dndResetDnodes(pDnode, pDnodeEps);
|
dndResetDnodes(pDnode, pDnodeEps);
|
||||||
dndWriteDnodes(pDnode);
|
dndWriteDnodes(pDnode);
|
||||||
} else {
|
} else {
|
||||||
int32_t size = pDnodeEps->num * sizeof(SDnodeEp) + sizeof(SDnodeEps);
|
int32_t size = pDnodeEps->num * sizeof(SDnodeEp) + sizeof(SDnodeEps);
|
||||||
if (memcmp(pDnode->dmgmt.dnodeEps, pDnodeEps, size) != 0) {
|
if (memcmp(pMgmt->dnodeEps, pDnodeEps, size) != 0) {
|
||||||
dndResetDnodes(pDnode, pDnodeEps);
|
dndResetDnodes(pDnode, pDnodeEps);
|
||||||
dndWriteDnodes(pDnode);
|
dndWriteDnodes(pDnode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dndWUnLockDnode(pDnode);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
@ -512,7 +515,7 @@ void dndCleanupDnode(SDnode *pDnode) {
|
||||||
pMgmt->threadId = NULL;
|
pMgmt->threadId = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
dndWLockDnode(pDnode);
|
taosWLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
if (pMgmt->dnodeEps != NULL) {
|
if (pMgmt->dnodeEps != NULL) {
|
||||||
free(pMgmt->dnodeEps);
|
free(pMgmt->dnodeEps);
|
||||||
|
@ -529,7 +532,7 @@ void dndCleanupDnode(SDnode *pDnode) {
|
||||||
pMgmt->file = NULL;
|
pMgmt->file = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
dndWUnLockDnode(pDnode);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
dInfo("dnode-dnode is cleaned up");
|
dInfo("dnode-dnode is cleaned up");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -187,6 +187,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
|
||||||
int32_t msgType = pMsg->msgType;
|
int32_t msgType = pMsg->msgType;
|
||||||
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
|
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
|
||||||
|
dTrace("RPC %p, network test req will be processed", pMsg->handle);
|
||||||
dndProcessDnodeReq(pDnode, pMsg, pEpSet);
|
dndProcessDnodeReq(pDnode, pMsg, pEpSet);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -206,6 +207,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
|
dTrace("RPC %p, req:%s not processed since content is null", pMsg->handle, taosMsg[msgType]);
|
||||||
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
|
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
|
||||||
rpcSendResponse(&rspMsg);
|
rpcSendResponse(&rspMsg);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
|
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
|
||||||
|
|
||||||
void dndSetStat(SDnode *pDnode, EStat stat) {
|
void dndSetStat(SDnode *pDnode, EStat stat) {
|
||||||
dDebug("dnode stat set from %s to %s", dndStatStr(pDnode->stat), dndStatStr(stat));
|
dDebug("dnode status set from %s to %s", dndStatStr(pDnode->stat), dndStatStr(stat));
|
||||||
pDnode->stat = stat;
|
pDnode->stat = stat;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,8 +214,7 @@ void dndCleanup(SDnode *pDnode) {
|
||||||
dndCleanupDnode(pDnode);
|
dndCleanupDnode(pDnode);
|
||||||
walCleanUp();
|
walCleanUp();
|
||||||
rpcCleanup();
|
rpcCleanup();
|
||||||
|
|
||||||
dInfo("TDengine is cleaned up successfully");
|
|
||||||
dndCleanupEnv(pDnode);
|
dndCleanupEnv(pDnode);
|
||||||
free(pDnode);
|
free(pDnode);
|
||||||
|
dInfo("TDengine is cleaned up successfully");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue