Send EP list when versions are different
This commit is contained in:
parent
93153bd276
commit
d5e8981232
|
@ -650,6 +650,7 @@ typedef struct {
|
|||
int32_t sver;
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
int64_t dver;
|
||||
int64_t rebootTime;
|
||||
int64_t updateTime;
|
||||
int32_t numOfCores;
|
||||
|
@ -682,6 +683,7 @@ typedef struct {
|
|||
} SDnodeEps;
|
||||
|
||||
typedef struct {
|
||||
int64_t dver;
|
||||
SDnodeCfg dnodeCfg;
|
||||
SDnodeEps dnodeEps;
|
||||
} SStatusRsp;
|
||||
|
|
|
@ -281,6 +281,15 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
|
|||
*/
|
||||
int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);
|
||||
|
||||
/**
|
||||
* @brief Get the version of the table
|
||||
*
|
||||
* @param pSdb The sdb object.
|
||||
* @param pIter The type of the table.
|
||||
* @return int32_t The version of the table
|
||||
*/
|
||||
int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);
|
||||
|
||||
/**
|
||||
* @brief Update the version of sdb
|
||||
*
|
||||
|
|
|
@ -83,6 +83,7 @@ typedef struct {
|
|||
int32_t dnodeId;
|
||||
int32_t dropped;
|
||||
int64_t clusterId;
|
||||
int64_t dver;
|
||||
int64_t rebootTime;
|
||||
int64_t updateTime;
|
||||
int8_t statusSent;
|
||||
|
|
|
@ -366,6 +366,7 @@ void dndSendStatusReq(SDnode *pDnode) {
|
|||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||
taosRLockLatch(&pMgmt->latch);
|
||||
pStatus->sver = htonl(pDnode->opt.sver);
|
||||
pStatus->dver = htobe64(pMgmt->dver);
|
||||
pStatus->dnodeId = htonl(pMgmt->dnodeId);
|
||||
pStatus->clusterId = htobe64(pMgmt->clusterId);
|
||||
pStatus->rebootTime = htobe64(pMgmt->rebootTime);
|
||||
|
@ -441,6 +442,8 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) {
|
|||
|
||||
SStatusRsp *pRsp = pMsg->pCont;
|
||||
if (pMsg->pCont != NULL && pMsg->contLen != 0) {
|
||||
pMgmt->dver = htobe64(pRsp->dver);
|
||||
|
||||
SDnodeCfg *pCfg = &pRsp->dnodeCfg;
|
||||
pCfg->dnodeId = htonl(pCfg->dnodeId);
|
||||
pCfg->clusterId = htobe64(pCfg->clusterId);
|
||||
|
|
|
@ -45,7 +45,7 @@ static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode);
|
|||
static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode);
|
||||
static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode);
|
||||
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj *pNewDnode);
|
||||
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew);
|
||||
|
||||
static int32_t mndProcessCreateDnodeReq(SMnodeMsg *pReq);
|
||||
static int32_t mndProcessDropDnodeReq(SMnodeMsg *pReq);
|
||||
|
@ -182,9 +182,9 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj *pNewDnode) {
|
||||
mTrace("dnode:%d, perform update action, old_row:%p new_row:%p", pOldDnode->id, pOldDnode, pNewDnode);
|
||||
pOldDnode->updateTime = pNewDnode->updateTime;
|
||||
static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) {
|
||||
mTrace("dnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew);
|
||||
pOld->updateTime = pNew->updateTime;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -244,22 +244,22 @@ bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) {
|
||||
static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t maxEps) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
int32_t i = 0;
|
||||
int32_t numOfEps = 0;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||
if (pIter == NULL) break;
|
||||
if (i >= numOfEps) {
|
||||
if (numOfEps >= maxEps) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pDnode);
|
||||
break;
|
||||
}
|
||||
|
||||
SDnodeEp *pEp = &pEps->eps[i];
|
||||
SDnodeEp *pEp = &pEps->eps[numOfEps];
|
||||
pEp->id = htonl(pDnode->id);
|
||||
pEp->port = htons(pDnode->port);
|
||||
memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||
|
@ -267,11 +267,11 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) {
|
|||
if (mndIsMnode(pMnode, pDnode->id)) {
|
||||
pEp->isMnode = 1;
|
||||
}
|
||||
i++;
|
||||
numOfEps++;
|
||||
sdbRelease(pSdb, pDnode);
|
||||
}
|
||||
|
||||
pEps->num = htonl(i);
|
||||
pEps->num = htonl(numOfEps);
|
||||
}
|
||||
|
||||
static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
|
||||
|
@ -301,6 +301,7 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
|
|||
|
||||
static void mndParseStatusMsg(SStatusReq *pStatus) {
|
||||
pStatus->sver = htonl(pStatus->sver);
|
||||
pStatus->dver = htobe64(pStatus->dver);
|
||||
pStatus->dnodeId = htonl(pStatus->dnodeId);
|
||||
pStatus->clusterId = htobe64(pStatus->clusterId);
|
||||
pStatus->rebootTime = htobe64(pStatus->rebootTime);
|
||||
|
@ -309,6 +310,14 @@ static void mndParseStatusMsg(SStatusReq *pStatus) {
|
|||
pStatus->numOfSupportVnodes = htonl(pStatus->numOfSupportVnodes);
|
||||
pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval);
|
||||
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
|
||||
for (int32_t v = 0; v < pStatus->vnodeLoads.num; ++v) {
|
||||
SVnodeLoad *pVload = &pStatus->vnodeLoads.data[v];
|
||||
pVload->vgId = htonl(pVload->vgId);
|
||||
pVload->totalStorage = htobe64(pVload->totalStorage);
|
||||
pVload->compStorage = htobe64(pVload->compStorage);
|
||||
pVload->pointsWritten = htobe64(pVload->pointsWritten);
|
||||
pVload->tablesNum = htobe64(pVload->tablesNum);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
|
||||
|
@ -341,9 +350,11 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
|
|||
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
||||
bool needCheckCfg = !(online && pDnode->rebootTime == pStatus->rebootTime);
|
||||
bool dnodeChanged = (pStatus->dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE));
|
||||
bool reboot = (pDnode->rebootTime != pStatus->rebootTime);
|
||||
bool needCheck = !online || dnodeChanged || reboot;
|
||||
|
||||
if (needCheckCfg) {
|
||||
if (needCheck) {
|
||||
if (pStatus->sver != pMnode->cfg.sver) {
|
||||
if (pDnode != NULL) {
|
||||
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
|
||||
|
@ -379,7 +390,11 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
|
|||
goto PROCESS_STATUS_MSG_OVER;
|
||||
}
|
||||
|
||||
mInfo("dnode:%d, from offline to online", pDnode->id);
|
||||
if (!online) {
|
||||
mInfo("dnode:%d, from offline to online", pDnode->id);
|
||||
} else {
|
||||
mDebug("dnode:%d, send dnode eps", pDnode->id);
|
||||
}
|
||||
|
||||
pDnode->rebootTime = pStatus->rebootTime;
|
||||
pDnode->numOfCores = pStatus->numOfCores;
|
||||
|
@ -393,6 +408,7 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
|
|||
goto PROCESS_STATUS_MSG_OVER;
|
||||
}
|
||||
|
||||
pRsp->dver = htobe64(sdbGetTableVer(pMnode->pSdb, SDB_DNODE));
|
||||
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
|
||||
pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
|
||||
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
|
||||
|
|
|
@ -429,3 +429,12 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
|
|||
maxId = MAX(maxId, pSdb->maxId[type]);
|
||||
return maxId + 1;
|
||||
}
|
||||
|
||||
int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type) {
|
||||
if (type >= SDB_MAX || type < 0) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_TABLE_TYPE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return pSdb->tableVer[type];
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue