minor changes
This commit is contained in:
parent
68f8d6451e
commit
5f9826b93a
|
@ -103,7 +103,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg;
|
||||||
|
|
|
@ -287,15 +287,17 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) {
|
||||||
static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
SStatusMsg *pStatus = pMsg->rpcMsg.pCont;
|
SStatusMsg *pStatus = pMsg->rpcMsg.pCont;
|
||||||
|
SDnodeObj *pDnode = NULL;
|
||||||
|
int32_t code = -1;
|
||||||
|
|
||||||
mndParseStatusMsg(pStatus);
|
mndParseStatusMsg(pStatus);
|
||||||
|
|
||||||
SDnodeObj *pDnode = NULL;
|
|
||||||
if (pStatus->dnodeId == 0) {
|
if (pStatus->dnodeId == 0) {
|
||||||
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp);
|
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp);
|
||||||
if (pDnode == NULL) {
|
if (pDnode == NULL) {
|
||||||
mDebug("dnode:%s, not created yet", pStatus->dnodeEp);
|
mDebug("dnode:%s, not created yet", pStatus->dnodeEp);
|
||||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||||
return -1;
|
goto PROCESS_STATUS_MSG_OVER;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId);
|
pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId);
|
||||||
|
@ -305,9 +307,8 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
||||||
pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
|
pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
|
||||||
}
|
}
|
||||||
mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp);
|
mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
|
||||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||||
return -1;
|
goto PROCESS_STATUS_MSG_OVER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -315,10 +316,9 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
||||||
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
|
if (pDnode != NULL && pDnode->status != DND_STATUS_READY) {
|
||||||
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
|
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
|
||||||
}
|
}
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
|
||||||
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver);
|
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver);
|
||||||
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
|
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
|
||||||
return -1;
|
goto PROCESS_STATUS_MSG_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStatus->dnodeId == 0) {
|
if (pStatus->dnodeId == 0) {
|
||||||
|
@ -330,9 +330,8 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId,
|
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId,
|
||||||
pMnode->clusterId);
|
pMnode->clusterId);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
|
||||||
terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID;
|
terrno != TSDB_CODE_MND_INVALID_CLUSTER_ID;
|
||||||
return -1;
|
goto PROCESS_STATUS_MSG_OVER;
|
||||||
} else {
|
} else {
|
||||||
pDnode->accessTimes++;
|
pDnode->accessTimes++;
|
||||||
mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes);
|
mTrace("dnode:%d, status received, access times %d", pDnode->id, pDnode->accessTimes);
|
||||||
|
@ -345,9 +344,8 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
||||||
if (0 != ret) {
|
if (0 != ret) {
|
||||||
pDnode->offlineReason = ret;
|
pDnode->offlineReason = ret;
|
||||||
mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]);
|
mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
|
||||||
terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
|
terrno = TSDB_CODE_MND_INVALID_CLUSTER_CFG;
|
||||||
return -1;
|
goto PROCESS_STATUS_MSG_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("dnode:%d, from offline to online", pDnode->id);
|
mInfo("dnode:%d, from offline to online", pDnode->id);
|
||||||
|
@ -363,20 +361,22 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
||||||
int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp);
|
int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp);
|
||||||
SStatusRsp *pRsp = rpcMallocCont(contLen);
|
SStatusRsp *pRsp = rpcMallocCont(contLen);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
goto PROCESS_STATUS_MSG_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
|
pRsp->dnodeCfg.dnodeId = htonl(pDnode->id);
|
||||||
pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
|
pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId);
|
||||||
mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
|
// mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps);
|
||||||
|
|
||||||
pMsg->contLen = contLen;
|
pMsg->contLen = contLen;
|
||||||
pMsg->pCont = pRsp;
|
pMsg->pCont = pRsp;
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
|
||||||
|
|
||||||
return 0;
|
code = 0;
|
||||||
|
|
||||||
|
PROCESS_STATUS_MSG_OVER:
|
||||||
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) {
|
static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) {
|
||||||
|
|
Loading…
Reference in New Issue