[TD-9] fix errors in status message

This commit is contained in:
slguan 2020-03-19 11:33:06 +08:00
parent 7a96a2f325
commit 0e05f4ac0e
8 changed files with 181 additions and 17 deletions

View File

@ -33,7 +33,18 @@ static void *tsDnodeMClientRpc = NULL;
static SRpcIpSet tsDnodeMnodeIpList = {0}; static SRpcIpSet tsDnodeMnodeIpList = {0};
int32_t dnodeInitMClient() { int32_t dnodeInitMClient() {
dnodeReadMnodeIpList(); if (!dnodeReadMnodeIpList()) {
dTrace("failed to read mnode iplist, set it from cfg file");
memset(&tsDnodeMnodeIpList, sizeof(SRpcIpSet), 0);
tsDnodeMnodeIpList.port = tsMnodeDnodePort;
tsDnodeMnodeIpList.numOfIps = 1;
tsDnodeMnodeIpList.ip[0] = inet_addr(tsMasterIp);
if (tsSecondIp[0]) {
tsDnodeMnodeIpList.numOfIps = 2;
tsDnodeMnodeIpList.ip[1] = inet_addr(tsSecondIp);
}
}
tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
SRpcInit rpcInit; SRpcInit rpcInit;
@ -80,7 +91,7 @@ static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) {
static void dnodeProcessStatusRsp(SRpcMsg *pMsg) { static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
if (pMsg->code != TSDB_CODE_SUCCESS) { if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, reason:%s", tstrerror(pMsg->code)); dError("status rsp is received, error:%s", tstrerror(pMsg->code));
return; return;
} }

View File

@ -175,9 +175,9 @@ static int32_t dnodeInitSystem() {
if (dnodeInitModules() != 0) return -1; if (dnodeInitModules() != 0) return -1;
if (dnodeInitRead() != 0) return -1; if (dnodeInitRead() != 0) return -1;
if (dnodeInitWrite() != 0) return -1; if (dnodeInitWrite() != 0) return -1;
if (dnodeInitMgmt() != 0) return -1;
if (dnodeInitMnode() != 0) return -1;
if (dnodeInitMClient() != 0) return -1; if (dnodeInitMClient() != 0) return -1;
if (dnodeInitMnode() != 0) return -1;
if (dnodeInitMgmt() != 0) return -1;
if (dnodeInitShell() != 0) return -1; if (dnodeInitShell() != 0) return -1;
dnodeStartModules(); dnodeStartModules();

View File

@ -448,8 +448,8 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
return; return;
} }
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
if (tsStatusTimer == NULL) { if (tsStatusTimer == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to start status timer"); dError("failed to start status timer");
return; return;
} }
@ -457,6 +457,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
SDMStatusMsg *pStatus = rpcMallocCont(contLen); SDMStatusMsg *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) { if (pStatus == NULL) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
dError("failed to malloc status message"); dError("failed to malloc status message");
return; return;
} }
@ -483,6 +484,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
}; };
dnodeSendMsgToMnode(&rpcMsg); dnodeSendMsgToMnode(&rpcMsg);
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
} }
static void dnodeReadDnodeId() { static void dnodeReadDnodeId() {

View File

@ -23,7 +23,7 @@ extern "C" {
int32_t mgmtInitBalance(); int32_t mgmtInitBalance();
void mgmtCleanupBalance(); void mgmtCleanupBalance();
void mgmtStartBalance(int32_t afterMs) ; void mgmtStartBalanceTimer(int32_t afterMs) ;
int32_t mgmtAllocVnodes(SVgObj *pVgroup); int32_t mgmtAllocVnodes(SVgObj *pVgroup);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -22,6 +22,9 @@ extern "C" {
bool mgmtCheckRedirect(void *handle); bool mgmtCheckRedirect(void *handle);
int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp);
int32_t mgmtRemoveMnode(uint32_t privateIp);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -20,7 +20,7 @@
int32_t (*mgmtInitBalanceFp)() = NULL; int32_t (*mgmtInitBalanceFp)() = NULL;
void (*mgmtCleanupBalanceFp)() = NULL; void (*mgmtCleanupBalanceFp)() = NULL;
void (*mgmtStartBalanceFp)(int32_t afterMs) = NULL; void (*mgmtStartBalanceTimerFp)(int32_t afterMs) = NULL;
int32_t (*mgmtAllocVnodesFp)(SVgObj *pVgroup) = NULL; int32_t (*mgmtAllocVnodesFp)(SVgObj *pVgroup) = NULL;
int32_t mgmtInitBalance() { int32_t mgmtInitBalance() {
@ -37,9 +37,9 @@ void mgmtCleanupBalance() {
} }
} }
void mgmtStartBalance(int32_t afterMs) { void mgmtStartBalanceTimer(int32_t afterMs) {
if (mgmtStartBalanceFp) { if (mgmtStartBalanceTimerFp) {
(*mgmtStartBalanceFp)(afterMs); (*mgmtStartBalanceTimerFp)(afterMs);
} }
} }

View File

@ -65,6 +65,8 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) {
pDnode->openVnodes = 0; pDnode->openVnodes = 0;
pDnode->status = TSDB_DN_STATUS_OFFLINE; pDnode->status = TSDB_DN_STATUS_OFFLINE;
mgmtUpdateDnode(pDnode);
} }
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) { bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
@ -388,6 +390,7 @@ int32_t mgmtInitDnodes() {
tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY; tsDnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY;
tsDnodeObj.status = TSDB_DN_STATUS_OFFLINE; tsDnodeObj.status = TSDB_DN_STATUS_OFFLINE;
tsDnodeObj.lastReboot = taosGetTimestampSec(); tsDnodeObj.lastReboot = taosGetTimestampSec();
sprintf(tsDnodeObj.dnodeName, "%d", tsDnodeObj.dnodeId);
mgmtSetDnodeMaxVnodes(&tsDnodeObj); mgmtSetDnodeMaxVnodes(&tsDnodeObj);
tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MGMT); tsDnodeObj.moduleStatus |= (1 << TSDB_MOD_MGMT);
@ -523,7 +526,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
pDnode = mgmtGetDnodeByIp(pStatus->privateIp); pDnode = mgmtGetDnodeByIp(htonl(pStatus->privateIp));
if (pDnode == NULL) { if (pDnode == NULL) {
mTrace("dnode not created, privateIp:%s, name:%s, ", taosIpStr(htonl(pStatus->dnodeId)), pStatus->dnodeName); mTrace("dnode not created, privateIp:%s, name:%s, ", taosIpStr(htonl(pStatus->dnodeId)), pStatus->dnodeName);
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_DNODE_NOT_EXIST);
@ -538,8 +541,8 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
return ; return ;
} }
uint32_t lastPrivateIp = htonl(pDnode->privateIp); uint32_t lastPrivateIp = pDnode->privateIp;
uint32_t lastPublicIp = htonl(pDnode->publicIp); uint32_t lastPublicIp = pDnode->publicIp;
pDnode->privateIp = htonl(pStatus->privateIp); pDnode->privateIp = htonl(pStatus->privateIp);
pDnode->publicIp = htonl(pStatus->publicIp); pDnode->publicIp = htonl(pStatus->publicIp);
@ -550,9 +553,8 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pDnode->alternativeRole = pStatus->alternativeRole; pDnode->alternativeRole = pStatus->alternativeRole;
if (pStatus->dnodeId == 0) { if (pStatus->dnodeId == 0) {
mTrace("dnode:%d, first access, privateIp:%s, name:%s, ", pDnode->dnodeId, taosIpStr(pStatus->dnodeId), pStatus->dnodeName); mTrace("dnode:%d, first access, privateIp:%s, name:%s, ", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
mgmtSetDnodeMaxVnodes(pDnode); mgmtSetDnodeMaxVnodes(pDnode);
mgmtUpdateDnode(pDnode);
} }
if (lastPrivateIp != pDnode->privateIp || lastPublicIp != pDnode->publicIp) { if (lastPrivateIp != pDnode->privateIp || lastPublicIp != pDnode->publicIp) {
@ -578,8 +580,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
if (pDnode->status != TSDB_DN_STATUS_READY) { if (pDnode->status != TSDB_DN_STATUS_READY) {
mTrace("dnode:%d, from offline to online", pDnode->dnodeId); mTrace("dnode:%d, from offline to online", pDnode->dnodeId);
pDnode->status = TSDB_DN_STATUS_READY; pDnode->status = TSDB_DN_STATUS_READY;
//TODO: mgmtStartBalanceTimer(200);
//mgmtStartBalanceTimer(200);
} }
int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SVnodeAccess); int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SVnodeAccess);

View File

@ -15,9 +15,156 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "trpc.h"
#include "tschemautil.h"
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtUser.h"
int32_t (*mgmtAddMnodeFp)(uint32_t privateIp, uint32_t publicIp) = NULL;
int32_t (*mgmtRemoveMnodeFp)(uint32_t privateIp) = NULL;
int32_t (*mgmtGetMnodesNumFp)() = NULL;
void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL;
static SSdbPeer tsMnodeObj = {0};
static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
bool mgmtCheckRedirect(void *handle) { bool mgmtCheckRedirect(void *handle) {
return false; return false;
} }
int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp) {
if (mgmtAddMnodeFp) {
return (*mgmtAddMnodeFp)(privateIp, publicIp);
} else {
return 0;
}
}
int32_t mgmtRemoveMnode(uint32_t privateIp) {
if (mgmtRemoveMnodeFp) {
return (*mgmtRemoveMnodeFp)(privateIp);
} else {
return 0;
}
}
static int32_t mgmtGetMnodesNum() {
if (mgmtGetMnodesNumFp) {
return (*mgmtGetMnodesNumFp)();
} else {
return 1;
}
}
static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) {
if (mgmtGetNextMnodeFp) {
return (*mgmtGetNextMnodeFp)(pShow, pMnode);
} else {
if (*pMnode == NULL) {
*pMnode = NULL;
} else {
*pMnode = NULL;
}
}
return *pMnode;
}
static int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "private ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "role");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "public ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int32_t i = 1; i < cols; ++i) {
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = mgmtGetMnodesNum();
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
return 0;
}
static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
int32_t cols = 0;
SSdbPeer *pMnode = NULL;
char *pWrite;
char ipstr[32];
while (numOfRows < rows) {
pShow->pNode = mgmtGetNextMnode(pShow, (SSdbPeer **)&pMnode);
if (pMnode == NULL) break;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pMnode->ipstr);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pMnode->createdTime;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, sdbStatusStr[(uint8_t)pMnode->status]);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, sdbRoleStr[(uint8_t)pMnode->role]);
cols++;
tinet_ntoa(ipstr, pMnode->publicIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
numOfRows++;
}
pShow->numOfReads += numOfRows;
return numOfRows;
}