Merge pull request #4418 from taosdata/feature/wal
[TD-2270]<fix>: race condition in balance module while scheduled by timer
This commit is contained in:
commit
0c971b9a91
|
@ -236,7 +236,7 @@
|
|||
# httpDebugFlag 131
|
||||
|
||||
# debug flag for monitor
|
||||
# monitorDebugFlag 131
|
||||
# monDebugFlag 131
|
||||
|
||||
# debug flag for query
|
||||
# qDebugflag 131
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_BALANCE_INT_H
|
||||
#define TDENGINE_BALANCE_INT_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "mnodeInt.h"
|
||||
#include "mnodeDef.h"
|
||||
#include "mnodeDnode.h"
|
||||
|
||||
typedef struct {
|
||||
int32_t size;
|
||||
int32_t maxSize;
|
||||
SDnodeObj **list;
|
||||
} SBnDnodes;
|
||||
|
||||
typedef struct {
|
||||
void * timer;
|
||||
bool stop;
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t cond;
|
||||
pthread_t thread;
|
||||
} SBnThread;
|
||||
|
||||
typedef struct {
|
||||
pthread_mutex_t mutex;
|
||||
} SBnMgmt;
|
||||
|
||||
int32_t bnInit();
|
||||
void bnCleanUp();
|
||||
bool bnStart();
|
||||
void bnCheckStatus();
|
||||
void bnCheckModules();
|
||||
|
||||
extern SBnDnodes tsBnDnodes;
|
||||
extern void *tsMnodeTmr;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_BALANCE_SCORE_H
|
||||
#define TDENGINE_BALANCE_SCORE_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "bnInt.h"
|
||||
|
||||
void bnInitDnodes();
|
||||
void bnCleanupDnodes();
|
||||
void bnAccquireDnodes();
|
||||
void bnReleaseDnodes();
|
||||
float bnTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extraVnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_BALANCE_THREAD_H
|
||||
#define TDENGINE_BALANCE_THREAD_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "bnInt.h"
|
||||
|
||||
int32_t bnInitThread();
|
||||
void bnCleanupThread();
|
||||
void bnNotify();
|
||||
void bnStartTimer(int64_t mseconds);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -15,17 +15,12 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "tutil.h"
|
||||
#include "tbalance.h"
|
||||
#include "tsync.h"
|
||||
#include "ttimer.h"
|
||||
#include "tglobal.h"
|
||||
#include "tdataformat.h"
|
||||
#include "dnode.h"
|
||||
#include "mnode.h"
|
||||
#include "mnodeDef.h"
|
||||
#include "mnodeInt.h"
|
||||
#include "mnodeDnode.h"
|
||||
#include "bnInt.h"
|
||||
#include "bnScore.h"
|
||||
#include "bnThread.h"
|
||||
#include "mnodeDb.h"
|
||||
#include "mnodeMnode.h"
|
||||
#include "mnodeSdb.h"
|
||||
|
@ -33,36 +28,18 @@
|
|||
#include "mnodeUser.h"
|
||||
#include "mnodeVgroup.h"
|
||||
|
||||
/*
|
||||
* once sdb work as mater, then tsAccessSquence reset to zero
|
||||
* increase tsAccessSquence every balance interval
|
||||
*/
|
||||
extern void * tsMnodeTmr;
|
||||
static void * tsBalanceTimer = NULL;
|
||||
static int32_t tsBalanceDnodeListSize = 0;
|
||||
static SDnodeObj ** tsBalanceDnodeList = NULL;
|
||||
static int32_t tsBalanceDnodeListMallocSize = 16;
|
||||
static pthread_mutex_t tsBalanceMutex;
|
||||
static SBnMgmt tsBnMgmt;;
|
||||
static void bnMonitorDnodeModule();
|
||||
|
||||
static void balanceStartTimer(int64_t mseconds);
|
||||
static void balanceInitDnodeList();
|
||||
static void balanceCleanupDnodeList();
|
||||
static void balanceAccquireDnodeList();
|
||||
static void balanceReleaseDnodeList();
|
||||
static void balanceMonitorDnodeModule();
|
||||
static float balanceTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extraVnode);
|
||||
static int32_t balanceGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
||||
static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
|
||||
static void balanceLock() {
|
||||
pthread_mutex_lock(&tsBalanceMutex);
|
||||
static void bnLock() {
|
||||
pthread_mutex_lock(&tsBnMgmt.mutex);
|
||||
}
|
||||
|
||||
static void balanceUnLock() {
|
||||
pthread_mutex_unlock(&tsBalanceMutex);
|
||||
static void bnUnLock() {
|
||||
pthread_mutex_unlock(&tsBnMgmt.mutex);
|
||||
}
|
||||
|
||||
static bool balanceCheckFree(SDnodeObj *pDnode) {
|
||||
static bool bnCheckFree(SDnodeObj *pDnode) {
|
||||
if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) {
|
||||
mError("dnode:%d, status:%s not available", pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status));
|
||||
return false;
|
||||
|
@ -86,7 +63,7 @@ static bool balanceCheckFree(SDnodeObj *pDnode) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static void balanceDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) {
|
||||
static void bnDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) {
|
||||
mDebug("vgId:%d, dnode:%d is dropping", pVgroup->vgId, pVnodeGid->dnodeId);
|
||||
|
||||
SDnodeObj *pDnode = mnodeGetDnode(pVnodeGid->dnodeId);
|
||||
|
@ -111,27 +88,26 @@ static void balanceDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) {
|
|||
mnodeUpdateVgroup(pVgroup);
|
||||
}
|
||||
|
||||
static void balanceSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) {
|
||||
static void bnSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) {
|
||||
// SVnodeGid tmp = *pVnodeGid1;
|
||||
// *pVnodeGid1 = *pVnodeGid2;
|
||||
// *pVnodeGid2 = tmp;
|
||||
}
|
||||
|
||||
int32_t balanceAllocVnodes(SVgObj *pVgroup) {
|
||||
int32_t bnAllocVnodes(SVgObj *pVgroup) {
|
||||
static int32_t randIndex = 0;
|
||||
int32_t dnode = 0;
|
||||
int32_t vnodes = 0;
|
||||
|
||||
balanceLock();
|
||||
|
||||
balanceAccquireDnodeList();
|
||||
bnLock();
|
||||
bnAccquireDnodes();
|
||||
|
||||
mDebug("db:%s, try alloc %d vnodes to vgroup, dnodes total:%d, avail:%d", pVgroup->dbName, pVgroup->numOfVnodes,
|
||||
mnodeGetDnodesNum(), tsBalanceDnodeListSize);
|
||||
mnodeGetDnodesNum(), tsBnDnodes.size);
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
for (; dnode < tsBalanceDnodeListSize; ++dnode) {
|
||||
SDnodeObj *pDnode = tsBalanceDnodeList[dnode];
|
||||
if (balanceCheckFree(pDnode)) {
|
||||
for (; dnode < tsBnDnodes.size; ++dnode) {
|
||||
SDnodeObj *pDnode = tsBnDnodes.list[dnode];
|
||||
if (bnCheckFree(pDnode)) {
|
||||
SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i;
|
||||
pVnodeGid->dnodeId = pDnode->dnodeId;
|
||||
pVnodeGid->pDnode = pDnode;
|
||||
|
@ -148,8 +124,8 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) {
|
|||
}
|
||||
|
||||
if (vnodes != pVgroup->numOfVnodes) {
|
||||
balanceReleaseDnodeList();
|
||||
balanceUnLock();
|
||||
bnReleaseDnodes();
|
||||
bnUnLock();
|
||||
|
||||
mDebug("db:%s, need vnodes:%d, but alloc:%d", pVgroup->dbName, pVgroup->numOfVnodes, vnodes);
|
||||
|
||||
|
@ -179,33 +155,33 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) {
|
|||
if (pVgroup->numOfVnodes == 1) {
|
||||
} else if (pVgroup->numOfVnodes == 2) {
|
||||
if (randIndex++ % 2 == 0) {
|
||||
balanceSwapVnodeGid(pVgroup->vnodeGid, pVgroup->vnodeGid + 1);
|
||||
bnSwapVnodeGid(pVgroup->vnodeGid, pVgroup->vnodeGid + 1);
|
||||
}
|
||||
} else {
|
||||
int32_t randVal = randIndex++ % 6;
|
||||
if (randVal == 1) { // 1, 0, 2
|
||||
balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
|
||||
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
|
||||
} else if (randVal == 2) { // 1, 2, 0
|
||||
balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
|
||||
balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
|
||||
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
|
||||
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
|
||||
} else if (randVal == 3) { // 2, 1, 0
|
||||
balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
|
||||
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
|
||||
} else if (randVal == 4) { // 2, 0, 1
|
||||
balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
|
||||
balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
|
||||
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
|
||||
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
|
||||
}
|
||||
if (randVal == 5) { // 0, 2, 1
|
||||
balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
|
||||
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
|
||||
} else {
|
||||
} // 0, 1, 2
|
||||
}
|
||||
|
||||
balanceReleaseDnodeList();
|
||||
balanceUnLock();
|
||||
bnReleaseDnodes();
|
||||
bnUnLock();
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
|
||||
static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
|
||||
if (pVgroup->lbTime + 5 * tsStatusInterval > tsAccessSquence) {
|
||||
return false;
|
||||
}
|
||||
|
@ -232,7 +208,7 @@ static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
|
|||
* desc: remove one vnode from vgroup
|
||||
* all vnodes in vgroup should in ready state, except the balancing one
|
||||
**/
|
||||
static int32_t balanceRemoveVnode(SVgObj *pVgroup) {
|
||||
static int32_t bnRemoveVnode(SVgObj *pVgroup) {
|
||||
if (pVgroup->numOfVnodes <= 1) return -1;
|
||||
|
||||
SVnodeGid *pRmVnode = NULL;
|
||||
|
@ -274,17 +250,17 @@ static int32_t balanceRemoveVnode(SVgObj *pVgroup) {
|
|||
pSelVnode = pRmVnode;
|
||||
}
|
||||
|
||||
if (!balanceCheckVgroupReady(pVgroup, pSelVnode)) {
|
||||
if (!bnCheckVgroupReady(pVgroup, pSelVnode)) {
|
||||
mDebug("vgId:%d, is not ready", pVgroup->vgId);
|
||||
return -1;
|
||||
} else {
|
||||
mDebug("vgId:%d, is ready, discard dnode:%d", pVgroup->vgId, pSelVnode->dnodeId);
|
||||
balanceDiscardVnode(pVgroup, pSelVnode);
|
||||
bnDiscardVnode(pVgroup, pSelVnode);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
static bool balanceCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) {
|
||||
static bool bnCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) {
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
SVnodeGid *pGid = &pVgroup->vnodeGid[i];
|
||||
if (pGid->dnodeId == 0) break;
|
||||
|
@ -299,13 +275,13 @@ static bool balanceCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) {
|
|||
/**
|
||||
* desc: add vnode to vgroup, find a new one if dest dnode is null
|
||||
**/
|
||||
static int32_t balanceAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) {
|
||||
static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) {
|
||||
if (pDestDnode == NULL) {
|
||||
for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) {
|
||||
SDnodeObj *pDnode = tsBalanceDnodeList[i];
|
||||
for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
|
||||
SDnodeObj *pDnode = tsBnDnodes.list[i];
|
||||
if (pDnode == pSrcDnode) continue;
|
||||
if (balanceCheckDnodeInVgroup(pDnode, pVgroup)) continue;
|
||||
if (!balanceCheckFree(pDnode)) continue;
|
||||
if (bnCheckDnodeInVgroup(pDnode, pVgroup)) continue;
|
||||
if (!bnCheckFree(pDnode)) continue;
|
||||
|
||||
pDestDnode = pDnode;
|
||||
mDebug("vgId:%d, add vnode to dnode:%d", pVgroup->vgId, pDnode->dnodeId);
|
||||
|
@ -333,25 +309,25 @@ static int32_t balanceAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool balanceMonitorBalance() {
|
||||
if (tsBalanceDnodeListSize < 2) return false;
|
||||
static bool bnMonitorBalance() {
|
||||
if (tsBnDnodes.size < 2) return false;
|
||||
|
||||
for (int32_t src = tsBalanceDnodeListSize - 1; src >= 0; --src) {
|
||||
SDnodeObj *pDnode = tsBalanceDnodeList[src];
|
||||
mDebug("%d-dnode:%d, state:%s, score:%.1f, numOfCores:%d, openVnodes:%d", tsBalanceDnodeListSize - src - 1,
|
||||
for (int32_t src = tsBnDnodes.size - 1; src >= 0; --src) {
|
||||
SDnodeObj *pDnode = tsBnDnodes.list[src];
|
||||
mDebug("%d-dnode:%d, state:%s, score:%.1f, numOfCores:%d, openVnodes:%d", tsBnDnodes.size - src - 1,
|
||||
pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status), pDnode->score, pDnode->numOfCores,
|
||||
pDnode->openVnodes);
|
||||
}
|
||||
|
||||
float scoresDiff = tsBalanceDnodeList[tsBalanceDnodeListSize - 1]->score - tsBalanceDnodeList[0]->score;
|
||||
float scoresDiff = tsBnDnodes.list[tsBnDnodes.size - 1]->score - tsBnDnodes.list[0]->score;
|
||||
if (scoresDiff < 0.01) {
|
||||
mDebug("all dnodes:%d is already balanced, scoresDiff:%f", tsBalanceDnodeListSize, scoresDiff);
|
||||
mDebug("all dnodes:%d is already balanced, scoresDiff:%f", tsBnDnodes.size, scoresDiff);
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int32_t src = tsBalanceDnodeListSize - 1; src > 0; --src) {
|
||||
SDnodeObj *pSrcDnode = tsBalanceDnodeList[src];
|
||||
float srcScore = balanceTryCalcDnodeScore(pSrcDnode, -1);
|
||||
for (int32_t src = tsBnDnodes.size - 1; src > 0; --src) {
|
||||
SDnodeObj *pSrcDnode = tsBnDnodes.list[src];
|
||||
float srcScore = bnTryCalcDnodeScore(pSrcDnode, -1);
|
||||
if (tsEnableBalance == 0 && pSrcDnode->status != TAOS_DN_STATUS_DROPPING) {
|
||||
continue;
|
||||
}
|
||||
|
@ -362,19 +338,19 @@ static bool balanceMonitorBalance() {
|
|||
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
|
||||
if (pVgroup == NULL) break;
|
||||
|
||||
if (balanceCheckDnodeInVgroup(pSrcDnode, pVgroup)) {
|
||||
if (bnCheckDnodeInVgroup(pSrcDnode, pVgroup)) {
|
||||
for (int32_t dest = 0; dest < src; dest++) {
|
||||
SDnodeObj *pDestDnode = tsBalanceDnodeList[dest];
|
||||
if (balanceCheckDnodeInVgroup(pDestDnode, pVgroup)) continue;
|
||||
SDnodeObj *pDestDnode = tsBnDnodes.list[dest];
|
||||
if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) continue;
|
||||
|
||||
float destScore = balanceTryCalcDnodeScore(pDestDnode, 1);
|
||||
float destScore = bnTryCalcDnodeScore(pDestDnode, 1);
|
||||
if (srcScore + 0.0001 < destScore) continue;
|
||||
if (!balanceCheckFree(pDestDnode)) continue;
|
||||
if (!bnCheckFree(pDestDnode)) continue;
|
||||
|
||||
mDebug("vgId:%d, balance from dnode:%d to dnode:%d, srcScore:%.1f:%.1f, destScore:%.1f:%.1f",
|
||||
pVgroup->vgId, pSrcDnode->dnodeId, pDestDnode->dnodeId, pSrcDnode->score,
|
||||
srcScore, pDestDnode->score, destScore);
|
||||
balanceAddVnode(pVgroup, pSrcDnode, pDestDnode);
|
||||
bnAddVnode(pVgroup, pSrcDnode, pDestDnode);
|
||||
mnodeDecVgroupRef(pVgroup);
|
||||
mnodeCancelGetNextVgroup(pIter);
|
||||
return true;
|
||||
|
@ -392,7 +368,7 @@ static bool balanceMonitorBalance() {
|
|||
// 1. reset balanceAccessSquence to zero
|
||||
// 2. reset state of dnodes to offline
|
||||
// 3. reset lastAccess of dnodes to zero
|
||||
void balanceReset() {
|
||||
void bnReset() {
|
||||
void * pIter = NULL;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
while (1) {
|
||||
|
@ -413,7 +389,7 @@ void balanceReset() {
|
|||
tsAccessSquence = 0;
|
||||
}
|
||||
|
||||
static int32_t balanceMonitorVgroups() {
|
||||
static int32_t bnMonitorVgroups() {
|
||||
void * pIter = NULL;
|
||||
SVgObj *pVgroup = NULL;
|
||||
bool hasUpdatingVgroup = false;
|
||||
|
@ -429,11 +405,11 @@ static int32_t balanceMonitorVgroups() {
|
|||
if (vgReplica > dbReplica) {
|
||||
mInfo("vgId:%d, replica:%d numOfVnodes:%d, try remove one vnode", pVgroup->vgId, dbReplica, vgReplica);
|
||||
hasUpdatingVgroup = true;
|
||||
code = balanceRemoveVnode(pVgroup);
|
||||
code = bnRemoveVnode(pVgroup);
|
||||
} else if (vgReplica < dbReplica) {
|
||||
mInfo("vgId:%d, replica:%d numOfVnodes:%d, try add one vnode", pVgroup->vgId, dbReplica, vgReplica);
|
||||
hasUpdatingVgroup = true;
|
||||
code = balanceAddVnode(pVgroup, NULL, NULL);
|
||||
code = bnAddVnode(pVgroup, NULL, NULL);
|
||||
}
|
||||
|
||||
mnodeDecVgroupRef(pVgroup);
|
||||
|
@ -446,7 +422,7 @@ static int32_t balanceMonitorVgroups() {
|
|||
return hasUpdatingVgroup;
|
||||
}
|
||||
|
||||
static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) {
|
||||
static bool bnMonitorDnodeDropping(SDnodeObj *pDnode) {
|
||||
mDebug("dnode:%d, in dropping state", pDnode->dnodeId);
|
||||
|
||||
void * pIter = NULL;
|
||||
|
@ -456,7 +432,7 @@ static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) {
|
|||
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
|
||||
if (pVgroup == NULL) break;
|
||||
|
||||
hasThisDnode = balanceCheckDnodeInVgroup(pDnode, pVgroup);
|
||||
hasThisDnode = bnCheckDnodeInVgroup(pDnode, pVgroup);
|
||||
mnodeDecVgroupRef(pVgroup);
|
||||
|
||||
if (hasThisDnode) {
|
||||
|
@ -474,7 +450,7 @@ static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) {
|
|||
return false;
|
||||
}
|
||||
|
||||
static bool balanceMontiorDropping() {
|
||||
static bool bnMontiorDropping() {
|
||||
void *pIter = NULL;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
|
||||
|
@ -499,7 +475,7 @@ static bool balanceMontiorDropping() {
|
|||
}
|
||||
|
||||
if (pDnode->status == TAOS_DN_STATUS_DROPPING) {
|
||||
bool ret = balanceMonitorDnodeDropping(pDnode);
|
||||
bool ret = bnMonitorDnodeDropping(pDnode);
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
mnodeCancelGetNextDnode(pIter);
|
||||
return ret;
|
||||
|
@ -509,33 +485,31 @@ static bool balanceMontiorDropping() {
|
|||
return false;
|
||||
}
|
||||
|
||||
static bool balanceStart() {
|
||||
bool bnStart() {
|
||||
if (!sdbIsMaster()) return false;
|
||||
|
||||
balanceLock();
|
||||
bnLock();
|
||||
bnAccquireDnodes();
|
||||
|
||||
balanceAccquireDnodeList();
|
||||
bnMonitorDnodeModule();
|
||||
|
||||
balanceMonitorDnodeModule();
|
||||
|
||||
bool updateSoon = balanceMontiorDropping();
|
||||
bool updateSoon = bnMontiorDropping();
|
||||
|
||||
if (!updateSoon) {
|
||||
updateSoon = balanceMonitorVgroups();
|
||||
updateSoon = bnMonitorVgroups();
|
||||
}
|
||||
|
||||
if (!updateSoon) {
|
||||
updateSoon = balanceMonitorBalance();
|
||||
updateSoon = bnMonitorBalance();
|
||||
}
|
||||
|
||||
balanceReleaseDnodeList();
|
||||
|
||||
balanceUnLock();
|
||||
bnReleaseDnodes();
|
||||
bnUnLock();
|
||||
|
||||
return updateSoon;
|
||||
}
|
||||
|
||||
static void balanceSetVgroupOffline(SDnodeObj* pDnode) {
|
||||
static void bnSetVgroupOffline(SDnodeObj* pDnode) {
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SVgObj *pVgroup;
|
||||
|
@ -551,7 +525,7 @@ static void balanceSetVgroupOffline(SDnodeObj* pDnode) {
|
|||
}
|
||||
}
|
||||
|
||||
static void balanceCheckDnodeAccess() {
|
||||
void bnCheckStatus() {
|
||||
void * pIter = NULL;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
|
||||
|
@ -564,84 +538,39 @@ static void balanceCheckDnodeAccess() {
|
|||
pDnode->offlineReason = TAOS_DN_OFF_STATUS_MSG_TIMEOUT;
|
||||
mInfo("dnode:%d, set to offline state, access seq:%d last seq:%d laststat:%d", pDnode->dnodeId, tsAccessSquence,
|
||||
pDnode->lastAccess, pDnode->status);
|
||||
balanceSetVgroupOffline(pDnode);
|
||||
bnSetVgroupOffline(pDnode);
|
||||
}
|
||||
}
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
}
|
||||
}
|
||||
|
||||
static void balanceProcessBalanceTimer(void *handle, void *tmrId) {
|
||||
if (!sdbIsMaster()) return;
|
||||
|
||||
tsBalanceTimer = NULL;
|
||||
tsAccessSquence ++;
|
||||
|
||||
balanceCheckDnodeAccess();
|
||||
bool updateSoon = false;
|
||||
|
||||
if (handle == NULL) {
|
||||
if (tsAccessSquence % tsBalanceInterval == 0) {
|
||||
mDebug("balance function is scheduled by timer");
|
||||
updateSoon = balanceStart();
|
||||
}
|
||||
} else {
|
||||
int64_t mseconds = (int64_t)handle;
|
||||
mDebug("balance function is scheduled by event for %" PRId64 " mseconds arrived", mseconds);
|
||||
updateSoon = balanceStart();
|
||||
}
|
||||
|
||||
if (updateSoon) {
|
||||
balanceStartTimer(1000);
|
||||
} else {
|
||||
taosTmrReset(balanceProcessBalanceTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBalanceTimer);
|
||||
}
|
||||
}
|
||||
|
||||
static void balanceStartTimer(int64_t mseconds) {
|
||||
taosTmrReset(balanceProcessBalanceTimer, mseconds, (void *)mseconds, tsMnodeTmr, &tsBalanceTimer);
|
||||
}
|
||||
|
||||
void balanceSyncNotify() {
|
||||
void bnCheckModules() {
|
||||
if (sdbIsMaster()) {
|
||||
balanceLock();
|
||||
balanceAccquireDnodeList();
|
||||
balanceMonitorDnodeModule();
|
||||
balanceReleaseDnodeList();
|
||||
balanceUnLock();
|
||||
bnLock();
|
||||
bnAccquireDnodes();
|
||||
bnMonitorDnodeModule();
|
||||
bnReleaseDnodes();
|
||||
bnUnLock();
|
||||
}
|
||||
}
|
||||
|
||||
void balanceAsyncNotify() {
|
||||
balanceStartTimer(500);
|
||||
}
|
||||
|
||||
int32_t balanceInit() {
|
||||
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_SCORES, balanceGetScoresMeta);
|
||||
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_SCORES, balanceRetrieveScores);
|
||||
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_SCORES, mnodeCancelGetNextDnode);
|
||||
|
||||
pthread_mutex_init(&tsBalanceMutex, NULL);
|
||||
balanceInitDnodeList();
|
||||
balanceStartTimer(2000);
|
||||
mDebug("balance start fp:%p initialized", balanceProcessBalanceTimer);
|
||||
|
||||
balanceReset();
|
||||
int32_t bnInit() {
|
||||
pthread_mutex_init(&tsBnMgmt.mutex, NULL);
|
||||
bnInitDnodes();
|
||||
bnInitThread();
|
||||
bnReset();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void balanceCleanUp() {
|
||||
if (tsBalanceTimer != NULL) {
|
||||
taosTmrStopA(&tsBalanceTimer);
|
||||
pthread_mutex_destroy(&tsBalanceMutex);
|
||||
tsBalanceTimer = NULL;
|
||||
mDebug("stop balance timer");
|
||||
}
|
||||
balanceCleanupDnodeList();
|
||||
void bnCleanUp() {
|
||||
bnCleanupThread();
|
||||
bnCleanupDnodes();
|
||||
pthread_mutex_destroy(&tsBnMgmt.mutex);
|
||||
}
|
||||
|
||||
int32_t balanceDropDnode(SDnodeObj *pDnode) {
|
||||
int32_t bnDropDnode(SDnodeObj *pDnode) {
|
||||
int32_t totalFreeVnodes = 0;
|
||||
void * pIter = NULL;
|
||||
SDnodeObj *pTempDnode = NULL;
|
||||
|
@ -650,7 +579,7 @@ int32_t balanceDropDnode(SDnodeObj *pDnode) {
|
|||
pIter = mnodeGetNextDnode(pIter, &pTempDnode);
|
||||
if (pTempDnode == NULL) break;
|
||||
|
||||
if (pTempDnode != pDnode && balanceCheckFree(pTempDnode)) {
|
||||
if (pTempDnode != pDnode && bnCheckFree(pTempDnode)) {
|
||||
totalFreeVnodes += (TSDB_MAX_VNODES - pTempDnode->openVnodes);
|
||||
}
|
||||
|
||||
|
@ -665,298 +594,17 @@ int32_t balanceDropDnode(SDnodeObj *pDnode) {
|
|||
pDnode->status = TAOS_DN_STATUS_DROPPING;
|
||||
mnodeUpdateDnode(pDnode);
|
||||
|
||||
balanceStartTimer(1100);
|
||||
bnStartTimer(1100);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t balanceCalcCpuScore(SDnodeObj *pDnode) {
|
||||
if (pDnode->cpuAvgUsage < 80)
|
||||
return 0;
|
||||
else if (pDnode->cpuAvgUsage < 90)
|
||||
return 10;
|
||||
else
|
||||
return 50;
|
||||
}
|
||||
|
||||
static int32_t balanceCalcMemoryScore(SDnodeObj *pDnode) {
|
||||
if (pDnode->memoryAvgUsage < 80)
|
||||
return 0;
|
||||
else if (pDnode->memoryAvgUsage < 90)
|
||||
return 10;
|
||||
else
|
||||
return 50;
|
||||
}
|
||||
|
||||
static int32_t balanceCalcDiskScore(SDnodeObj *pDnode) {
|
||||
if (pDnode->diskAvgUsage < 80)
|
||||
return 0;
|
||||
else if (pDnode->diskAvgUsage < 90)
|
||||
return 10;
|
||||
else
|
||||
return 50;
|
||||
}
|
||||
|
||||
static int32_t balanceCalcBandwidthScore(SDnodeObj *pDnode) {
|
||||
if (pDnode->bandwidthUsage < 30)
|
||||
return 0;
|
||||
else if (pDnode->bandwidthUsage < 80)
|
||||
return 10;
|
||||
else
|
||||
return 50;
|
||||
}
|
||||
|
||||
static float balanceCalcModuleScore(SDnodeObj *pDnode) {
|
||||
if (pDnode->numOfCores <= 0) return 0;
|
||||
if (pDnode->isMgmt) {
|
||||
return (float)tsMnodeEqualVnodeNum / pDnode->numOfCores;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static float balanceCalcVnodeScore(SDnodeObj *pDnode, int32_t extra) {
|
||||
if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) return 100000000;
|
||||
if (pDnode->numOfCores <= 0) return 0;
|
||||
return (float)(pDnode->openVnodes + extra) / pDnode->numOfCores;
|
||||
}
|
||||
|
||||
/**
|
||||
* calc singe score, such as cpu/memory/disk/bandwitdh/vnode
|
||||
* 1. get the score config
|
||||
* 2. if the value is out of range, use border data
|
||||
* 3. otherwise use interpolation method
|
||||
**/
|
||||
void balanceCalcDnodeScore(SDnodeObj *pDnode) {
|
||||
pDnode->score = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) +
|
||||
balanceCalcBandwidthScore(pDnode) + balanceCalcModuleScore(pDnode) +
|
||||
balanceCalcVnodeScore(pDnode, 0) + pDnode->customScore;
|
||||
}
|
||||
|
||||
float balanceTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extra) {
|
||||
int32_t systemScore = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) +
|
||||
balanceCalcBandwidthScore(pDnode);
|
||||
float moduleScore = balanceCalcModuleScore(pDnode);
|
||||
float vnodeScore = balanceCalcVnodeScore(pDnode, extra);
|
||||
|
||||
float score = systemScore + moduleScore + vnodeScore + pDnode->customScore;
|
||||
return score;
|
||||
}
|
||||
|
||||
static void balanceInitDnodeList() {
|
||||
tsBalanceDnodeList = calloc(tsBalanceDnodeListMallocSize, sizeof(SDnodeObj *));
|
||||
}
|
||||
|
||||
static void balanceCleanupDnodeList() {
|
||||
if (tsBalanceDnodeList != NULL) {
|
||||
free(tsBalanceDnodeList);
|
||||
tsBalanceDnodeList = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void balanceCheckDnodeListSize(int32_t dnodesNum) {
|
||||
if (tsBalanceDnodeListMallocSize <= dnodesNum) {
|
||||
tsBalanceDnodeListMallocSize = dnodesNum * 2;
|
||||
tsBalanceDnodeList = realloc(tsBalanceDnodeList, tsBalanceDnodeListMallocSize * sizeof(SDnodeObj *));
|
||||
}
|
||||
}
|
||||
|
||||
void balanceAccquireDnodeList() {
|
||||
int32_t dnodesNum = mnodeGetDnodesNum();
|
||||
balanceCheckDnodeListSize(dnodesNum);
|
||||
|
||||
void * pIter = NULL;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
int32_t dnodeIndex = 0;
|
||||
|
||||
while (1) {
|
||||
if (dnodeIndex >= dnodesNum) {
|
||||
mnodeCancelGetNextDnode(pIter);
|
||||
break;
|
||||
}
|
||||
|
||||
pIter = mnodeGetNextDnode(pIter, &pDnode);
|
||||
if (pDnode == NULL) break;
|
||||
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
continue;
|
||||
}
|
||||
|
||||
balanceCalcDnodeScore(pDnode);
|
||||
|
||||
int32_t orderIndex = dnodeIndex;
|
||||
for (; orderIndex > 0; --orderIndex) {
|
||||
if (pDnode->score > tsBalanceDnodeList[orderIndex - 1]->score) {
|
||||
break;
|
||||
}
|
||||
tsBalanceDnodeList[orderIndex] = tsBalanceDnodeList[orderIndex - 1];
|
||||
}
|
||||
tsBalanceDnodeList[orderIndex] = pDnode;
|
||||
dnodeIndex++;
|
||||
}
|
||||
|
||||
tsBalanceDnodeListSize = dnodeIndex;
|
||||
}
|
||||
|
||||
void balanceReleaseDnodeList() {
|
||||
for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) {
|
||||
SDnodeObj *pDnode = tsBalanceDnodeList[i];
|
||||
if (pDnode != NULL) {
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t balanceGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||
SUserObj *pUser = mnodeGetUserFromConn(pConn);
|
||||
if (pUser == NULL) return 0;
|
||||
|
||||
if (strcmp(pUser->pAcct->user, "root") != 0) {
|
||||
mnodeDecUserRef(pUser);
|
||||
return TSDB_CODE_MND_NO_RIGHTS;
|
||||
}
|
||||
|
||||
int32_t cols = 0;
|
||||
SSchema *pSchema = pMeta->schema;
|
||||
|
||||
pShow->bytes[cols] = 2;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||
strcpy(pSchema[cols].name, "id");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
|
||||
strcpy(pSchema[cols].name, "system scores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
|
||||
strcpy(pSchema[cols].name, "custom scores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
|
||||
strcpy(pSchema[cols].name, "module scores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
|
||||
strcpy(pSchema[cols].name, "vnode scores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
|
||||
strcpy(pSchema[cols].name, "total scores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "open vnodes");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "cpu cores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 18 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "balance state");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htons(cols);
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
pShow->offset[0] = 0;
|
||||
for (int32_t i = 1; i < cols; ++i) {
|
||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
}
|
||||
|
||||
pShow->numOfRows = mnodeGetDnodesNum();
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
pShow->pIter = NULL;
|
||||
|
||||
mnodeDecUserRef(pUser);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
|
||||
int32_t numOfRows = 0;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
char * pWrite;
|
||||
int32_t cols = 0;
|
||||
|
||||
while (numOfRows < rows) {
|
||||
pShow->pIter = mnodeGetNextDnode(pShow->pIter, &pDnode);
|
||||
if (pDnode == NULL) break;
|
||||
|
||||
int32_t systemScore = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) +
|
||||
balanceCalcBandwidthScore(pDnode);
|
||||
float moduleScore = balanceCalcModuleScore(pDnode);
|
||||
float vnodeScore = balanceCalcVnodeScore(pDnode, 0);
|
||||
|
||||
cols = 0;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int16_t *)pWrite = pDnode->dnodeId;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(float *)pWrite = systemScore;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(float *)pWrite = pDnode->customScore;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(float *)pWrite = (int32_t)moduleScore;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(float *)pWrite = (int32_t)vnodeScore;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(float *)pWrite = (int32_t)(vnodeScore + moduleScore + pDnode->customScore + systemScore);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pDnode->openVnodes;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pDnode->numOfCores;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
STR_TO_VARSTR(pWrite, mnodeGetDnodeStatusStr(pDnode->status));
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
}
|
||||
|
||||
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
|
||||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
static void balanceMonitorDnodeModule() {
|
||||
static void bnMonitorDnodeModule() {
|
||||
int32_t numOfMnodes = mnodeGetMnodesNum();
|
||||
if (numOfMnodes >= tsNumOfMnodes) return;
|
||||
|
||||
for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) {
|
||||
SDnodeObj *pDnode = tsBalanceDnodeList[i];
|
||||
for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
|
||||
SDnodeObj *pDnode = tsBnDnodes.list[i];
|
||||
if (pDnode == NULL) break;
|
||||
|
||||
if (pDnode->isMgmt || pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) {
|
||||
|
@ -980,7 +628,7 @@ static void balanceMonitorDnodeModule() {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t balanceAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t dnodeId) {
|
||||
int32_t bnAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t dnodeId) {
|
||||
if (!sdbIsMaster()) {
|
||||
mError("dnode:%d, failed to alter vgId:%d to dnode:%d, for self not master", pSrcDnode->dnodeId, vnodeId, dnodeId);
|
||||
return TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
|
@ -1004,29 +652,29 @@ int32_t balanceAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t
|
|||
return TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
}
|
||||
|
||||
balanceLock();
|
||||
balanceAccquireDnodeList();
|
||||
bnLock();
|
||||
bnAccquireDnodes();
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (!balanceCheckDnodeInVgroup(pSrcDnode, pVgroup)) {
|
||||
if (!bnCheckDnodeInVgroup(pSrcDnode, pVgroup)) {
|
||||
mError("dnode:%d, failed to alter vgId:%d to dnode:%d, vgroup not in dnode:%d", pSrcDnode->dnodeId, vnodeId,
|
||||
dnodeId, pSrcDnode->dnodeId);
|
||||
code = TSDB_CODE_MND_VGROUP_NOT_IN_DNODE;
|
||||
} else if (balanceCheckDnodeInVgroup(pDestDnode, pVgroup)) {
|
||||
} else if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) {
|
||||
mError("dnode:%d, failed to alter vgId:%d to dnode:%d, vgroup already in dnode:%d", pSrcDnode->dnodeId, vnodeId,
|
||||
dnodeId, dnodeId);
|
||||
code = TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE;
|
||||
} else if (!balanceCheckFree(pDestDnode)) {
|
||||
} else if (!bnCheckFree(pDestDnode)) {
|
||||
mError("dnode:%d, failed to alter vgId:%d to dnode:%d, for dnode:%d not free", pSrcDnode->dnodeId, vnodeId, dnodeId,
|
||||
dnodeId);
|
||||
code = TSDB_CODE_MND_DNODE_NOT_FREE;
|
||||
} else {
|
||||
code = balanceAddVnode(pVgroup, pSrcDnode, pDestDnode);
|
||||
code = bnAddVnode(pVgroup, pSrcDnode, pDestDnode);
|
||||
mInfo("dnode:%d, alter vgId:%d to dnode:%d, result:%s", pSrcDnode->dnodeId, vnodeId, dnodeId, tstrerror(code));
|
||||
}
|
||||
|
||||
balanceReleaseDnodeList();
|
||||
balanceUnLock();
|
||||
bnReleaseDnodes();
|
||||
bnUnLock();
|
||||
|
||||
mnodeDecVgroupRef(pVgroup);
|
||||
mnodeDecDnodeRef(pDestDnode);
|
|
@ -0,0 +1,312 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "tglobal.h"
|
||||
#include "mnodeShow.h"
|
||||
#include "mnodeUser.h"
|
||||
#include "bnScore.h"
|
||||
|
||||
SBnDnodes tsBnDnodes;
|
||||
|
||||
static int32_t bnGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
||||
static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
|
||||
static int32_t bnCalcCpuScore(SDnodeObj *pDnode) {
|
||||
if (pDnode->cpuAvgUsage < 80)
|
||||
return 0;
|
||||
else if (pDnode->cpuAvgUsage < 90)
|
||||
return 10;
|
||||
else
|
||||
return 50;
|
||||
}
|
||||
|
||||
static int32_t bnCalcMemoryScore(SDnodeObj *pDnode) {
|
||||
if (pDnode->memoryAvgUsage < 80)
|
||||
return 0;
|
||||
else if (pDnode->memoryAvgUsage < 90)
|
||||
return 10;
|
||||
else
|
||||
return 50;
|
||||
}
|
||||
|
||||
static int32_t bnCalcDiskScore(SDnodeObj *pDnode) {
|
||||
if (pDnode->diskAvgUsage < 80)
|
||||
return 0;
|
||||
else if (pDnode->diskAvgUsage < 90)
|
||||
return 10;
|
||||
else
|
||||
return 50;
|
||||
}
|
||||
|
||||
static int32_t bnCalcBandScore(SDnodeObj *pDnode) {
|
||||
if (pDnode->bandwidthUsage < 30)
|
||||
return 0;
|
||||
else if (pDnode->bandwidthUsage < 80)
|
||||
return 10;
|
||||
else
|
||||
return 50;
|
||||
}
|
||||
|
||||
static float bnCalcModuleScore(SDnodeObj *pDnode) {
|
||||
if (pDnode->numOfCores <= 0) return 0;
|
||||
if (pDnode->isMgmt) {
|
||||
return (float)tsMnodeEqualVnodeNum / pDnode->numOfCores;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static float bnCalcVnodeScore(SDnodeObj *pDnode, int32_t extra) {
|
||||
if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) return 100000000;
|
||||
if (pDnode->numOfCores <= 0) return 0;
|
||||
return (float)(pDnode->openVnodes + extra) / pDnode->numOfCores;
|
||||
}
|
||||
|
||||
/**
|
||||
* calc singe score, such as cpu/memory/disk/bandwitdh/vnode
|
||||
* 1. get the score config
|
||||
* 2. if the value is out of range, use border data
|
||||
* 3. otherwise use interpolation method
|
||||
**/
|
||||
static void bnCalcDnodeScore(SDnodeObj *pDnode) {
|
||||
pDnode->score = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) +
|
||||
bnCalcBandScore(pDnode) + bnCalcModuleScore(pDnode) + bnCalcVnodeScore(pDnode, 0) +
|
||||
pDnode->customScore;
|
||||
}
|
||||
|
||||
float bnTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extra) {
|
||||
int32_t systemScore = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) +
|
||||
bnCalcBandScore(pDnode);
|
||||
float moduleScore = bnCalcModuleScore(pDnode);
|
||||
float vnodeScore = bnCalcVnodeScore(pDnode, extra);
|
||||
|
||||
float score = systemScore + moduleScore + vnodeScore + pDnode->customScore;
|
||||
return score;
|
||||
}
|
||||
|
||||
void bnInitDnodes() {
|
||||
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_SCORES, bnGetScoresMeta);
|
||||
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_SCORES, bnRetrieveScores);
|
||||
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_SCORES, mnodeCancelGetNextDnode);
|
||||
|
||||
memset(&tsBnDnodes, 0, sizeof(SBnDnodes));
|
||||
tsBnDnodes.maxSize = 16;
|
||||
tsBnDnodes.list = calloc(tsBnDnodes.maxSize, sizeof(SDnodeObj *));
|
||||
}
|
||||
|
||||
void bnCleanupDnodes() {
|
||||
if (tsBnDnodes.list != NULL) {
|
||||
free(tsBnDnodes.list);
|
||||
tsBnDnodes.list = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void bnCheckDnodesSize(int32_t dnodesNum) {
|
||||
if (tsBnDnodes.maxSize <= dnodesNum) {
|
||||
tsBnDnodes.maxSize = dnodesNum * 2;
|
||||
tsBnDnodes.list = realloc(tsBnDnodes.list, tsBnDnodes.maxSize * sizeof(SDnodeObj *));
|
||||
}
|
||||
}
|
||||
|
||||
void bnAccquireDnodes() {
|
||||
int32_t dnodesNum = mnodeGetDnodesNum();
|
||||
bnCheckDnodesSize(dnodesNum);
|
||||
|
||||
void * pIter = NULL;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
int32_t dnodeIndex = 0;
|
||||
|
||||
while (1) {
|
||||
if (dnodeIndex >= dnodesNum) {
|
||||
mnodeCancelGetNextDnode(pIter);
|
||||
break;
|
||||
}
|
||||
|
||||
pIter = mnodeGetNextDnode(pIter, &pDnode);
|
||||
if (pDnode == NULL) break;
|
||||
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
continue;
|
||||
}
|
||||
|
||||
bnCalcDnodeScore(pDnode);
|
||||
|
||||
int32_t orderIndex = dnodeIndex;
|
||||
for (; orderIndex > 0; --orderIndex) {
|
||||
if (pDnode->score > tsBnDnodes.list[orderIndex - 1]->score) {
|
||||
break;
|
||||
}
|
||||
tsBnDnodes.list[orderIndex] = tsBnDnodes.list[orderIndex - 1];
|
||||
}
|
||||
tsBnDnodes.list[orderIndex] = pDnode;
|
||||
dnodeIndex++;
|
||||
}
|
||||
|
||||
tsBnDnodes.size = dnodeIndex;
|
||||
}
|
||||
|
||||
void bnReleaseDnodes() {
|
||||
for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
|
||||
SDnodeObj *pDnode = tsBnDnodes.list[i];
|
||||
if (pDnode != NULL) {
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t bnGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||
SUserObj *pUser = mnodeGetUserFromConn(pConn);
|
||||
if (pUser == NULL) return 0;
|
||||
|
||||
if (strcmp(pUser->pAcct->user, "root") != 0) {
|
||||
mnodeDecUserRef(pUser);
|
||||
return TSDB_CODE_MND_NO_RIGHTS;
|
||||
}
|
||||
|
||||
int32_t cols = 0;
|
||||
SSchema *pSchema = pMeta->schema;
|
||||
|
||||
pShow->bytes[cols] = 2;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||
strcpy(pSchema[cols].name, "id");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
|
||||
strcpy(pSchema[cols].name, "system scores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
|
||||
strcpy(pSchema[cols].name, "custom scores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
|
||||
strcpy(pSchema[cols].name, "module scores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
|
||||
strcpy(pSchema[cols].name, "vnode scores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_FLOAT;
|
||||
strcpy(pSchema[cols].name, "total scores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "open vnodes");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "cpu cores");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 18 + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "balance state");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htons(cols);
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
pShow->offset[0] = 0;
|
||||
for (int32_t i = 1; i < cols; ++i) {
|
||||
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
}
|
||||
|
||||
pShow->numOfRows = mnodeGetDnodesNum();
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
pShow->pIter = NULL;
|
||||
|
||||
mnodeDecUserRef(pUser);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
|
||||
int32_t numOfRows = 0;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
char * pWrite;
|
||||
int32_t cols = 0;
|
||||
|
||||
while (numOfRows < rows) {
|
||||
pShow->pIter = mnodeGetNextDnode(pShow->pIter, &pDnode);
|
||||
if (pDnode == NULL) break;
|
||||
|
||||
int32_t systemScore = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) + bnCalcBandScore(pDnode);
|
||||
float moduleScore = bnCalcModuleScore(pDnode);
|
||||
float vnodeScore = bnCalcVnodeScore(pDnode, 0);
|
||||
|
||||
cols = 0;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int16_t *)pWrite = pDnode->dnodeId;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(float *)pWrite = systemScore;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(float *)pWrite = pDnode->customScore;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(float *)pWrite = (int32_t)moduleScore;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(float *)pWrite = (int32_t)vnodeScore;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(float *)pWrite = (int32_t)(vnodeScore + moduleScore + pDnode->customScore + systemScore);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pDnode->openVnodes;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int32_t *)pWrite = pDnode->numOfCores;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
STR_TO_VARSTR(pWrite, mnodeGetDnodeStatusStr(pDnode->status));
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
}
|
||||
|
||||
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
|
||||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
|
@ -0,0 +1,129 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "ttimer.h"
|
||||
#include "tglobal.h"
|
||||
#include "mnodeSdb.h"
|
||||
#include "bnThread.h"
|
||||
|
||||
static SBnThread tsBnThread;
|
||||
|
||||
static void *bnThreadFunc(void *arg) {
|
||||
while (1) {
|
||||
pthread_mutex_lock(&tsBnThread.mutex);
|
||||
if (tsBnThread.stop) {
|
||||
pthread_mutex_unlock(&tsBnThread.mutex);
|
||||
break;
|
||||
}
|
||||
|
||||
pthread_cond_wait(&tsBnThread.cond, &tsBnThread.mutex);
|
||||
bool updateSoon = bnStart();
|
||||
bnStartTimer(updateSoon ? 1000 : -1);
|
||||
pthread_mutex_unlock(&(tsBnThread.mutex));
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t bnInitThread() {
|
||||
memset(&tsBnThread, 0, sizeof(SBnThread));
|
||||
tsBnThread.stop = false;
|
||||
pthread_mutex_init(&tsBnThread.mutex, NULL);
|
||||
pthread_cond_init(&tsBnThread.cond, NULL);
|
||||
|
||||
pthread_attr_t thattr;
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
|
||||
int32_t ret = pthread_create(&tsBnThread.thread, &thattr, bnThreadFunc, NULL);
|
||||
pthread_attr_destroy(&thattr);
|
||||
|
||||
if (ret != 0) {
|
||||
mError("failed to create balance thread since %s", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
bnStartTimer(2000);
|
||||
mDebug("balance thread is created");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void bnCleanupThread() {
|
||||
mDebug("balance thread will be cleanup");
|
||||
|
||||
if (tsBnThread.timer != NULL) {
|
||||
taosTmrStopA(&tsBnThread.timer);
|
||||
tsBnThread.timer = NULL;
|
||||
mDebug("stop balance timer");
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&tsBnThread.mutex);
|
||||
tsBnThread.stop = true;
|
||||
pthread_cond_signal(&tsBnThread.cond);
|
||||
pthread_mutex_unlock(&(tsBnThread.mutex));
|
||||
pthread_join(tsBnThread.thread, NULL);
|
||||
|
||||
pthread_cond_destroy(&tsBnThread.cond);
|
||||
pthread_mutex_destroy(&tsBnThread.mutex);
|
||||
}
|
||||
|
||||
static void bnPostSignal() {
|
||||
pthread_mutex_lock(&tsBnThread.mutex);
|
||||
pthread_cond_signal(&tsBnThread.cond);
|
||||
pthread_mutex_unlock(&(tsBnThread.mutex));
|
||||
}
|
||||
|
||||
/*
|
||||
* once sdb work as mater, then tsAccessSquence reset to zero
|
||||
* increase tsAccessSquence every balance interval
|
||||
*/
|
||||
|
||||
static void bnProcessTimer(void *handle, void *tmrId) {
|
||||
if (!sdbIsMaster()) return;
|
||||
if (tsBnThread.stop) return;
|
||||
|
||||
tsBnThread.timer = NULL;
|
||||
tsAccessSquence++;
|
||||
|
||||
bnCheckStatus();
|
||||
bnStartTimer(-1);
|
||||
|
||||
if (handle == NULL) {
|
||||
if (tsAccessSquence % tsBalanceInterval == 0) {
|
||||
mDebug("balance function is scheduled by timer");
|
||||
bnPostSignal();
|
||||
}
|
||||
} else {
|
||||
int64_t mseconds = (int64_t)handle;
|
||||
mDebug("balance function is scheduled by event for %" PRId64 " mseconds arrived", mseconds);
|
||||
bnPostSignal();
|
||||
}
|
||||
}
|
||||
|
||||
void bnStartTimer(int64_t mseconds) {
|
||||
if (tsBnThread.stop) return;
|
||||
|
||||
bool updateSoon = (mseconds != -1);
|
||||
if (updateSoon) {
|
||||
taosTmrReset(bnProcessTimer, mseconds, (void *)mseconds, tsMnodeTmr, &tsBnThread.timer);
|
||||
} else {
|
||||
taosTmrReset(bnProcessTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBnThread.timer);
|
||||
}
|
||||
}
|
||||
|
||||
void bnNotify() {
|
||||
bnStartTimer(500);
|
||||
}
|
|
@ -5103,7 +5103,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) {
|
|||
const int tokenDebugFlagEnd = 20;
|
||||
const SDNodeDynConfOption cfgOptions[] = {
|
||||
{"resetLog", 8}, {"resetQueryCache", 15}, {"balance", 7}, {"monitor", 7},
|
||||
{"debugFlag", 9}, {"monitorDebugFlag", 16}, {"vDebugFlag", 10}, {"mDebugFlag", 10},
|
||||
{"debugFlag", 9}, {"monDebugFlag", 12}, {"vDebugFlag", 10}, {"mDebugFlag", 10},
|
||||
{"cDebugFlag", 10}, {"httpDebugFlag", 13}, {"qDebugflag", 10}, {"sdbDebugFlag", 12},
|
||||
{"uDebugFlag", 10}, {"tsdbDebugFlag", 13}, {"sDebugflag", 10}, {"rpcDebugFlag", 12},
|
||||
{"dDebugFlag", 10}, {"mqttDebugFlag", 13}, {"wDebugFlag", 10}, {"tmrDebugFlag", 12},
|
||||
|
|
|
@ -179,7 +179,7 @@ extern int32_t tmrDebugFlag;
|
|||
extern int32_t sdbDebugFlag;
|
||||
extern int32_t httpDebugFlag;
|
||||
extern int32_t mqttDebugFlag;
|
||||
extern int32_t monitorDebugFlag;
|
||||
extern int32_t monDebugFlag;
|
||||
extern int32_t uDebugFlag;
|
||||
extern int32_t rpcDebugFlag;
|
||||
extern int32_t odbcDebugFlag;
|
||||
|
|
|
@ -209,7 +209,7 @@ int32_t jniDebugFlag = 131;
|
|||
int32_t odbcDebugFlag = 131;
|
||||
int32_t httpDebugFlag = 131;
|
||||
int32_t mqttDebugFlag = 131;
|
||||
int32_t monitorDebugFlag = 131;
|
||||
int32_t monDebugFlag = 131;
|
||||
int32_t qDebugFlag = 131;
|
||||
int32_t rpcDebugFlag = 131;
|
||||
int32_t uDebugFlag = 131;
|
||||
|
@ -219,9 +219,9 @@ int32_t wDebugFlag = 135;
|
|||
int32_t tsdbDebugFlag = 131;
|
||||
int32_t cqDebugFlag = 135;
|
||||
|
||||
int32_t (*monitorStartSystemFp)() = NULL;
|
||||
void (*monitorStopSystemFp)() = NULL;
|
||||
void (*monitorExecuteSQLFp)(char *sql) = NULL;
|
||||
int32_t (*monStartSystemFp)() = NULL;
|
||||
void (*monStopSystemFp)() = NULL;
|
||||
void (*monExecuteSQLFp)(char *sql) = NULL;
|
||||
|
||||
char *qtypeStr[] = {"rpc", "fwd", "wal", "cq", "query"};
|
||||
|
||||
|
@ -238,7 +238,7 @@ void taosSetAllDebugFlag() {
|
|||
odbcDebugFlag = debugFlag;
|
||||
httpDebugFlag = debugFlag;
|
||||
mqttDebugFlag = debugFlag;
|
||||
monitorDebugFlag = debugFlag;
|
||||
monDebugFlag = debugFlag;
|
||||
qDebugFlag = debugFlag;
|
||||
rpcDebugFlag = debugFlag;
|
||||
uDebugFlag = debugFlag;
|
||||
|
@ -279,15 +279,15 @@ bool taosCfgDynamicOptions(char *msg) {
|
|||
|
||||
if (strncasecmp(cfg->option, "monitor", olen) == 0) {
|
||||
if (1 == vint) {
|
||||
if (monitorStartSystemFp) {
|
||||
(*monitorStartSystemFp)();
|
||||
if (monStartSystemFp) {
|
||||
(*monStartSystemFp)();
|
||||
uInfo("monitor is enabled");
|
||||
} else {
|
||||
uError("monitor can't be updated, for monitor not initialized");
|
||||
}
|
||||
} else {
|
||||
if (monitorStopSystemFp) {
|
||||
(*monitorStopSystemFp)();
|
||||
if (monStopSystemFp) {
|
||||
(*monStopSystemFp)();
|
||||
uInfo("monitor is disabled");
|
||||
} else {
|
||||
uError("monitor can't be updated, for monitor not initialized");
|
||||
|
@ -310,8 +310,8 @@ bool taosCfgDynamicOptions(char *msg) {
|
|||
}
|
||||
|
||||
if (strncasecmp(option, "resetQueryCache", 15) == 0) {
|
||||
if (monitorExecuteSQLFp) {
|
||||
(*monitorExecuteSQLFp)("resetQueryCache");
|
||||
if (monExecuteSQLFp) {
|
||||
(*monExecuteSQLFp)("resetQueryCache");
|
||||
uInfo("resetquerycache is executed");
|
||||
} else {
|
||||
uError("resetquerycache can't be executed, for monitor not started");
|
||||
|
@ -1240,8 +1240,8 @@ static void doInitGlobalConfig(void) {
|
|||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "monitorDebugFlag";
|
||||
cfg.ptr = &monitorDebugFlag;
|
||||
cfg.option = "monDebugFlag";
|
||||
cfg.ptr = &monDebugFlag;
|
||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG;
|
||||
cfg.minValue = 0;
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
#include "tqueue.h"
|
||||
#include "tsync.h"
|
||||
#include "ttimer.h"
|
||||
#include "tbalance.h"
|
||||
#include "tbn.h"
|
||||
#include "tglobal.h"
|
||||
#include "dnode.h"
|
||||
#include "vnode.h"
|
||||
|
|
|
@ -78,10 +78,10 @@ static void dnodeAllocModules() {
|
|||
|
||||
tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1);
|
||||
tsModule[TSDB_MOD_MONITOR].name = "monitor";
|
||||
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
|
||||
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
|
||||
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
|
||||
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
|
||||
tsModule[TSDB_MOD_MONITOR].initFp = monInitSystem;
|
||||
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monCleanupSystem;
|
||||
tsModule[TSDB_MOD_MONITOR].startFp = monStartSystem;
|
||||
tsModule[TSDB_MOD_MONITOR].stopFp = monStopSystem;
|
||||
if (tsEnableMonitorModule) {
|
||||
dnodeSetModuleStatus(TSDB_MOD_MONITOR);
|
||||
}
|
||||
|
|
|
@ -47,13 +47,13 @@ typedef struct {
|
|||
int8_t accessState;
|
||||
} SAcctMonitorObj;
|
||||
|
||||
int32_t monitorInitSystem();
|
||||
int32_t monitorStartSystem();
|
||||
void monitorStopSystem();
|
||||
void monitorCleanUpSystem();
|
||||
void monitorSaveAcctLog(SAcctMonitorObj *pMonObj);
|
||||
void monitorSaveLog(int32_t level, const char *const format, ...);
|
||||
void monitorExecuteSQL(char *sql);
|
||||
int32_t monInitSystem();
|
||||
int32_t monStartSystem();
|
||||
void monStopSystem();
|
||||
void monCleanupSystem();
|
||||
void monSaveAcctLog(SAcctMonitorObj *pMonObj);
|
||||
void monSaveLog(int32_t level, const char *const format, ...);
|
||||
void monExecuteSQL(char *sql);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -23,14 +23,14 @@ extern "C" {
|
|||
struct SVgObj;
|
||||
struct SDnodeObj;
|
||||
|
||||
int32_t balanceInit();
|
||||
void balanceCleanUp();
|
||||
void balanceAsyncNotify();
|
||||
void balanceSyncNotify();
|
||||
void balanceReset();
|
||||
int32_t balanceAllocVnodes(struct SVgObj *pVgroup);
|
||||
int32_t balanceAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId);
|
||||
int32_t balanceDropDnode(struct SDnodeObj *pDnode);
|
||||
int32_t bnInit();
|
||||
void bnCleanUp();
|
||||
void bnNotify();
|
||||
void bnCheckModules();
|
||||
void bnReset();
|
||||
int32_t bnAllocVnodes(struct SVgObj *pVgroup);
|
||||
int32_t bnAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId);
|
||||
int32_t bnDropDnode(struct SDnodeObj *pDnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
|
@ -41,9 +41,9 @@ extern int32_t sdbDebugFlag;
|
|||
#define sdbDebug(...) { if (sdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }}
|
||||
#define sdbTrace(...) { if (sdbDebugFlag & DEBUG_TRACE) { taosPrintLog("SDB ", sdbDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
#define mLError(...) { monitorSaveLog(2, __VA_ARGS__); mError(__VA_ARGS__) }
|
||||
#define mLWarn(...) { monitorSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
|
||||
#define mLInfo(...) { monitorSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) }
|
||||
#define mLError(...) { monSaveLog(2, __VA_ARGS__); mError(__VA_ARGS__) }
|
||||
#define mLWarn(...) { monSaveLog(1, __VA_ARGS__); mWarn(__VA_ARGS__) }
|
||||
#define mLInfo(...) { monSaveLog(0, __VA_ARGS__); mInfo(__VA_ARGS__) }
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
#include "tgrant.h"
|
||||
#include "tglobal.h"
|
||||
#include "tname.h"
|
||||
#include "tbalance.h"
|
||||
#include "tbn.h"
|
||||
#include "tdataformat.h"
|
||||
#include "mnode.h"
|
||||
#include "mnodeDef.h"
|
||||
|
@ -1004,7 +1004,7 @@ static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) {
|
|||
mDebug("db:%s, all vgroups is altered", pDb->name);
|
||||
mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
|
||||
|
||||
balanceAsyncNotify();
|
||||
bnNotify();
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -16,12 +16,12 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "tgrant.h"
|
||||
#include "tbalance.h"
|
||||
#include "tbn.h"
|
||||
#include "tglobal.h"
|
||||
#include "tconfig.h"
|
||||
#include "tutil.h"
|
||||
#include "tsocket.h"
|
||||
#include "tbalance.h"
|
||||
#include "tbn.h"
|
||||
#include "tsync.h"
|
||||
#include "tdataformat.h"
|
||||
#include "mnode.h"
|
||||
|
@ -115,7 +115,7 @@ static int32_t mnodeDnodeActionDelete(SSdbRow *pRow) {
|
|||
mnodeDropAllDnodeVgroups(pDnode);
|
||||
#endif
|
||||
mnodeDropMnodeLocal(pDnode->dnodeId);
|
||||
balanceAsyncNotify();
|
||||
bnNotify();
|
||||
mnodeUpdateDnodeEps();
|
||||
|
||||
mDebug("dnode:%d, all vgroups is dropped from sdb", pDnode->dnodeId);
|
||||
|
@ -347,7 +347,7 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_MND_INVALID_DNODE_CFG_OPTION;
|
||||
}
|
||||
|
||||
int32_t code = balanceAlterDnode(pDnode, vnodeId, dnodeId);
|
||||
int32_t code = bnAlterDnode(pDnode, vnodeId, dnodeId);
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
return code;
|
||||
} else {
|
||||
|
@ -591,8 +591,8 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
|
|||
mInfo("dnode:%d, from offline to online", pDnode->dnodeId);
|
||||
pDnode->status = TAOS_DN_STATUS_READY;
|
||||
pDnode->offlineReason = TAOS_DN_OFF_ONLINE;
|
||||
balanceSyncNotify();
|
||||
balanceAsyncNotify();
|
||||
bnCheckModules();
|
||||
bnNotify();
|
||||
}
|
||||
|
||||
if (openVnodes != pDnode->openVnodes) {
|
||||
|
@ -708,7 +708,7 @@ static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) {
|
|||
#ifndef _SYNC
|
||||
int32_t code = mnodeDropDnode(pDnode, pMsg);
|
||||
#else
|
||||
int32_t code = balanceDropDnode(pDnode);
|
||||
int32_t code = bnDropDnode(pDnode);
|
||||
#endif
|
||||
mnodeDecDnodeRef(pDnode);
|
||||
return code;
|
||||
|
@ -1182,12 +1182,12 @@ static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole) {
|
|||
|
||||
#ifndef _SYNC
|
||||
|
||||
int32_t balanceInit() { return TSDB_CODE_SUCCESS; }
|
||||
void balanceCleanUp() {}
|
||||
void balanceAsyncNotify() {}
|
||||
void balanceSyncNotify() {}
|
||||
void balanceReset() {}
|
||||
int32_t balanceAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId) { return TSDB_CODE_SYN_NOT_ENABLED; }
|
||||
int32_t bnInit() { return TSDB_CODE_SUCCESS; }
|
||||
void bnCleanUp() {}
|
||||
void bnNotify() {}
|
||||
void bnCheckModules() {}
|
||||
void bnReset() {}
|
||||
int32_t bnAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId) { return TSDB_CODE_SYN_NOT_ENABLED; }
|
||||
|
||||
char* syncRole[] = {
|
||||
"offline",
|
||||
|
@ -1197,7 +1197,7 @@ char* syncRole[] = {
|
|||
"master"
|
||||
};
|
||||
|
||||
int32_t balanceAllocVnodes(SVgObj *pVgroup) {
|
||||
int32_t bnAllocVnodes(SVgObj *pVgroup) {
|
||||
void * pIter = NULL;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
SDnodeObj *pSelDnode = NULL;
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include "os.h"
|
||||
#include "taosdef.h"
|
||||
#include "tsched.h"
|
||||
#include "tbalance.h"
|
||||
#include "tbn.h"
|
||||
#include "tgrant.h"
|
||||
#include "ttimer.h"
|
||||
#include "tglobal.h"
|
||||
|
@ -58,7 +58,7 @@ static const SMnodeComponent tsMnodeComponents[] = {
|
|||
{"tables", mnodeInitTables, mnodeCleanupTables},
|
||||
{"mnodes", mnodeInitMnodes, mnodeCleanupMnodes},
|
||||
{"sdb", sdbInit, sdbCleanUp},
|
||||
{"balance", balanceInit, balanceCleanUp},
|
||||
{"balance", bnInit, bnCleanUp},
|
||||
{"grant", grantInit, grantCleanUp},
|
||||
{"show", mnodeInitShow, mnodeCleanUpShow}
|
||||
};
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#include "tglobal.h"
|
||||
#include "trpc.h"
|
||||
#include "tsync.h"
|
||||
#include "tbalance.h"
|
||||
#include "tbn.h"
|
||||
#include "tutil.h"
|
||||
#include "tsocket.h"
|
||||
#include "tdataformat.h"
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
#include "tsystem.h"
|
||||
#include "tutil.h"
|
||||
#include "tgrant.h"
|
||||
#include "tbalance.h"
|
||||
#include "tbn.h"
|
||||
#include "tglobal.h"
|
||||
#include "mnode.h"
|
||||
#include "dnode.h"
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include "os.h"
|
||||
#include "taosdef.h"
|
||||
#include "tsched.h"
|
||||
#include "tbalance.h"
|
||||
#include "tbn.h"
|
||||
#include "tgrant.h"
|
||||
#include "ttimer.h"
|
||||
#include "tglobal.h"
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#include "hash.h"
|
||||
#include "tutil.h"
|
||||
#include "tref.h"
|
||||
#include "tbalance.h"
|
||||
#include "tbn.h"
|
||||
#include "tqueue.h"
|
||||
#include "twal.h"
|
||||
#include "tsync.h"
|
||||
|
@ -244,7 +244,7 @@ static void sdbNotifyRole(int32_t vgId, int8_t role) {
|
|||
sdbInfo("vgId:1, mnode role changed from %s to %s", syncRole[tsSdbMgmt.role], syncRole[role]);
|
||||
|
||||
if (role == TAOS_SYNC_ROLE_MASTER && tsSdbMgmt.role != TAOS_SYNC_ROLE_MASTER) {
|
||||
balanceReset();
|
||||
bnReset();
|
||||
}
|
||||
tsSdbMgmt.role = role;
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
#include "tsocket.h"
|
||||
#include "tidpool.h"
|
||||
#include "tsync.h"
|
||||
#include "tbalance.h"
|
||||
#include "tbn.h"
|
||||
#include "tglobal.h"
|
||||
#include "tdataformat.h"
|
||||
#include "dnode.h"
|
||||
|
@ -563,7 +563,7 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
|
|||
pVgroup->numOfVnodes = pDb->cfg.replications;
|
||||
pVgroup->createdTime = taosGetTimestampMs();
|
||||
pVgroup->accessState = TSDB_VN_ALL_ACCCESS;
|
||||
int32_t code = balanceAllocVnodes(pVgroup);
|
||||
int32_t code = bnAllocVnodes(pVgroup);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("db:%s, no enough dnode to alloc %d vnodes to vgroup, reason:%s", pDb->name, pVgroup->numOfVnodes,
|
||||
tstrerror(code));
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include "os.h"
|
||||
#include "taosdef.h"
|
||||
#include "tsched.h"
|
||||
#include "tbalance.h"
|
||||
#include "tbn.h"
|
||||
#include "tgrant.h"
|
||||
#include "tglobal.h"
|
||||
#include "trpc.h"
|
||||
|
|
|
@ -27,12 +27,12 @@
|
|||
#include "monitor.h"
|
||||
#include "taoserror.h"
|
||||
|
||||
#define mnFatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }}
|
||||
#define mnError(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }}
|
||||
#define mnWarn(...) { if (monitorDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }}
|
||||
#define mnInfo(...) { if (monitorDebugFlag & DEBUG_INFO) { taosPrintLog("MON ", 255, __VA_ARGS__); }}
|
||||
#define mnDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
|
||||
#define mnTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }}
|
||||
#define monFatal(...) { if (monDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }}
|
||||
#define monError(...) { if (monDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }}
|
||||
#define monWarn(...) { if (monDebugFlag & DEBUG_WARN) { taosPrintLog("MON WARN ", 255, __VA_ARGS__); }}
|
||||
#define monInfo(...) { if (monDebugFlag & DEBUG_INFO) { taosPrintLog("MON ", 255, __VA_ARGS__); }}
|
||||
#define monDebug(...) { if (monDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monDebugFlag, __VA_ARGS__); }}
|
||||
#define monTrace(...) { if (monDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
#define SQL_LENGTH 1030
|
||||
#define LOG_LEN_STR 100
|
||||
|
@ -48,12 +48,12 @@ typedef enum {
|
|||
MON_CMD_CREATE_TB_ACCT_ROOT,
|
||||
MON_CMD_CREATE_TB_SLOWQUERY,
|
||||
MON_CMD_MAX
|
||||
} EMonitorCommand;
|
||||
} EMonCmd;
|
||||
|
||||
typedef enum {
|
||||
MON_STATE_NOT_INIT,
|
||||
MON_STATE_INITED
|
||||
} EMonitorState;
|
||||
} EMonState;
|
||||
|
||||
typedef struct {
|
||||
pthread_t thread;
|
||||
|
@ -64,17 +64,17 @@ typedef struct {
|
|||
int8_t start; // enable/disable by mnode
|
||||
int8_t quiting; // taosd is quiting
|
||||
char sql[SQL_LENGTH + 1];
|
||||
} SMonitorConn;
|
||||
} SMonConn;
|
||||
|
||||
static SMonitorConn tsMonitor = {0};
|
||||
static void monitorSaveSystemInfo();
|
||||
static void *monitorThreadFunc(void *param);
|
||||
static void monitorBuildMonitorSql(char *sql, int32_t cmd);
|
||||
extern int32_t (*monitorStartSystemFp)();
|
||||
extern void (*monitorStopSystemFp)();
|
||||
extern void (*monitorExecuteSQLFp)(char *sql);
|
||||
static SMonConn tsMonitor = {0};
|
||||
static void monSaveSystemInfo();
|
||||
static void *monThreadFunc(void *param);
|
||||
static void monBuildMonitorSql(char *sql, int32_t cmd);
|
||||
extern int32_t (*monStartSystemFp)();
|
||||
extern void (*monStopSystemFp)();
|
||||
extern void (*monExecuteSQLFp)(char *sql);
|
||||
|
||||
int32_t monitorInitSystem() {
|
||||
int32_t monInitSystem() {
|
||||
if (tsMonitor.ep[0] == 0) {
|
||||
strcpy(tsMonitor.ep, tsLocalEp);
|
||||
}
|
||||
|
@ -90,29 +90,29 @@ int32_t monitorInitSystem() {
|
|||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&tsMonitor.thread, &thAttr, monitorThreadFunc, NULL)) {
|
||||
mnError("failed to create thread to for monitor module, reason:%s", strerror(errno));
|
||||
if (pthread_create(&tsMonitor.thread, &thAttr, monThreadFunc, NULL)) {
|
||||
monError("failed to create thread to for monitor module, reason:%s", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
mnDebug("monitor thread is launched");
|
||||
monDebug("monitor thread is launched");
|
||||
|
||||
monitorStartSystemFp = monitorStartSystem;
|
||||
monitorStopSystemFp = monitorStopSystem;
|
||||
monStartSystemFp = monStartSystem;
|
||||
monStopSystemFp = monStopSystem;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t monitorStartSystem() {
|
||||
int32_t monStartSystem() {
|
||||
taos_init();
|
||||
tsMonitor.start = 1;
|
||||
monitorExecuteSQLFp = monitorExecuteSQL;
|
||||
mnInfo("monitor module start");
|
||||
monExecuteSQLFp = monExecuteSQL;
|
||||
monInfo("monitor module start");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void *monitorThreadFunc(void *param) {
|
||||
mnDebug("starting to initialize monitor module ...");
|
||||
static void *monThreadFunc(void *param) {
|
||||
monDebug("starting to initialize monitor module ...");
|
||||
|
||||
while (1) {
|
||||
static int32_t accessTimes = 0;
|
||||
|
@ -121,7 +121,7 @@ static void *monitorThreadFunc(void *param) {
|
|||
|
||||
if (tsMonitor.quiting) {
|
||||
tsMonitor.state = MON_STATE_NOT_INIT;
|
||||
mnInfo("monitor thread will quit, for taosd is quiting");
|
||||
monInfo("monitor thread will quit, for taosd is quiting");
|
||||
break;
|
||||
} else {
|
||||
taosGetDisk();
|
||||
|
@ -132,7 +132,7 @@ static void *monitorThreadFunc(void *param) {
|
|||
}
|
||||
|
||||
if (dnodeGetDnodeId() <= 0) {
|
||||
mnDebug("dnode not initialized, waiting for 3000 ms to start monitor module");
|
||||
monDebug("dnode not initialized, waiting for 3000 ms to start monitor module");
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -140,10 +140,10 @@ static void *monitorThreadFunc(void *param) {
|
|||
tsMonitor.state = MON_STATE_NOT_INIT;
|
||||
tsMonitor.conn = taos_connect(NULL, "monitor", tsInternalPass, "", 0);
|
||||
if (tsMonitor.conn == NULL) {
|
||||
mnError("failed to connect to database, reason:%s", tstrerror(terrno));
|
||||
monError("failed to connect to database, reason:%s", tstrerror(terrno));
|
||||
continue;
|
||||
} else {
|
||||
mnDebug("connect to database success");
|
||||
monDebug("connect to database success");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -151,16 +151,16 @@ static void *monitorThreadFunc(void *param) {
|
|||
int code = 0;
|
||||
|
||||
for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) {
|
||||
monitorBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex);
|
||||
monBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex);
|
||||
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
|
||||
code = taos_errno(res);
|
||||
taos_free_result(res);
|
||||
|
||||
if (code != 0) {
|
||||
mnError("failed to exec sql:%s, reason:%s", tsMonitor.sql, tstrerror(code));
|
||||
monError("failed to exec sql:%s, reason:%s", tsMonitor.sql, tstrerror(code));
|
||||
break;
|
||||
} else {
|
||||
mnDebug("successfully to exec sql:%s", tsMonitor.sql);
|
||||
monDebug("successfully to exec sql:%s", tsMonitor.sql);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -171,16 +171,16 @@ static void *monitorThreadFunc(void *param) {
|
|||
|
||||
if (tsMonitor.state == MON_STATE_INITED) {
|
||||
if (accessTimes % tsMonitorInterval == 0) {
|
||||
monitorSaveSystemInfo();
|
||||
monSaveSystemInfo();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mnInfo("monitor thread is stopped");
|
||||
monInfo("monitor thread is stopped");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void monitorBuildMonitorSql(char *sql, int32_t cmd) {
|
||||
static void monBuildMonitorSql(char *sql, int32_t cmd) {
|
||||
memset(sql, 0, SQL_LENGTH);
|
||||
|
||||
if (cmd == MON_CMD_CREATE_DB) {
|
||||
|
@ -236,47 +236,47 @@ static void monitorBuildMonitorSql(char *sql, int32_t cmd) {
|
|||
sql[SQL_LENGTH] = 0;
|
||||
}
|
||||
|
||||
void monitorStopSystem() {
|
||||
void monStopSystem() {
|
||||
tsMonitor.start = 0;
|
||||
tsMonitor.state = MON_STATE_NOT_INIT;
|
||||
monitorExecuteSQLFp = NULL;
|
||||
mnInfo("monitor module stopped");
|
||||
monExecuteSQLFp = NULL;
|
||||
monInfo("monitor module stopped");
|
||||
}
|
||||
|
||||
void monitorCleanUpSystem() {
|
||||
void monCleanupSystem() {
|
||||
tsMonitor.quiting = 1;
|
||||
monitorStopSystem();
|
||||
monStopSystem();
|
||||
pthread_join(tsMonitor.thread, NULL);
|
||||
if (tsMonitor.conn != NULL) {
|
||||
taos_close(tsMonitor.conn);
|
||||
tsMonitor.conn = NULL;
|
||||
}
|
||||
mnInfo("monitor module is cleaned up");
|
||||
monInfo("monitor module is cleaned up");
|
||||
}
|
||||
|
||||
// unit is MB
|
||||
static int32_t monitorBuildMemorySql(char *sql) {
|
||||
static int32_t monBuildMemorySql(char *sql) {
|
||||
float sysMemoryUsedMB = 0;
|
||||
bool suc = taosGetSysMemory(&sysMemoryUsedMB);
|
||||
if (!suc) {
|
||||
mnDebug("failed to get sys memory info");
|
||||
monDebug("failed to get sys memory info");
|
||||
}
|
||||
|
||||
float procMemoryUsedMB = 0;
|
||||
suc = taosGetProcMemory(&procMemoryUsedMB);
|
||||
if (!suc) {
|
||||
mnDebug("failed to get proc memory info");
|
||||
monDebug("failed to get proc memory info");
|
||||
}
|
||||
|
||||
return sprintf(sql, ", %f, %f, %d", procMemoryUsedMB, sysMemoryUsedMB, tsTotalMemoryMB);
|
||||
}
|
||||
|
||||
// unit is %
|
||||
static int32_t monitorBuildCpuSql(char *sql) {
|
||||
static int32_t monBuildCpuSql(char *sql) {
|
||||
float sysCpuUsage = 0, procCpuUsage = 0;
|
||||
bool suc = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage);
|
||||
if (!suc) {
|
||||
mnDebug("failed to get cpu usage");
|
||||
monDebug("failed to get cpu usage");
|
||||
}
|
||||
|
||||
if (sysCpuUsage <= procCpuUsage) {
|
||||
|
@ -287,72 +287,72 @@ static int32_t monitorBuildCpuSql(char *sql) {
|
|||
}
|
||||
|
||||
// unit is GB
|
||||
static int32_t monitorBuildDiskSql(char *sql) {
|
||||
static int32_t monBuildDiskSql(char *sql) {
|
||||
return sprintf(sql, ", %f, %d", (tsTotalDataDirGB - tsAvailDataDirGB), (int32_t)tsTotalDataDirGB);
|
||||
}
|
||||
|
||||
// unit is Kb
|
||||
static int32_t monitorBuildBandSql(char *sql) {
|
||||
static int32_t monBuildBandSql(char *sql) {
|
||||
float bandSpeedKb = 0;
|
||||
bool suc = taosGetBandSpeed(&bandSpeedKb);
|
||||
if (!suc) {
|
||||
mnDebug("failed to get bandwidth speed");
|
||||
monDebug("failed to get bandwidth speed");
|
||||
}
|
||||
|
||||
return sprintf(sql, ", %f", bandSpeedKb);
|
||||
}
|
||||
|
||||
static int32_t monitorBuildReqSql(char *sql) {
|
||||
static int32_t monBuildReqSql(char *sql) {
|
||||
SStatisInfo info = dnodeGetStatisInfo();
|
||||
return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.queryReqNum, info.submitReqNum);
|
||||
}
|
||||
|
||||
static int32_t monitorBuildIoSql(char *sql) {
|
||||
static int32_t monBuildIoSql(char *sql) {
|
||||
float readKB = 0, writeKB = 0;
|
||||
bool suc = taosGetProcIO(&readKB, &writeKB);
|
||||
if (!suc) {
|
||||
mnDebug("failed to get io info");
|
||||
monDebug("failed to get io info");
|
||||
}
|
||||
|
||||
return sprintf(sql, ", %f, %f", readKB, writeKB);
|
||||
}
|
||||
|
||||
static void monitorSaveSystemInfo() {
|
||||
static void monSaveSystemInfo() {
|
||||
int64_t ts = taosGetTimestampUs();
|
||||
char * sql = tsMonitor.sql;
|
||||
int32_t pos = snprintf(sql, SQL_LENGTH, "insert into %s.dn%d values(%" PRId64, tsMonitorDbName, dnodeGetDnodeId(), ts);
|
||||
|
||||
pos += monitorBuildCpuSql(sql + pos);
|
||||
pos += monitorBuildMemorySql(sql + pos);
|
||||
pos += monitorBuildDiskSql(sql + pos);
|
||||
pos += monitorBuildBandSql(sql + pos);
|
||||
pos += monitorBuildIoSql(sql + pos);
|
||||
pos += monitorBuildReqSql(sql + pos);
|
||||
pos += monBuildCpuSql(sql + pos);
|
||||
pos += monBuildMemorySql(sql + pos);
|
||||
pos += monBuildDiskSql(sql + pos);
|
||||
pos += monBuildBandSql(sql + pos);
|
||||
pos += monBuildIoSql(sql + pos);
|
||||
pos += monBuildReqSql(sql + pos);
|
||||
|
||||
void *res = taos_query(tsMonitor.conn, tsMonitor.sql);
|
||||
int code = taos_errno(res);
|
||||
taos_free_result(res);
|
||||
|
||||
if (code != 0) {
|
||||
mnError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql);
|
||||
monError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql);
|
||||
} else {
|
||||
mnDebug("successfully to save system info, sql:%s", tsMonitor.sql);
|
||||
monDebug("successfully to save system info, sql:%s", tsMonitor.sql);
|
||||
}
|
||||
}
|
||||
|
||||
static void montiorExecSqlCb(void *param, TAOS_RES *result, int32_t code) {
|
||||
static void monExecSqlCb(void *param, TAOS_RES *result, int32_t code) {
|
||||
int32_t c = taos_errno(result);
|
||||
if (c != TSDB_CODE_SUCCESS) {
|
||||
mnError("save %s failed, reason:%s", (char *)param, tstrerror(c));
|
||||
monError("save %s failed, reason:%s", (char *)param, tstrerror(c));
|
||||
} else {
|
||||
int32_t rows = taos_affected_rows(result);
|
||||
mnDebug("save %s succ, rows:%d", (char *)param, rows);
|
||||
monDebug("save %s succ, rows:%d", (char *)param, rows);
|
||||
}
|
||||
|
||||
taos_free_result(result);
|
||||
}
|
||||
|
||||
void monitorSaveAcctLog(SAcctMonitorObj *pMon) {
|
||||
void monSaveAcctLog(SAcctMonitorObj *pMon) {
|
||||
if (tsMonitor.state != MON_STATE_INITED) return;
|
||||
|
||||
char sql[1024] = {0};
|
||||
|
@ -382,11 +382,11 @@ void monitorSaveAcctLog(SAcctMonitorObj *pMon) {
|
|||
pMon->totalConns, pMon->maxConns,
|
||||
pMon->accessState);
|
||||
|
||||
mnDebug("save account info, sql:%s", sql);
|
||||
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "account info");
|
||||
monDebug("save account info, sql:%s", sql);
|
||||
taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "account info");
|
||||
}
|
||||
|
||||
void monitorSaveLog(int32_t level, const char *const format, ...) {
|
||||
void monSaveLog(int32_t level, const char *const format, ...) {
|
||||
if (tsMonitor.state != MON_STATE_INITED) return;
|
||||
|
||||
va_list argpointer;
|
||||
|
@ -403,13 +403,13 @@ void monitorSaveLog(int32_t level, const char *const format, ...) {
|
|||
len += sprintf(sql + len, "', '%s')", tsLocalEp);
|
||||
sql[len++] = 0;
|
||||
|
||||
mnDebug("save log, sql: %s", sql);
|
||||
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "log");
|
||||
monDebug("save log, sql: %s", sql);
|
||||
taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "log");
|
||||
}
|
||||
|
||||
void monitorExecuteSQL(char *sql) {
|
||||
void monExecuteSQL(char *sql) {
|
||||
if (tsMonitor.state != MON_STATE_INITED) return;
|
||||
|
||||
mnDebug("execute sql:%s", sql);
|
||||
taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "sql");
|
||||
monDebug("execute sql:%s", sql);
|
||||
taos_query_a(tsMonitor.conn, sql, monExecSqlCb, "sql");
|
||||
}
|
|
@ -24,7 +24,7 @@ sql alter dnode 1 debugFlag 135
|
|||
sql alter dnode 1 debugFlag 131
|
||||
sql alter dnode 1 monitor 0
|
||||
sql alter dnode 1 debugFlag 135
|
||||
sql alter dnode 1 monitorDebugFlag 135
|
||||
sql alter dnode 1 monDebugFlag 135
|
||||
sql alter dnode 1 vDebugFlag 135
|
||||
sql alter dnode 1 mDebugFlag 135
|
||||
sql alter dnode 1 cDebugFlag 135
|
||||
|
@ -44,15 +44,15 @@ sql_error alter dnode 2 tmrDebugFlag 135
|
|||
|
||||
print ======== step3
|
||||
sql_error alter $hostname1 debugFlag 135
|
||||
sql_error alter $hostname1 monitorDebugFlag 135
|
||||
sql_error alter $hostname1 monDebugFlag 135
|
||||
sql_error alter $hostname1 vDebugFlag 135
|
||||
sql_error alter $hostname1 mDebugFlag 135
|
||||
sql_error alter dnode $hostname2 debugFlag 135
|
||||
sql_error alter dnode $hostname2 monitorDebugFlag 135
|
||||
sql_error alter dnode $hostname2 monDebugFlag 135
|
||||
sql_error alter dnode $hostname2 vDebugFlag 135
|
||||
sql_error alter dnode $hostname2 mDebugFlag 135
|
||||
sql alter dnode $hostname1 debugFlag 135
|
||||
sql alter dnode $hostname1 monitorDebugFlag 135
|
||||
sql alter dnode $hostname1 monDebugFlag 135
|
||||
sql alter dnode $hostname1 vDebugFlag 135
|
||||
sql alter dnode $hostname1 tmrDebugFlag 131
|
||||
|
||||
|
|
|
@ -120,7 +120,7 @@ echo "cDebugFlag 143" >> $TAOS_CFG
|
|||
echo "jnidebugFlag 143" >> $TAOS_CFG
|
||||
echo "odbcdebugFlag 143" >> $TAOS_CFG
|
||||
echo "httpDebugFlag 143" >> $TAOS_CFG
|
||||
echo "monitorDebugFlag 143" >> $TAOS_CFG
|
||||
echo "monDebugFlag 143" >> $TAOS_CFG
|
||||
echo "mqttDebugFlag 143" >> $TAOS_CFG
|
||||
echo "qdebugFlag 143" >> $TAOS_CFG
|
||||
echo "rpcDebugFlag 143" >> $TAOS_CFG
|
||||
|
|
Loading…
Reference in New Issue