diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 8a50449a61..30b069f349 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -397,7 +397,7 @@ void dndSendStatusMsg(SDnode *pDnode) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; if (pMgmt->dnodeId == 0) { - dInfo("set dnodeId:%d clusterId:% " PRId64, pCfg->dnodeId, pCfg->clusterId); + dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); taosWLockLatch(&pMgmt->latch); pMgmt->dnodeId = pCfg->dnodeId; pMgmt->clusterId = pCfg->clusterId; @@ -440,19 +440,21 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) { } SStatusRsp *pRsp = pMsg->pCont; - SDnodeCfg *pCfg = &pRsp->dnodeCfg; - pCfg->dnodeId = htonl(pCfg->dnodeId); - pCfg->clusterId = htobe64(pCfg->clusterId); - dndUpdateDnodeCfg(pDnode, pCfg); + if (pMsg->pCont != NULL && pMsg->contLen != 0) { + SDnodeCfg *pCfg = &pRsp->dnodeCfg; + pCfg->dnodeId = htonl(pCfg->dnodeId); + pCfg->clusterId = htobe64(pCfg->clusterId); + dndUpdateDnodeCfg(pDnode, pCfg); - SDnodeEps *pDnodeEps = &pRsp->dnodeEps; - pDnodeEps->num = htonl(pDnodeEps->num); - for (int32_t i = 0; i < pDnodeEps->num; ++i) { - pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); - pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port); + SDnodeEps *pDnodeEps = &pRsp->dnodeEps; + pDnodeEps->num = htonl(pDnodeEps->num); + for (int32_t i = 0; i < pDnodeEps->num; ++i) { + pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); + pDnodeEps->eps[i].port = htons(pDnodeEps->eps[i].port); + } + + dndUpdateDnodeEps(pDnode, pDnodeEps); } - - dndUpdateDnodeEps(pDnode, pDnodeEps); pMgmt->statusSent = 0; } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 2c3be4a700..e5be16937b 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -103,7 +103,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 669b97e54d..338024c20a 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -74,13 +74,6 @@ typedef enum { typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; -typedef enum { - DND_STATUS_OFFLINE = 0, - DND_STATUS_READY = 1, - DND_STATUS_CREATING = 2, - DND_STATUS_DROPPING = 3 -} EDndStatus; - typedef enum { DND_REASON_ONLINE = 0, DND_REASON_STATUS_MSG_TIMEOUT, @@ -127,7 +120,6 @@ typedef struct { int16_t numOfVnodes; int32_t numOfSupportVnodes; int32_t numOfCores; - EDndStatus status; EDndReason offlineReason; uint16_t port; char fqdn[TSDB_FQDN_LEN]; diff --git a/source/dnode/mnode/impl/inc/mndDnode.h b/source/dnode/mnode/impl/inc/mndDnode.h index 764dfbffc1..c76186c0a2 100644 --- a/source/dnode/mnode/impl/inc/mndDnode.h +++ b/source/dnode/mnode/impl/inc/mndDnode.h @@ -28,7 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId); void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode); SEpSet mndGetDnodeEpset(SDnodeObj *pDnode); int32_t mndGetDnodeSize(SMnode *pMnode); -bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode); +bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d101be74cc..ca20ba61a1 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -40,8 +40,6 @@ static const char *offlineReason[] = { "unknown", }; -static const char *dnodeStatus[] = {"offline", "ready", "creating", "dropping"}; - static int32_t mndCreateDefaultDnode(SMnode *pMnode); static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode); static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw); @@ -208,10 +206,12 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { return sdbGetSize(pSdb, SDB_DNODE); } -bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode) { - int64_t ms = taosGetTimestampMs(); - int64_t interval = ABS(pDnode->lastAccessTime - ms); +bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) { + int64_t interval = ABS(pDnode->lastAccessTime - curMs); if (interval > 3500 * pMnode->cfg.statusInterval) { + if (pDnode->rebootTime > 0) { + pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT; + } return false; } return true; @@ -287,96 +287,99 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) { static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; SStatusMsg *pStatus = pMsg->rpcMsg.pCont; + SDnodeObj *pDnode = NULL; + int32_t code = -1; + mndParseStatusMsg(pStatus); - SDnodeObj *pDnode = NULL; if (pStatus->dnodeId == 0) { pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); if (pDnode == NULL) { mDebug("dnode:%s, not created yet", pStatus->dnodeEp); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - return -1; + goto PROCESS_STATUS_MSG_OVER; } } else { pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId); if (pDnode == NULL) { pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); - if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { + if (pDnode != NULL) { pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; } mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp); - mndReleaseDnode(pMnode, pDnode); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; - return -1; + goto PROCESS_STATUS_MSG_OVER; } } - if (pStatus->sver != pMnode->cfg.sver) { - if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { - pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; - } - mndReleaseDnode(pMnode, pDnode); - mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver); - terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; - return -1; - } + int64_t curMs = taosGetTimestampMs(); + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + bool needCheckCfg = !(online && pDnode->rebootTime == pStatus->rebootTime); - if (pStatus->dnodeId == 0) { - mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); - } else { - if (pStatus->clusterId != pMnode->clusterId) { - if (pDnode != NULL && pDnode->status != DND_STATUS_READY) { - pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; + if (needCheckCfg) { + if (pStatus->sver != pMnode->cfg.sver) { + if (pDnode != NULL) { + pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; + } + mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver); + terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; + goto PROCESS_STATUS_MSG_OVER; + } + + if (pStatus->dnodeId == 0) { + mDebug("dnode:%d %s, first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); + } else { + if (pStatus->clusterId != pMnode->clusterId) { + if (pDnode != NULL) { + pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; + } + mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, + pMnode->clusterId); + terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID; + goto PROCESS_STATUS_MSG_OVER; + } else { + pDnode->accessTimes++; + mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); } - mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, - pMnode->clusterId); - mndReleaseDnode(pMnode, pDnode); - terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID; - return -1; - } else { - pDnode->accessTimes++; - mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes); } - } - if (pDnode->status == DND_STATUS_OFFLINE) { // Verify whether the cluster parameters are consistent when status change from offline to ready int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg); if (0 != ret) { pDnode->offlineReason = ret; mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]); - mndReleaseDnode(pMnode, pDnode); terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG; - return -1; + goto PROCESS_STATUS_MSG_OVER; } mInfo("dnode:%d, from offline to online", pDnode->id); + + pDnode->rebootTime = pStatus->rebootTime; + pDnode->numOfCores = pStatus->numOfCores; + pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; + + int32_t numOfEps = mndGetDnodeSize(pMnode); + int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); + SStatusRsp *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto PROCESS_STATUS_MSG_OVER; + } + + pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); + pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId); + mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); + + pMsg->contLen = contLen; + pMsg->pCont = pRsp; } - pDnode->rebootTime = pStatus->rebootTime; - pDnode->numOfCores = pStatus->numOfCores; - pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; - pDnode->lastAccessTime = taosGetTimestampMs(); - pDnode->status = DND_STATUS_READY; + pDnode->lastAccessTime = curMs; + code = 0; - int32_t numOfEps = mndGetDnodeSize(pMnode); - int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); - SStatusRsp *pRsp = rpcMallocCont(contLen); - if (pRsp == NULL) { - mndReleaseDnode(pMnode, pDnode); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); - pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId); - mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); - - pMsg->contLen = contLen; - pMsg->pCont = pRsp; +PROCESS_STATUS_MSG_OVER: mndReleaseDnode(pMnode, pDnode); - - return 0; + return code; } static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) { @@ -682,10 +685,12 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i int32_t cols = 0; SDnodeObj *pDnode = NULL; char *pWrite; + int64_t curMs = taosGetTimestampMs(); while (numOfRows < rows) { pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode); if (pShow->pIter == NULL) break; + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); cols = 0; @@ -706,8 +711,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - const char *status = dnodeStatus[pDnode->status]; - STR_TO_VARSTR(pWrite, status); + STR_TO_VARSTR(pWrite, online ? "ready" : "offline"); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -715,11 +719,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - if (pDnode->status == DND_STATUS_READY) { - STR_TO_VARSTR(pWrite, ""); - } else { - STR_TO_VARSTR(pWrite, offlineReason[pDnode->offlineReason]); - } + STR_TO_VARSTR(pWrite, online ? "" : offlineReason[pDnode->offlineReason]); cols++; numOfRows++; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 3cb5173967..1b0026cd13 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -260,13 +260,14 @@ static SArray *mndBuildDnodesArray(SMnode *pMnode) { pDnode->numOfVnodes++; } - bool isReady = mndIsDnodeInReadyStatus(pMnode, pDnode); - if (isReady) { + int64_t curMs = taosGetTimestampMs(); + bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); + if (online) { taosArrayPush(pArray, pDnode); } - mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d ready:%d", pDnode->id, numOfVnodes, - pDnode->numOfSupportVnodes, isMnode, isReady); + mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, numOfVnodes, + pDnode->numOfSupportVnodes, isMnode, online); sdbRelease(pSdb, pDnode); }