process the status msg only once
This commit is contained in:
parent
28359096b3
commit
62f620fa65
|
@ -397,7 +397,7 @@ void dndSendStatusMsg(SDnode *pDnode) {
|
|||
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
|
||||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||
if (pMgmt->dnodeId == 0) {
|
||||
dInfo("set dnodeId:%d clusterId:% " PRId64, pCfg->dnodeId, pCfg->clusterId);
|
||||
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
|
||||
taosWLockLatch(&pMgmt->latch);
|
||||
pMgmt->dnodeId = pCfg->dnodeId;
|
||||
pMgmt->clusterId = pCfg->clusterId;
|
||||
|
@ -440,19 +440,21 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
SStatusRsp *pRsp = pMsg->pCont;
|
||||
SDnodeCfg *pCfg = &pRsp->dnodeCfg;
|
||||
pCfg->dnodeId = htonl(pCfg->dnodeId);
|
||||
pCfg->clusterId = htobe64(pCfg->clusterId);
|
||||
dndUpdateDnodeCfg(pDnode, pCfg);
|
||||
if (pMsg->pCont != NULL) {
|
||||
SDnodeCfg *pCfg = &pRsp->dnodeCfg;
|
||||
pCfg->dnodeId = htonl(pCfg->dnodeId);
|
||||
pCfg->clusterId = htobe64(pCfg->clusterId);
|
||||
dndUpdateDnodeCfg(pDnode, pCfg);
|
||||
|
||||
SDnodeEps *pDnodeEps = &pRsp->dnodeEps;
|
||||
pDnodeEps->num = htonl(pDnodeEps->num);
|
||||
for (int32_t i = 0; i < pDnodeEps->num; ++i) {
|
||||
pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id);
|
||||
pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port);
|
||||
SDnodeEps *pDnodeEps = &pRsp->dnodeEps;
|
||||
pDnodeEps->num = htonl(pDnodeEps->num);
|
||||
for (int32_t i = 0; i < pDnodeEps->num; ++i) {
|
||||
pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id);
|
||||
pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port);
|
||||
}
|
||||
|
||||
dndUpdateDnodeEps(pDnode, pDnodeEps);
|
||||
}
|
||||
|
||||
dndUpdateDnodeEps(pDnode, pDnodeEps);
|
||||
pMgmt->statusSent = 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId);
|
|||
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
|
||||
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
|
||||
int32_t mndGetDnodeSize(SMnode *pMnode);
|
||||
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode);
|
||||
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -206,9 +206,8 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
|
|||
return sdbGetSize(pSdb, SDB_DNODE);
|
||||
}
|
||||
|
||||
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode) {
|
||||
int64_t ms = taosGetTimestampMs();
|
||||
int64_t interval = ABS(pDnode->lastAccessTime - ms);
|
||||
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
|
||||
int64_t interval = ABS(pDnode->lastAccessTime - curMs);
|
||||
if (interval > 3500 * pMnode->cfg.statusInterval) {
|
||||
if (pDnode->rebootTime > 0) {
|
||||
pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
|
||||
|
@ -313,33 +312,37 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pStatus->sver != pMnode->cfg.sver) {
|
||||
if (pDnode != NULL) {
|
||||
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
|
||||
}
|
||||
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver);
|
||||
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
|
||||
goto PROCESS_STATUS_MSG_OVER;
|
||||
}
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
||||
bool needCheckCfg = !(online && pDnode->rebootTime == pStatus->rebootTime);
|
||||
|
||||
if (pStatus->dnodeId == 0) {
|
||||
mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
|
||||
} else {
|
||||
if (pStatus->clusterId != pMnode->clusterId) {
|
||||
if (needCheckCfg) {
|
||||
if (pStatus->sver != pMnode->cfg.sver) {
|
||||
if (pDnode != NULL) {
|
||||
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
|
||||
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
|
||||
}
|
||||
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId,
|
||||
pMnode->clusterId);
|
||||
terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
|
||||
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver);
|
||||
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
|
||||
goto PROCESS_STATUS_MSG_OVER;
|
||||
} else {
|
||||
pDnode->accessTimes++;
|
||||
mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes);
|
||||
}
|
||||
}
|
||||
|
||||
if (/*pDnode->status == DND_STATUS_OFFLINE*/1) {
|
||||
if (pStatus->dnodeId == 0) {
|
||||
mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
|
||||
} else {
|
||||
if (pStatus->clusterId != pMnode->clusterId) {
|
||||
if (pDnode != NULL) {
|
||||
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
|
||||
}
|
||||
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId,
|
||||
pMnode->clusterId);
|
||||
terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
|
||||
goto PROCESS_STATUS_MSG_OVER;
|
||||
} else {
|
||||
pDnode->accessTimes++;
|
||||
mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes);
|
||||
}
|
||||
}
|
||||
|
||||
// Verify whether the cluster parameters are consistent when status change from offline to ready
|
||||
int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg);
|
||||
if (0 != ret) {
|
||||
|
@ -350,28 +353,28 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
mInfo("dnode:%d, from offline to online", pDnode->id);
|
||||
|
||||
pDnode->rebootTime = pStatus->rebootTime;
|
||||
pDnode->numOfCores = pStatus->numOfCores;
|
||||
pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes;
|
||||
|
||||
int32_t numOfEps = mndGetDnodeSize(pMnode);
|
||||
int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp);
|
||||
SStatusRsp *pRsp = rpcMallocCont(contLen);
|
||||
if (pRsp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto PROCESS_STATUS_MSG_OVER;
|
||||
}
|
||||
|
||||
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
|
||||
pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
|
||||
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
|
||||
|
||||
pMsg->contLen = contLen;
|
||||
pMsg->pCont = pRsp;
|
||||
}
|
||||
|
||||
pDnode->rebootTime = pStatus->rebootTime;
|
||||
pDnode->numOfCores = pStatus->numOfCores;
|
||||
pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes;
|
||||
pDnode->lastAccessTime = taosGetTimestampMs();
|
||||
|
||||
int32_t numOfEps = mndGetDnodeSize(pMnode);
|
||||
int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp);
|
||||
SStatusRsp *pRsp = rpcMallocCont(contLen);
|
||||
if (pRsp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto PROCESS_STATUS_MSG_OVER;
|
||||
}
|
||||
|
||||
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
|
||||
pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
|
||||
// mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
|
||||
|
||||
pMsg->contLen = contLen;
|
||||
pMsg->pCont = pRsp;
|
||||
|
||||
pDnode->lastAccessTime = curMs;
|
||||
code = 0;
|
||||
|
||||
PROCESS_STATUS_MSG_OVER:
|
||||
|
@ -682,11 +685,12 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
|
|||
int32_t cols = 0;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
char *pWrite;
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
|
||||
while (numOfRows < rows) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode);
|
||||
if (pShow->pIter == NULL) break;
|
||||
bool online = mndIsDnodeOnline(pMnode, pDnode);
|
||||
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
||||
|
||||
cols = 0;
|
||||
|
||||
|
|
|
@ -260,7 +260,8 @@ static SArray *mndBuildDnodesArray(SMnode *pMnode) {
|
|||
pDnode->numOfVnodes++;
|
||||
}
|
||||
|
||||
bool online = mndIsDnodeOnline(pMnode, pDnode);
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
||||
if (online) {
|
||||
taosArrayPush(pArray, pDnode);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue