From 0e05f4ac0e0564a88af013b9f8d777a3cd880a03 Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 19 Mar 2020 11:33:06 +0800 Subject: [PATCH] [TD-9] fix errors in status message --- src/dnode/src/dnodeMClient.c | 15 +++- src/dnode/src/dnodeMain.c | 4 +- src/dnode/src/dnodeMgmt.c | 4 +- src/mnode/inc/mgmtBalance.h | 2 +- src/mnode/inc/mgmtMnode.h | 3 + src/mnode/src/mgmtBalance.c | 8 +- src/mnode/src/mgmtDnode.c | 15 ++-- src/mnode/src/mgmtMnode.c | 147 +++++++++++++++++++++++++++++++++++ 8 files changed, 181 insertions(+), 17 deletions(-) diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c index 76bb8aa524..812cc8c870 100644 --- a/src/dnode/src/dnodeMClient.c +++ b/src/dnode/src/dnodeMClient.c @@ -33,7 +33,18 @@ static void *tsDnodeMClientRpc = NULL; static SRpcIpSet tsDnodeMnodeIpList = {0}; int32_t dnodeInitMClient() { - dnodeReadMnodeIpList(); + if (!dnodeReadMnodeIpList()) { + dTrace("failed to read mnode iplist, set it from cfg file"); + memset(&tsDnodeMnodeIpList, sizeof(SRpcIpSet), 0); + tsDnodeMnodeIpList.port = tsMnodeDnodePort; + tsDnodeMnodeIpList.numOfIps = 1; + tsDnodeMnodeIpList.ip[0] = inet_addr(tsMasterIp); + if (tsSecondIp[0]) { + tsDnodeMnodeIpList.numOfIps = 2; + tsDnodeMnodeIpList.ip[1] = inet_addr(tsSecondIp); + } + } + tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; SRpcInit rpcInit; @@ -80,7 +91,7 @@ static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) { static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { if (pMsg->code != TSDB_CODE_SUCCESS) { - dError("status rsp is received, reason:%s", tstrerror(pMsg->code)); + dError("status rsp is received, error:%s", tstrerror(pMsg->code)); return; } diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 65fac0155b..af10fccd01 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -175,9 +175,9 @@ static int32_t dnodeInitSystem() { if (dnodeInitModules() != 0) return -1; if (dnodeInitRead() != 0) return -1; if (dnodeInitWrite() != 0) return -1; - if (dnodeInitMgmt() != 0) return -1; - if (dnodeInitMnode() != 0) return -1; if (dnodeInitMClient() != 0) return -1; + if (dnodeInitMnode() != 0) return -1; + if (dnodeInitMgmt() != 0) return -1; if (dnodeInitShell() != 0) return -1; dnodeStartModules(); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index e1c836a0ea..fa915187ec 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -448,8 +448,8 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { return; } - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); if (tsStatusTimer == NULL) { + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); dError("failed to start status timer"); return; } @@ -457,6 +457,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); SDMStatusMsg *pStatus = rpcMallocCont(contLen); if (pStatus == NULL) { + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); dError("failed to malloc status message"); return; } @@ -483,6 +484,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { }; dnodeSendMsgToMnode(&rpcMsg); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer); } static void dnodeReadDnodeId() { diff --git a/src/mnode/inc/mgmtBalance.h b/src/mnode/inc/mgmtBalance.h index 697bc46a7e..401074d171 100644 --- a/src/mnode/inc/mgmtBalance.h +++ b/src/mnode/inc/mgmtBalance.h @@ -23,7 +23,7 @@ extern "C" { int32_t mgmtInitBalance(); void mgmtCleanupBalance(); -void mgmtStartBalance(int32_t afterMs) ; +void mgmtStartBalanceTimer(int32_t afterMs) ; int32_t mgmtAllocVnodes(SVgObj *pVgroup); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index 27256d805a..0f08664b6a 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -22,6 +22,9 @@ extern "C" { bool mgmtCheckRedirect(void *handle); +int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp); +int32_t mgmtRemoveMnode(uint32_t privateIp); + #ifdef __cplusplus } #endif diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index 116e0ef36b..cb4857fa2c 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -20,7 +20,7 @@ int32_t (*mgmtInitBalanceFp)() = NULL; void (*mgmtCleanupBalanceFp)() = NULL; -void (*mgmtStartBalanceFp)(int32_t afterMs) = NULL; +void (*mgmtStartBalanceTimerFp)(int32_t afterMs) = NULL; int32_t (*mgmtAllocVnodesFp)(SVgObj *pVgroup) = NULL; int32_t mgmtInitBalance() { @@ -37,9 +37,9 @@ void mgmtCleanupBalance() { } } -void mgmtStartBalance(int32_t afterMs) { - if (mgmtStartBalanceFp) { - (*mgmtStartBalanceFp)(afterMs); +void mgmtStartBalanceTimer(int32_t afterMs) { + if (mgmtStartBalanceTimerFp) { + (*mgmtStartBalanceTimerFp)(afterMs); } } diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 37106323e9..8f5c7316f1 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -65,6 +65,8 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) { pDnode->openVnodes = 0; pDnode->status = TSDB_DN_STATUS_OFFLINE; + + mgmtUpdateDnode(pDnode); } bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { @@ -388,6 +390,7 @@ int32_t mgmtInitDnodes() { tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY; tsDnodeObj.status = TSDB_DN_STATUS_OFFLINE; tsDnodeObj.lastReboot = taosGetTimestampSec(); + sprintf(tsDnodeObj.dnodeName, "%d", tsDnodeObj.dnodeId); mgmtSetDnodeMaxVnodes(&tsDnodeObj); tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MGMT); @@ -523,7 +526,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { SDnodeObj *pDnode = NULL; if (pStatus->dnodeId == 0) { - pDnode = mgmtGetDnodeByIp(pStatus->privateIp); + pDnode = mgmtGetDnodeByIp(htonl(pStatus->privateIp)); if (pDnode == NULL) { mTrace("dnode not created, privateIp:%s, name:%s, ", taosIpStr(htonl(pStatus->dnodeId)), pStatus->dnodeName); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); @@ -538,8 +541,8 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { return ; } - uint32_t lastPrivateIp = htonl(pDnode->privateIp); - uint32_t lastPublicIp = htonl(pDnode->publicIp); + uint32_t lastPrivateIp = pDnode->privateIp; + uint32_t lastPublicIp = pDnode->publicIp; pDnode->privateIp = htonl(pStatus->privateIp); pDnode->publicIp = htonl(pStatus->publicIp); @@ -550,9 +553,8 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { pDnode->alternativeRole = pStatus->alternativeRole; if (pStatus->dnodeId == 0) { - mTrace("dnode:%d, first access, privateIp:%s, name:%s, ", pDnode->dnodeId, taosIpStr(pStatus->dnodeId), pStatus->dnodeName); + mTrace("dnode:%d, first access, privateIp:%s, name:%s, ", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName); mgmtSetDnodeMaxVnodes(pDnode); - mgmtUpdateDnode(pDnode); } if (lastPrivateIp != pDnode->privateIp || lastPublicIp != pDnode->publicIp) { @@ -578,8 +580,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { if (pDnode->status != TSDB_DN_STATUS_READY) { mTrace("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TSDB_DN_STATUS_READY; - //TODO: - //mgmtStartBalanceTimer(200); + mgmtStartBalanceTimer(200); } int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SVnodeAccess); diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index f53ca3f95f..00785dd0f6 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -15,9 +15,156 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "trpc.h" +#include "tschemautil.h" #include "mgmtMnode.h" +#include "mgmtUser.h" + +int32_t (*mgmtAddMnodeFp)(uint32_t privateIp, uint32_t publicIp) = NULL; +int32_t (*mgmtRemoveMnodeFp)(uint32_t privateIp) = NULL; +int32_t (*mgmtGetMnodesNumFp)() = NULL; +void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; + +static SSdbPeer tsMnodeObj = {0}; +static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); bool mgmtCheckRedirect(void *handle) { return false; } +int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp) { + if (mgmtAddMnodeFp) { + return (*mgmtAddMnodeFp)(privateIp, publicIp); + } else { + return 0; + } +} + +int32_t mgmtRemoveMnode(uint32_t privateIp) { + if (mgmtRemoveMnodeFp) { + return (*mgmtRemoveMnodeFp)(privateIp); + } else { + return 0; + } +} + +static int32_t mgmtGetMnodesNum() { + if (mgmtGetMnodesNumFp) { + return (*mgmtGetMnodesNumFp)(); + } else { + return 1; + } +} + +static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) { + if (mgmtGetNextMnodeFp) { + return (*mgmtGetNextMnodeFp)(pShow, pMnode); + } else { + if (*pMnode == NULL) { + *pMnode = NULL; + } else { + *pMnode = NULL; + } + } + + return *pMnode; +} + +static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + int32_t cols = 0; + + SUserObj *pUser = mgmtGetUserFromConn(pConn); + if (pUser == NULL) return 0; + + if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS; + + SSchema *pSchema = tsGetSchema(pMeta); + + pShow->bytes[cols] = 16; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "private ip"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 10; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "status"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 10; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "role"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 16; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "public ip"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = mgmtGetMnodesNum(); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + pShow->pNode = NULL; + + return 0; +} + +static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + int32_t numOfRows = 0; + int32_t cols = 0; + SSdbPeer *pMnode = NULL; + char *pWrite; + char ipstr[32]; + + while (numOfRows < rows) { + pShow->pNode = mgmtGetNextMnode(pShow, (SSdbPeer **)&pMnode); + if (pMnode == NULL) break; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, pMnode->ipstr); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pMnode->createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, sdbStatusStr[(uint8_t)pMnode->status]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, sdbRoleStr[(uint8_t)pMnode->role]); + cols++; + + tinet_ntoa(ipstr, pMnode->publicIp); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, ipstr); + cols++; + + numOfRows++; + } + + pShow->numOfReads += numOfRows; + return numOfRows; +} + +