diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index e0d37db254..5ef2706f1e 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -47,6 +47,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt); int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); // dmMonitor.c void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 9fedaa3744..38f97d3600 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -120,6 +120,59 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { return TSDB_CODE_OPS_NOT_SUPPORT; } +static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) { + pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK; + pStatus->details[0] = 0; + + SServerStatusRsp statusRsp = {0}; + SMonMloadInfo minfo = {0}; + dmGetMnodeLoads(pMgmt, &minfo); + if (minfo.isMnode && minfo.load.syncState != TAOS_SYNC_STATE_LEADER && + minfo.load.syncState != TAOS_SYNC_STATE_CANDIDATE) { + pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; + snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState)); + return; + } + + SMonVloadInfo vinfo = {0}; + dmGetVnodeLoads(pMgmt, &vinfo); + for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) { + SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i); + if (pLoad->syncState != TAOS_SYNC_STATE_LEADER && pLoad->syncState != TAOS_SYNC_STATE_FOLLOWER) { + pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; + snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId, + syncStr(pLoad->syncState)); + break; + } + } + + taosArrayDestroy(vinfo.pVloads); +} + +int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { + dDebug("server run status req is received"); + SServerStatusRsp statusRsp = {0}; + dmGetServerRunStatus(pMgmt, &statusRsp); + + SRpcMsg rspMsg = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .refId = pMsg->rpcMsg.refId}; + int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp); + if (rspLen < 0) { + rspMsg.code = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + rspMsg.code = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp); + pMsg->pRsp = pRsp; + pMsg->rspLen = rspLen; + return 0; +} + SArray *dmGetMsgHandles() { int32_t code = -1; SArray *pArray = taosArrayInit(16, sizeof(SMgmtHandle)); @@ -135,6 +188,7 @@ SArray *dmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_BNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_SERVER_STATUS, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; // Requests handled by MNODE if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index ebbf6c678b..af02e56f05 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -100,10 +100,10 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) { static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SDnodeMgmt *pMgmt = pInfo->ahandle; - - int32_t code = -1; - tmsg_t msgType = pMsg->rpcMsg.msgType; - dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg); + int32_t code = -1; + tmsg_t msgType = pMsg->rpcMsg.msgType; + bool isRequest = msgType & 1u; + dTrace("msg:%p, will be processed in dnode-mgmt queue, type:%s", pMsg, TMSG_INFO(msgType)); switch (msgType) { case TDMT_DND_CONFIG_DNODE: @@ -139,17 +139,23 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { case TDMT_DND_DROP_BNODE: code = (*pMgmt->processDropNodeFp)(pMgmt->pDnode, BNODE, pMsg); break; + case TDMT_DND_SERVER_STATUS: + code = dmProcessServerRunStatus(pMgmt, pMsg); + break; default: + terrno = TSDB_CODE_MSG_NOT_PROCESSED; break; } - if (msgType & 1u) { + if (isRequest) { if (code != 0 && terrno != 0) code = terrno; SRpcMsg rsp = { .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code, .refId = pMsg->rpcMsg.refId, + .pCont = pMsg->pRsp, + .contLen = pMsg->rspLen, }; rpcSendResponse(&rsp); } diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index ff3463a776..d717408fc6 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -104,7 +104,7 @@ void dmSetEvent(SDnode *pDnode, EDndEvent event); void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc); -void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pMsg); +void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg); void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 6d71e9a36e..dbd861e6f7 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -235,44 +235,18 @@ void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const c dmReportStartup(pWrapper->pDnode, pName, pDesc); } -static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) { +static void dmGetServerStartupStatus(SDnode *pDnode, SServerStatusRsp *pStatus) { SDnodeMgmt *pMgmt = pDnode->wrappers[DNODE].pMgmt; - pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK; pStatus->details[0] = 0; if (pDnode->status == DND_STAT_INIT) { pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK; snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc); - return; - } - - if (pDnode->status == DND_STAT_STOPPED) { + } else if (pDnode->status == DND_STAT_STOPPED) { pStatus->statusCode = TSDB_SRV_STATUS_EXTING; - return; + } else { + pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK; } - - SMonMloadInfo minfo = {0}; - dmGetMnodeLoads(pMgmt, &minfo); - if (minfo.isMnode && minfo.load.syncState != TAOS_SYNC_STATE_LEADER && - minfo.load.syncState != TAOS_SYNC_STATE_CANDIDATE) { - pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; - snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState)); - return; - } - - SMonVloadInfo vinfo = {0}; - dmGetVnodeLoads(pMgmt, &vinfo); - for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) { - SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i); - if (pLoad->syncState != TAOS_SYNC_STATE_LEADER && pLoad->syncState != TAOS_SYNC_STATE_FOLLOWER) { - pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; - snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pLoad->vgId, - syncStr(pLoad->syncState)); - break; - } - } - - taosArrayDestroy(vinfo.pVloads); } void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) { @@ -288,11 +262,11 @@ void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) { rpcFreeCont(pReq->pCont); } -void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) { - dDebug("server status req is received"); +void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pReq) { + dDebug("server startup status req is received"); SServerStatusRsp statusRsp = {0}; - dmGetServerStatus(pDnode, &statusRsp); + dmGetServerStartupStatus(pDnode, &statusRsp); SRpcMsg rspMsg = {.handle = pReq->handle, .ahandle = pReq->ahandle, .refId = pReq->refId}; int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index c7ce1b22f4..30ec04d9ef 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -132,9 +132,13 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { switch (msgType) { case TDMT_DND_SERVER_STATUS: - dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); - dmProcessServerStatusReq(pDnode, pMsg); - return; + if (pDnode->status != DND_STAT_RUNNING) { + dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); + dmProcessServerStartupStatus(pDnode, pMsg); + return; + } else { + break; + } case TDMT_DND_NET_TEST: dTrace("net test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); dmProcessNetTestReq(pDnode, pMsg);