Merge pull request #9504 from taosdata/feature/dnode3

process the status msg only once
This commit is contained in:
Shengliang Guan 2021-12-30 14:40:10 +08:00 committed by GitHub
commit 0e9987a07f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 87 additions and 92 deletions

View File

@ -397,7 +397,7 @@ void dndSendStatusMsg(SDnode *pDnode) {
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMgmt->dnodeId == 0) { if (pMgmt->dnodeId == 0) {
dInfo("set dnodeId:%d clusterId:% " PRId64, pCfg->dnodeId, pCfg->clusterId); dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId; pMgmt->dnodeId = pCfg->dnodeId;
pMgmt->clusterId = pCfg->clusterId; pMgmt->clusterId = pCfg->clusterId;
@ -440,6 +440,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
} }
SStatusRsp *pRsp = pMsg->pCont; SStatusRsp *pRsp = pMsg->pCont;
if (pMsg->pCont != NULL && pMsg->contLen != 0) {
SDnodeCfg *pCfg = &pRsp->dnodeCfg; SDnodeCfg *pCfg = &pRsp->dnodeCfg;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
pCfg->clusterId = htobe64(pCfg->clusterId); pCfg->clusterId = htobe64(pCfg->clusterId);
@ -453,6 +454,7 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
} }
dndUpdateDnodeEps(pDnode, pDnodeEps); dndUpdateDnodeEps(pDnode, pDnodeEps);
}
pMgmt->statusSent = 0; pMgmt->statusSent = 0;
} }

View File

@ -103,7 +103,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg;

View File

@ -74,13 +74,6 @@ typedef enum {
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
typedef enum {
DND_STATUS_OFFLINE = 0,
DND_STATUS_READY = 1,
DND_STATUS_CREATING = 2,
DND_STATUS_DROPPING = 3
} EDndStatus;
typedef enum { typedef enum {
DND_REASON_ONLINE = 0, DND_REASON_ONLINE = 0,
DND_REASON_STATUS_MSG_TIMEOUT, DND_REASON_STATUS_MSG_TIMEOUT,
@ -127,7 +120,6 @@ typedef struct {
int16_t numOfVnodes; int16_t numOfVnodes;
int32_t numOfSupportVnodes; int32_t numOfSupportVnodes;
int32_t numOfCores; int32_t numOfCores;
EDndStatus status;
EDndReason offlineReason; EDndReason offlineReason;
uint16_t port; uint16_t port;
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];

View File

@ -28,7 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId);
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode); void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode); SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
int32_t mndGetDnodeSize(SMnode *pMnode); int32_t mndGetDnodeSize(SMnode *pMnode);
bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode); bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -40,8 +40,6 @@ static const char *offlineReason[] = {
"unknown", "unknown",
}; };
static const char *dnodeStatus[] = {"offline", "ready", "creating", "dropping"};
static int32_t mndCreateDefaultDnode(SMnode *pMnode); static int32_t mndCreateDefaultDnode(SMnode *pMnode);
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode); static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
@ -208,10 +206,12 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
return sdbGetSize(pSdb, SDB_DNODE); return sdbGetSize(pSdb, SDB_DNODE);
} }
bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode) { bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
int64_t ms = taosGetTimestampMs(); int64_t interval = ABS(pDnode->lastAccessTime - curMs);
int64_t interval = ABS(pDnode->lastAccessTime - ms);
if (interval > 3500 * pMnode->cfg.statusInterval) { if (interval > 3500 * pMnode->cfg.statusInterval) {
if (pDnode->rebootTime > 0) {
pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
}
return false; return false;
} }
return true; return true;
@ -287,85 +287,83 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) {
static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SStatusMsg *pStatus = pMsg->rpcMsg.pCont; SStatusMsg *pStatus = pMsg->rpcMsg.pCont;
SDnodeObj *pDnode = NULL;
int32_t code = -1;
mndParseStatusMsg(pStatus); mndParseStatusMsg(pStatus);
SDnodeObj *pDnode = NULL;
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp);
if (pDnode == NULL) { if (pDnode == NULL) {
mDebug("dnode:%s, not created yet", pStatus->dnodeEp); mDebug("dnode:%s, not created yet", pStatus->dnodeEp);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1; goto PROCESS_STATUS_MSG_OVER;
} }
} else { } else {
pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId); pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp);
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
} }
mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp); mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp);
mndReleaseDnode(pMnode, pDnode);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1; goto PROCESS_STATUS_MSG_OVER;
} }
} }
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
bool needCheckCfg = !(online && pDnode->rebootTime == pStatus->rebootTime);
if (needCheckCfg) {
if (pStatus->sver != pMnode->cfg.sver) { if (pStatus->sver != pMnode->cfg.sver) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
} }
mndReleaseDnode(pMnode, pDnode);
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver); mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver);
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
return -1; goto PROCESS_STATUS_MSG_OVER;
} }
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
} else { } else {
if (pStatus->clusterId != pMnode->clusterId) { if (pStatus->clusterId != pMnode->clusterId) {
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
} }
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId,
pMnode->clusterId); pMnode->clusterId);
mndReleaseDnode(pMnode, pDnode); terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID; goto PROCESS_STATUS_MSG_OVER;
return -1;
} else { } else {
pDnode->accessTimes++; pDnode->accessTimes++;
mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes);
} }
} }
if (pDnode->status == DND_STATUS_OFFLINE) {
// Verify whether the cluster parameters are consistent when status change from offline to ready // Verify whether the cluster parameters are consistent when status change from offline to ready
int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg); int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg);
if (0 != ret) { if (0 != ret) {
pDnode->offlineReason = ret; pDnode->offlineReason = ret;
mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]); mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]);
mndReleaseDnode(pMnode, pDnode);
terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG; terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
return -1; goto PROCESS_STATUS_MSG_OVER;
} }
mInfo("dnode:%d, from offline to online", pDnode->id); mInfo("dnode:%d, from offline to online", pDnode->id);
}
pDnode->rebootTime = pStatus->rebootTime; pDnode->rebootTime = pStatus->rebootTime;
pDnode->numOfCores = pStatus->numOfCores; pDnode->numOfCores = pStatus->numOfCores;
pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes;
pDnode->lastAccessTime = taosGetTimestampMs();
pDnode->status = DND_STATUS_READY;
int32_t numOfEps = mndGetDnodeSize(pMnode); int32_t numOfEps = mndGetDnodeSize(pMnode);
int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp);
SStatusRsp *pRsp = rpcMallocCont(contLen); SStatusRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) { if (pRsp == NULL) {
mndReleaseDnode(pMnode, pDnode);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; goto PROCESS_STATUS_MSG_OVER;
} }
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
@ -374,9 +372,14 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
pMsg->contLen = contLen; pMsg->contLen = contLen;
pMsg->pCont = pRsp; pMsg->pCont = pRsp;
mndReleaseDnode(pMnode, pDnode); }
return 0; pDnode->lastAccessTime = curMs;
code = 0;
PROCESS_STATUS_MSG_OVER:
mndReleaseDnode(pMnode, pDnode);
return code;
} }
static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) { static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) {
@ -682,10 +685,12 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
int32_t cols = 0; int32_t cols = 0;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
char *pWrite; char *pWrite;
int64_t curMs = taosGetTimestampMs();
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode); pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
cols = 0; cols = 0;
@ -706,8 +711,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
const char *status = dnodeStatus[pDnode->status]; STR_TO_VARSTR(pWrite, online ? "ready" : "offline");
STR_TO_VARSTR(pWrite, status);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
@ -715,11 +719,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pDnode->status == DND_STATUS_READY) { STR_TO_VARSTR(pWrite, online ? "" : offlineReason[pDnode->offlineReason]);
STR_TO_VARSTR(pWrite, "");
} else {
STR_TO_VARSTR(pWrite, offlineReason[pDnode->offlineReason]);
}
cols++; cols++;
numOfRows++; numOfRows++;

View File

@ -260,13 +260,14 @@ static SArray *mndBuildDnodesArray(SMnode *pMnode) {
pDnode->numOfVnodes++; pDnode->numOfVnodes++;
} }
bool isReady = mndIsDnodeInReadyStatus(pMnode, pDnode); int64_t curMs = taosGetTimestampMs();
if (isReady) { bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
if (online) {
taosArrayPush(pArray, pDnode); taosArrayPush(pArray, pDnode);
} }
mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d ready:%d", pDnode->id, numOfVnodes, mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, numOfVnodes,
pDnode->numOfSupportVnodes, isMnode, isReady); pDnode->numOfSupportVnodes, isMnode, online);
sdbRelease(pSdb, pDnode); sdbRelease(pSdb, pDnode);
} }