diff --git a/src/balance/inc/bnMain.h b/src/balance/inc/bnMain.h new file mode 100644 index 0000000000..101f5d5fd1 --- /dev/null +++ b/src/balance/inc/bnMain.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_BALANCE_MAIN_H +#define TDENGINE_BALANCE_MAIN_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "mnodeInt.h" +#include "mnodeDnode.h" + +typedef struct { + int32_t size; + int32_t maxSize; + SDnodeObj **list; +} SBnDnodes; + +typedef struct { + void * timer; + bool stop; + pthread_mutex_t mutex; + pthread_cond_t cond; + pthread_t thread; +} SBnThread; + +typedef struct { + pthread_mutex_t mutex; +} SBnMgmt; + +int32_t bnInit(); +void bnCleanUp(); +bool bnStart(); +void bnCheckStatus(); +void bnCheckModules(); + +extern SBnDnodes tsBnDnodes; +extern void *tsMnodeTmr; + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/balance/inc/bnScore.h b/src/balance/inc/bnScore.h new file mode 100644 index 0000000000..48e423b562 --- /dev/null +++ b/src/balance/inc/bnScore.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_BALANCE_SCORE_H +#define TDENGINE_BALANCE_SCORE_H + +#ifdef __cplusplus +extern "C" { +#endif +#include "bnMain.h" + +void bnInitDnodes(); +void bnCleanupDnodes(); +void bnAccquireDnodes(); +void bnReleaseDnodes(); +float bnTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extraVnode); + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/balance/inc/bnThread.h b/src/balance/inc/bnThread.h index d337e963f1..df9b06cf87 100644 --- a/src/balance/inc/bnThread.h +++ b/src/balance/inc/bnThread.h @@ -13,17 +13,18 @@ * along with this program. If not, see . */ -#ifndef TDENGINE_BALANCE_H -#define TDENGINE_BALANCE_H +#ifndef TDENGINE_BALANCE_THREAD_H +#define TDENGINE_BALANCE_THREAD_H #ifdef __cplusplus extern "C" { #endif +#include "bnMain.h" -int32_t bnThreadInit(); -void bnThreadCleanup(); -void bnThreadSyncNotify(); -void bnThreadAsyncNotify(); +int32_t bnInitThread(); +void bnCleanupThread(); +void bnNotify(); +void bnStartTimer(int64_t mseconds); #ifdef __cplusplus } diff --git a/src/balance/src/balanceMain.c b/src/balance/src/balanceMain.c index b172867929..9f9d5103be 100644 --- a/src/balance/src/balanceMain.c +++ b/src/balance/src/balanceMain.c @@ -33,36 +33,23 @@ #include "mnodeUser.h" #include "mnodeVgroup.h" -/* - * once sdb work as mater, then tsAccessSquence reset to zero - * increase tsAccessSquence every balance interval - */ -extern void * tsMnodeTmr; -static void * tsBalanceTimer = NULL; -static int32_t tsBalanceDnodeListSize = 0; -static SDnodeObj ** tsBalanceDnodeList = NULL; -static int32_t tsBalanceDnodeListMallocSize = 16; -static pthread_mutex_t tsBalanceMutex; +#include "bnMain.h" +#include "bnScore.h" +#include "bnThread.h" -static void balanceStartTimer(int64_t mseconds); -static void balanceInitDnodeList(); -static void balanceCleanupDnodeList(); -static void balanceAccquireDnodeList(); -static void balanceReleaseDnodeList(); -static void balanceMonitorDnodeModule(); -static float balanceTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extraVnode); -static int32_t balanceGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static SBnMgmt tsBnMgmt;; -static void balanceLock() { - pthread_mutex_lock(&tsBalanceMutex); +static void bnMonitorDnodeModule(); + +static void bnLock() { + pthread_mutex_lock(&tsBnMgmt.mutex); } -static void balanceUnLock() { - pthread_mutex_unlock(&tsBalanceMutex); +static void bnUnLock() { + pthread_mutex_unlock(&tsBnMgmt.mutex); } -static bool balanceCheckFree(SDnodeObj *pDnode) { +static bool bnCheckFree(SDnodeObj *pDnode) { if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) { mError("dnode:%d, status:%s not available", pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status)); return false; @@ -86,7 +73,7 @@ static bool balanceCheckFree(SDnodeObj *pDnode) { return true; } -static void balanceDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) { +static void bnDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) { mDebug("vgId:%d, dnode:%d is dropping", pVgroup->vgId, pVnodeGid->dnodeId); SDnodeObj *pDnode = mnodeGetDnode(pVnodeGid->dnodeId); @@ -111,27 +98,26 @@ static void balanceDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) { mnodeUpdateVgroup(pVgroup); } -static void balanceSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) { +static void bnSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) { // SVnodeGid tmp = *pVnodeGid1; // *pVnodeGid1 = *pVnodeGid2; // *pVnodeGid2 = tmp; } -int32_t balanceAllocVnodes(SVgObj *pVgroup) { +int32_t bnAllocVnodes(SVgObj *pVgroup) { static int32_t randIndex = 0; int32_t dnode = 0; int32_t vnodes = 0; - balanceLock(); - - balanceAccquireDnodeList(); + bnLock(); + bnAccquireDnodes(); mDebug("db:%s, try alloc %d vnodes to vgroup, dnodes total:%d, avail:%d", pVgroup->dbName, pVgroup->numOfVnodes, - mnodeGetDnodesNum(), tsBalanceDnodeListSize); + mnodeGetDnodesNum(), tsBnDnodes.size); for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { - for (; dnode < tsBalanceDnodeListSize; ++dnode) { - SDnodeObj *pDnode = tsBalanceDnodeList[dnode]; - if (balanceCheckFree(pDnode)) { + for (; dnode < tsBnDnodes.size; ++dnode) { + SDnodeObj *pDnode = tsBnDnodes.list[dnode]; + if (bnCheckFree(pDnode)) { SVnodeGid *pVnodeGid = pVgroup->vnodeGid + i; pVnodeGid->dnodeId = pDnode->dnodeId; pVnodeGid->pDnode = pDnode; @@ -148,8 +134,8 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) { } if (vnodes != pVgroup->numOfVnodes) { - balanceReleaseDnodeList(); - balanceUnLock(); + bnReleaseDnodes(); + bnUnLock(); mDebug("db:%s, need vnodes:%d, but alloc:%d", pVgroup->dbName, pVgroup->numOfVnodes, vnodes); @@ -179,33 +165,33 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) { if (pVgroup->numOfVnodes == 1) { } else if (pVgroup->numOfVnodes == 2) { if (randIndex++ % 2 == 0) { - balanceSwapVnodeGid(pVgroup->vnodeGid, pVgroup->vnodeGid + 1); + bnSwapVnodeGid(pVgroup->vnodeGid, pVgroup->vnodeGid + 1); } } else { int32_t randVal = randIndex++ % 6; if (randVal == 1) { // 1, 0, 2 - balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1); + bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1); } else if (randVal == 2) { // 1, 2, 0 - balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1); - balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); + bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1); + bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); } else if (randVal == 3) { // 2, 1, 0 - balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2); + bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2); } else if (randVal == 4) { // 2, 0, 1 - balanceSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2); - balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); + bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2); + bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); } if (randVal == 5) { // 0, 2, 1 - balanceSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); + bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2); } else { } // 0, 1, 2 } - balanceReleaseDnodeList(); - balanceUnLock(); + bnReleaseDnodes(); + bnUnLock(); return TSDB_CODE_SUCCESS; } -static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { +static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { if (pVgroup->lbTime + 5 * tsStatusInterval > tsAccessSquence) { return false; } @@ -232,7 +218,7 @@ static bool balanceCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { * desc: remove one vnode from vgroup * all vnodes in vgroup should in ready state, except the balancing one **/ -static int32_t balanceRemoveVnode(SVgObj *pVgroup) { +static int32_t bnRemoveVnode(SVgObj *pVgroup) { if (pVgroup->numOfVnodes <= 1) return -1; SVnodeGid *pRmVnode = NULL; @@ -274,17 +260,17 @@ static int32_t balanceRemoveVnode(SVgObj *pVgroup) { pSelVnode = pRmVnode; } - if (!balanceCheckVgroupReady(pVgroup, pSelVnode)) { + if (!bnCheckVgroupReady(pVgroup, pSelVnode)) { mDebug("vgId:%d, is not ready", pVgroup->vgId); return -1; } else { mDebug("vgId:%d, is ready, discard dnode:%d", pVgroup->vgId, pSelVnode->dnodeId); - balanceDiscardVnode(pVgroup, pSelVnode); + bnDiscardVnode(pVgroup, pSelVnode); return TSDB_CODE_SUCCESS; } } -static bool balanceCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) { +static bool bnCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { SVnodeGid *pGid = &pVgroup->vnodeGid[i]; if (pGid->dnodeId == 0) break; @@ -299,13 +285,13 @@ static bool balanceCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) { /** * desc: add vnode to vgroup, find a new one if dest dnode is null **/ -static int32_t balanceAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { +static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { if (pDestDnode == NULL) { - for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) { - SDnodeObj *pDnode = tsBalanceDnodeList[i]; + for (int32_t i = 0; i < tsBnDnodes.size; ++i) { + SDnodeObj *pDnode = tsBnDnodes.list[i]; if (pDnode == pSrcDnode) continue; - if (balanceCheckDnodeInVgroup(pDnode, pVgroup)) continue; - if (!balanceCheckFree(pDnode)) continue; + if (bnCheckDnodeInVgroup(pDnode, pVgroup)) continue; + if (!bnCheckFree(pDnode)) continue; pDestDnode = pDnode; mDebug("vgId:%d, add vnode to dnode:%d", pVgroup->vgId, pDnode->dnodeId); @@ -333,25 +319,25 @@ static int32_t balanceAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj return TSDB_CODE_SUCCESS; } -static bool balanceMonitorBalance() { - if (tsBalanceDnodeListSize < 2) return false; +static bool bnMonitorBalance() { + if (tsBnDnodes.size < 2) return false; - for (int32_t src = tsBalanceDnodeListSize - 1; src >= 0; --src) { - SDnodeObj *pDnode = tsBalanceDnodeList[src]; - mDebug("%d-dnode:%d, state:%s, score:%.1f, numOfCores:%d, openVnodes:%d", tsBalanceDnodeListSize - src - 1, + for (int32_t src = tsBnDnodes.size - 1; src >= 0; --src) { + SDnodeObj *pDnode = tsBnDnodes.list[src]; + mDebug("%d-dnode:%d, state:%s, score:%.1f, numOfCores:%d, openVnodes:%d", tsBnDnodes.size - src - 1, pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status), pDnode->score, pDnode->numOfCores, pDnode->openVnodes); } - float scoresDiff = tsBalanceDnodeList[tsBalanceDnodeListSize - 1]->score - tsBalanceDnodeList[0]->score; + float scoresDiff = tsBnDnodes.list[tsBnDnodes.size - 1]->score - tsBnDnodes.list[0]->score; if (scoresDiff < 0.01) { - mDebug("all dnodes:%d is already balanced, scoresDiff:%f", tsBalanceDnodeListSize, scoresDiff); + mDebug("all dnodes:%d is already balanced, scoresDiff:%f", tsBnDnodes.size, scoresDiff); return false; } - for (int32_t src = tsBalanceDnodeListSize - 1; src > 0; --src) { - SDnodeObj *pSrcDnode = tsBalanceDnodeList[src]; - float srcScore = balanceTryCalcDnodeScore(pSrcDnode, -1); + for (int32_t src = tsBnDnodes.size - 1; src > 0; --src) { + SDnodeObj *pSrcDnode = tsBnDnodes.list[src]; + float srcScore = bnTryCalcDnodeScore(pSrcDnode, -1); if (tsEnableBalance == 0 && pSrcDnode->status != TAOS_DN_STATUS_DROPPING) { continue; } @@ -362,19 +348,19 @@ static bool balanceMonitorBalance() { pIter = mnodeGetNextVgroup(pIter, &pVgroup); if (pVgroup == NULL) break; - if (balanceCheckDnodeInVgroup(pSrcDnode, pVgroup)) { + if (bnCheckDnodeInVgroup(pSrcDnode, pVgroup)) { for (int32_t dest = 0; dest < src; dest++) { - SDnodeObj *pDestDnode = tsBalanceDnodeList[dest]; - if (balanceCheckDnodeInVgroup(pDestDnode, pVgroup)) continue; + SDnodeObj *pDestDnode = tsBnDnodes.list[dest]; + if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) continue; - float destScore = balanceTryCalcDnodeScore(pDestDnode, 1); + float destScore = bnTryCalcDnodeScore(pDestDnode, 1); if (srcScore + 0.0001 < destScore) continue; - if (!balanceCheckFree(pDestDnode)) continue; + if (!bnCheckFree(pDestDnode)) continue; mDebug("vgId:%d, balance from dnode:%d to dnode:%d, srcScore:%.1f:%.1f, destScore:%.1f:%.1f", pVgroup->vgId, pSrcDnode->dnodeId, pDestDnode->dnodeId, pSrcDnode->score, srcScore, pDestDnode->score, destScore); - balanceAddVnode(pVgroup, pSrcDnode, pDestDnode); + bnAddVnode(pVgroup, pSrcDnode, pDestDnode); mnodeDecVgroupRef(pVgroup); mnodeCancelGetNextVgroup(pIter); return true; @@ -392,7 +378,7 @@ static bool balanceMonitorBalance() { // 1. reset balanceAccessSquence to zero // 2. reset state of dnodes to offline // 3. reset lastAccess of dnodes to zero -void balanceReset() { +void bnReset() { void * pIter = NULL; SDnodeObj *pDnode = NULL; while (1) { @@ -413,7 +399,7 @@ void balanceReset() { tsAccessSquence = 0; } -static int32_t balanceMonitorVgroups() { +static int32_t bnMonitorVgroups() { void * pIter = NULL; SVgObj *pVgroup = NULL; bool hasUpdatingVgroup = false; @@ -429,11 +415,11 @@ static int32_t balanceMonitorVgroups() { if (vgReplica > dbReplica) { mInfo("vgId:%d, replica:%d numOfVnodes:%d, try remove one vnode", pVgroup->vgId, dbReplica, vgReplica); hasUpdatingVgroup = true; - code = balanceRemoveVnode(pVgroup); + code = bnRemoveVnode(pVgroup); } else if (vgReplica < dbReplica) { mInfo("vgId:%d, replica:%d numOfVnodes:%d, try add one vnode", pVgroup->vgId, dbReplica, vgReplica); hasUpdatingVgroup = true; - code = balanceAddVnode(pVgroup, NULL, NULL); + code = bnAddVnode(pVgroup, NULL, NULL); } mnodeDecVgroupRef(pVgroup); @@ -446,7 +432,7 @@ static int32_t balanceMonitorVgroups() { return hasUpdatingVgroup; } -static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) { +static bool bnMonitorDnodeDropping(SDnodeObj *pDnode) { mDebug("dnode:%d, in dropping state", pDnode->dnodeId); void * pIter = NULL; @@ -456,7 +442,7 @@ static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) { pIter = mnodeGetNextVgroup(pIter, &pVgroup); if (pVgroup == NULL) break; - hasThisDnode = balanceCheckDnodeInVgroup(pDnode, pVgroup); + hasThisDnode = bnCheckDnodeInVgroup(pDnode, pVgroup); mnodeDecVgroupRef(pVgroup); if (hasThisDnode) { @@ -474,7 +460,7 @@ static bool balanceMonitorDnodeDropping(SDnodeObj *pDnode) { return false; } -static bool balanceMontiorDropping() { +static bool bnMontiorDropping() { void *pIter = NULL; SDnodeObj *pDnode = NULL; @@ -499,7 +485,7 @@ static bool balanceMontiorDropping() { } if (pDnode->status == TAOS_DN_STATUS_DROPPING) { - bool ret = balanceMonitorDnodeDropping(pDnode); + bool ret = bnMonitorDnodeDropping(pDnode); mnodeDecDnodeRef(pDnode); mnodeCancelGetNextDnode(pIter); return ret; @@ -509,33 +495,32 @@ static bool balanceMontiorDropping() { return false; } -static bool balanceStart() { +bool bnStart() { if (!sdbIsMaster()) return false; - balanceLock(); + bnLock(); + bnAccquireDnodes(); - balanceAccquireDnodeList(); + bnMonitorDnodeModule(); - balanceMonitorDnodeModule(); - - bool updateSoon = balanceMontiorDropping(); + bool updateSoon = bnMontiorDropping(); if (!updateSoon) { - updateSoon = balanceMonitorVgroups(); + updateSoon = bnMonitorVgroups(); } if (!updateSoon) { - updateSoon = balanceMonitorBalance(); + updateSoon = bnMonitorBalance(); } - balanceReleaseDnodeList(); + bnReleaseDnodes(); - balanceUnLock(); + bnUnLock(); return updateSoon; } -static void balanceSetVgroupOffline(SDnodeObj* pDnode) { +static void bnSetVgroupOffline(SDnodeObj* pDnode) { void *pIter = NULL; while (1) { SVgObj *pVgroup; @@ -551,7 +536,7 @@ static void balanceSetVgroupOffline(SDnodeObj* pDnode) { } } -static void balanceCheckDnodeAccess() { +void bnCheckStatus() { void * pIter = NULL; SDnodeObj *pDnode = NULL; @@ -564,84 +549,39 @@ static void balanceCheckDnodeAccess() { pDnode->offlineReason = TAOS_DN_OFF_STATUS_MSG_TIMEOUT; mInfo("dnode:%d, set to offline state, access seq:%d last seq:%d laststat:%d", pDnode->dnodeId, tsAccessSquence, pDnode->lastAccess, pDnode->status); - balanceSetVgroupOffline(pDnode); + bnSetVgroupOffline(pDnode); } } mnodeDecDnodeRef(pDnode); } } -static void balanceProcessBalanceTimer(void *handle, void *tmrId) { - if (!sdbIsMaster()) return; - - tsBalanceTimer = NULL; - tsAccessSquence ++; - - balanceCheckDnodeAccess(); - bool updateSoon = false; - - if (handle == NULL) { - if (tsAccessSquence % tsBalanceInterval == 0) { - mDebug("balance function is scheduled by timer"); - updateSoon = balanceStart(); - } - } else { - int64_t mseconds = (int64_t)handle; - mDebug("balance function is scheduled by event for %" PRId64 " mseconds arrived", mseconds); - updateSoon = balanceStart(); - } - - if (updateSoon) { - balanceStartTimer(1000); - } else { - taosTmrReset(balanceProcessBalanceTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBalanceTimer); - } -} - -static void balanceStartTimer(int64_t mseconds) { - taosTmrReset(balanceProcessBalanceTimer, mseconds, (void *)mseconds, tsMnodeTmr, &tsBalanceTimer); -} - -void balanceSyncNotify() { +void bnCheckModules() { if (sdbIsMaster()) { - balanceLock(); - balanceAccquireDnodeList(); - balanceMonitorDnodeModule(); - balanceReleaseDnodeList(); - balanceUnLock(); + bnLock(); + bnAccquireDnodes(); + bnMonitorDnodeModule(); + bnReleaseDnodes(); + bnUnLock(); } } -void balanceAsyncNotify() { - balanceStartTimer(500); -} - -int32_t balanceInit() { - mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_SCORES, balanceGetScoresMeta); - mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_SCORES, balanceRetrieveScores); - mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_SCORES, mnodeCancelGetNextDnode); - - pthread_mutex_init(&tsBalanceMutex, NULL); - balanceInitDnodeList(); - balanceStartTimer(2000); - mDebug("balance start fp:%p initialized", balanceProcessBalanceTimer); - - balanceReset(); +int32_t bnInit() { + pthread_mutex_init(&tsBnMgmt.mutex, NULL); + bnInitDnodes(); + bnInitThread(); + bnReset(); return 0; } -void balanceCleanUp() { - if (tsBalanceTimer != NULL) { - taosTmrStopA(&tsBalanceTimer); - pthread_mutex_destroy(&tsBalanceMutex); - tsBalanceTimer = NULL; - mDebug("stop balance timer"); - } - balanceCleanupDnodeList(); +void bnCleanUp() { + bnCleanupThread(); + pthread_mutex_destroy(&tsBnMgmt.mutex); + bnCleanupDnodes(); } -int32_t balanceDropDnode(SDnodeObj *pDnode) { +int32_t bnDropDnode(SDnodeObj *pDnode) { int32_t totalFreeVnodes = 0; void * pIter = NULL; SDnodeObj *pTempDnode = NULL; @@ -650,7 +590,7 @@ int32_t balanceDropDnode(SDnodeObj *pDnode) { pIter = mnodeGetNextDnode(pIter, &pTempDnode); if (pTempDnode == NULL) break; - if (pTempDnode != pDnode && balanceCheckFree(pTempDnode)) { + if (pTempDnode != pDnode && bnCheckFree(pTempDnode)) { totalFreeVnodes += (TSDB_MAX_VNODES - pTempDnode->openVnodes); } @@ -665,298 +605,19 @@ int32_t balanceDropDnode(SDnodeObj *pDnode) { pDnode->status = TAOS_DN_STATUS_DROPPING; mnodeUpdateDnode(pDnode); - balanceStartTimer(1100); + bnStartTimer(1100); return TSDB_CODE_SUCCESS; } -static int32_t balanceCalcCpuScore(SDnodeObj *pDnode) { - if (pDnode->cpuAvgUsage < 80) - return 0; - else if (pDnode->cpuAvgUsage < 90) - return 10; - else - return 50; -} -static int32_t balanceCalcMemoryScore(SDnodeObj *pDnode) { - if (pDnode->memoryAvgUsage < 80) - return 0; - else if (pDnode->memoryAvgUsage < 90) - return 10; - else - return 50; -} -static int32_t balanceCalcDiskScore(SDnodeObj *pDnode) { - if (pDnode->diskAvgUsage < 80) - return 0; - else if (pDnode->diskAvgUsage < 90) - return 10; - else - return 50; -} - -static int32_t balanceCalcBandwidthScore(SDnodeObj *pDnode) { - if (pDnode->bandwidthUsage < 30) - return 0; - else if (pDnode->bandwidthUsage < 80) - return 10; - else - return 50; -} - -static float balanceCalcModuleScore(SDnodeObj *pDnode) { - if (pDnode->numOfCores <= 0) return 0; - if (pDnode->isMgmt) { - return (float)tsMnodeEqualVnodeNum / pDnode->numOfCores; - } - return 0; -} - -static float balanceCalcVnodeScore(SDnodeObj *pDnode, int32_t extra) { - if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) return 100000000; - if (pDnode->numOfCores <= 0) return 0; - return (float)(pDnode->openVnodes + extra) / pDnode->numOfCores; -} - -/** - * calc singe score, such as cpu/memory/disk/bandwitdh/vnode - * 1. get the score config - * 2. if the value is out of range, use border data - * 3. otherwise use interpolation method - **/ -void balanceCalcDnodeScore(SDnodeObj *pDnode) { - pDnode->score = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) + - balanceCalcBandwidthScore(pDnode) + balanceCalcModuleScore(pDnode) + - balanceCalcVnodeScore(pDnode, 0) + pDnode->customScore; -} - -float balanceTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extra) { - int32_t systemScore = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) + - balanceCalcBandwidthScore(pDnode); - float moduleScore = balanceCalcModuleScore(pDnode); - float vnodeScore = balanceCalcVnodeScore(pDnode, extra); - - float score = systemScore + moduleScore + vnodeScore + pDnode->customScore; - return score; -} - -static void balanceInitDnodeList() { - tsBalanceDnodeList = calloc(tsBalanceDnodeListMallocSize, sizeof(SDnodeObj *)); -} - -static void balanceCleanupDnodeList() { - if (tsBalanceDnodeList != NULL) { - free(tsBalanceDnodeList); - tsBalanceDnodeList = NULL; - } -} - -static void balanceCheckDnodeListSize(int32_t dnodesNum) { - if (tsBalanceDnodeListMallocSize <= dnodesNum) { - tsBalanceDnodeListMallocSize = dnodesNum * 2; - tsBalanceDnodeList = realloc(tsBalanceDnodeList, tsBalanceDnodeListMallocSize * sizeof(SDnodeObj *)); - } -} - -void balanceAccquireDnodeList() { - int32_t dnodesNum = mnodeGetDnodesNum(); - balanceCheckDnodeListSize(dnodesNum); - - void * pIter = NULL; - SDnodeObj *pDnode = NULL; - int32_t dnodeIndex = 0; - - while (1) { - if (dnodeIndex >= dnodesNum) { - mnodeCancelGetNextDnode(pIter); - break; - } - - pIter = mnodeGetNextDnode(pIter, &pDnode); - if (pDnode == NULL) break; - if (pDnode->status == TAOS_DN_STATUS_OFFLINE) { - mnodeDecDnodeRef(pDnode); - continue; - } - - balanceCalcDnodeScore(pDnode); - - int32_t orderIndex = dnodeIndex; - for (; orderIndex > 0; --orderIndex) { - if (pDnode->score > tsBalanceDnodeList[orderIndex - 1]->score) { - break; - } - tsBalanceDnodeList[orderIndex] = tsBalanceDnodeList[orderIndex - 1]; - } - tsBalanceDnodeList[orderIndex] = pDnode; - dnodeIndex++; - } - - tsBalanceDnodeListSize = dnodeIndex; -} - -void balanceReleaseDnodeList() { - for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) { - SDnodeObj *pDnode = tsBalanceDnodeList[i]; - if (pDnode != NULL) { - mnodeDecDnodeRef(pDnode); - } - } -} - -static int32_t balanceGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { - SUserObj *pUser = mnodeGetUserFromConn(pConn); - if (pUser == NULL) return 0; - - if (strcmp(pUser->pAcct->user, "root") != 0) { - mnodeDecUserRef(pUser); - return TSDB_CODE_MND_NO_RIGHTS; - } - - int32_t cols = 0; - SSchema *pSchema = pMeta->schema; - - pShow->bytes[cols] = 2; - pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; - strcpy(pSchema[cols].name, "id"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; - strcpy(pSchema[cols].name, "system scores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; - strcpy(pSchema[cols].name, "custom scores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; - strcpy(pSchema[cols].name, "module scores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; - strcpy(pSchema[cols].name, "vnode scores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; - strcpy(pSchema[cols].name, "total scores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "open vnodes"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "cpu cores"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 18 + VARSTR_HEADER_SIZE; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "balance state"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pMeta->numOfColumns = htons(cols); - pShow->numOfColumns = cols; - - pShow->offset[0] = 0; - for (int32_t i = 1; i < cols; ++i) { - pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; - } - - pShow->numOfRows = mnodeGetDnodesNum(); - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - pShow->pIter = NULL; - - mnodeDecUserRef(pUser); - - return 0; -} - -static int32_t balanceRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) { - int32_t numOfRows = 0; - SDnodeObj *pDnode = NULL; - char * pWrite; - int32_t cols = 0; - - while (numOfRows < rows) { - pShow->pIter = mnodeGetNextDnode(pShow->pIter, &pDnode); - if (pDnode == NULL) break; - - int32_t systemScore = balanceCalcCpuScore(pDnode) + balanceCalcMemoryScore(pDnode) + balanceCalcDiskScore(pDnode) + - balanceCalcBandwidthScore(pDnode); - float moduleScore = balanceCalcModuleScore(pDnode); - float vnodeScore = balanceCalcVnodeScore(pDnode, 0); - - cols = 0; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pDnode->dnodeId; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(float *)pWrite = systemScore; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(float *)pWrite = pDnode->customScore; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(float *)pWrite = (int32_t)moduleScore; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(float *)pWrite = (int32_t)vnodeScore; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(float *)pWrite = (int32_t)(vnodeScore + moduleScore + pDnode->customScore + systemScore); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDnode->openVnodes; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDnode->numOfCores; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_TO_VARSTR(pWrite, mnodeGetDnodeStatusStr(pDnode->status)); - cols++; - - numOfRows++; - mnodeDecDnodeRef(pDnode); - } - - mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); - pShow->numOfReads += numOfRows; - return numOfRows; -} - -static void balanceMonitorDnodeModule() { +static void bnMonitorDnodeModule() { int32_t numOfMnodes = mnodeGetMnodesNum(); if (numOfMnodes >= tsNumOfMnodes) return; - for (int32_t i = 0; i < tsBalanceDnodeListSize; ++i) { - SDnodeObj *pDnode = tsBalanceDnodeList[i]; + for (int32_t i = 0; i < tsBnDnodes.size; ++i) { + SDnodeObj *pDnode = tsBnDnodes.list[i]; if (pDnode == NULL) break; if (pDnode->isMgmt || pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) { @@ -980,7 +641,7 @@ static void balanceMonitorDnodeModule() { } } -int32_t balanceAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t dnodeId) { +int32_t bnAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t dnodeId) { if (!sdbIsMaster()) { mError("dnode:%d, failed to alter vgId:%d to dnode:%d, for self not master", pSrcDnode->dnodeId, vnodeId, dnodeId); return TSDB_CODE_MND_DNODE_NOT_EXIST; @@ -1004,29 +665,29 @@ int32_t balanceAlterDnode(struct SDnodeObj *pSrcDnode, int32_t vnodeId, int32_t return TSDB_CODE_MND_DNODE_NOT_EXIST; } - balanceLock(); - balanceAccquireDnodeList(); + bnLock(); + bnAccquireDnodes(); int32_t code = TSDB_CODE_SUCCESS; - if (!balanceCheckDnodeInVgroup(pSrcDnode, pVgroup)) { + if (!bnCheckDnodeInVgroup(pSrcDnode, pVgroup)) { mError("dnode:%d, failed to alter vgId:%d to dnode:%d, vgroup not in dnode:%d", pSrcDnode->dnodeId, vnodeId, dnodeId, pSrcDnode->dnodeId); code = TSDB_CODE_MND_VGROUP_NOT_IN_DNODE; - } else if (balanceCheckDnodeInVgroup(pDestDnode, pVgroup)) { + } else if (bnCheckDnodeInVgroup(pDestDnode, pVgroup)) { mError("dnode:%d, failed to alter vgId:%d to dnode:%d, vgroup already in dnode:%d", pSrcDnode->dnodeId, vnodeId, dnodeId, dnodeId); code = TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE; - } else if (!balanceCheckFree(pDestDnode)) { + } else if (!bnCheckFree(pDestDnode)) { mError("dnode:%d, failed to alter vgId:%d to dnode:%d, for dnode:%d not free", pSrcDnode->dnodeId, vnodeId, dnodeId, dnodeId); code = TSDB_CODE_MND_DNODE_NOT_FREE; } else { - code = balanceAddVnode(pVgroup, pSrcDnode, pDestDnode); + code = bnAddVnode(pVgroup, pSrcDnode, pDestDnode); mInfo("dnode:%d, alter vgId:%d to dnode:%d, result:%s", pSrcDnode->dnodeId, vnodeId, dnodeId, tstrerror(code)); } - balanceReleaseDnodeList(); - balanceUnLock(); + bnReleaseDnodes(); + bnUnLock(); mnodeDecVgroupRef(pVgroup); mnodeDecDnodeRef(pDestDnode); diff --git a/src/balance/src/bnScore.c b/src/balance/src/bnScore.c new file mode 100644 index 0000000000..db5d515fbf --- /dev/null +++ b/src/balance/src/bnScore.c @@ -0,0 +1,328 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tutil.h" +#include "tbalance.h" +#include "tsync.h" +#include "tsync.h" +#include "ttimer.h" +#include "tglobal.h" +#include "tdataformat.h" +#include "dnode.h" +#include "mnode.h" +#include "mnodeDef.h" +#include "mnodeInt.h" +#include "mnodeDnode.h" +#include "mnodeDb.h" +#include "mnodeMnode.h" +#include "mnodeSdb.h" +#include "mnodeShow.h" +#include "mnodeUser.h" +#include "mnodeVgroup.h" + +#include "bnScore.h" + +SBnDnodes tsBnDnodes; + +static int32_t bnGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); +static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn); + +static int32_t bnCalcCpuScore(SDnodeObj *pDnode) { + if (pDnode->cpuAvgUsage < 80) + return 0; + else if (pDnode->cpuAvgUsage < 90) + return 10; + else + return 50; +} + +static int32_t bnCalcMemoryScore(SDnodeObj *pDnode) { + if (pDnode->memoryAvgUsage < 80) + return 0; + else if (pDnode->memoryAvgUsage < 90) + return 10; + else + return 50; +} + +static int32_t bnCalcDiskScore(SDnodeObj *pDnode) { + if (pDnode->diskAvgUsage < 80) + return 0; + else if (pDnode->diskAvgUsage < 90) + return 10; + else + return 50; +} + +static int32_t bnCalcBandScore(SDnodeObj *pDnode) { + if (pDnode->bandwidthUsage < 30) + return 0; + else if (pDnode->bandwidthUsage < 80) + return 10; + else + return 50; +} + +static float bnCalcModuleScore(SDnodeObj *pDnode) { + if (pDnode->numOfCores <= 0) return 0; + if (pDnode->isMgmt) { + return (float)tsMnodeEqualVnodeNum / pDnode->numOfCores; + } + return 0; +} + +static float bnCalcVnodeScore(SDnodeObj *pDnode, int32_t extra) { + if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) return 100000000; + if (pDnode->numOfCores <= 0) return 0; + return (float)(pDnode->openVnodes + extra) / pDnode->numOfCores; +} + +/** + * calc singe score, such as cpu/memory/disk/bandwitdh/vnode + * 1. get the score config + * 2. if the value is out of range, use border data + * 3. otherwise use interpolation method + **/ +static void bnCalcDnodeScore(SDnodeObj *pDnode) { + pDnode->score = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) + + bnCalcBandScore(pDnode) + bnCalcModuleScore(pDnode) + bnCalcVnodeScore(pDnode, 0) + + pDnode->customScore; +} + +float bnTryCalcDnodeScore(SDnodeObj *pDnode, int32_t extra) { + int32_t systemScore = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) + + bnCalcBandScore(pDnode); + float moduleScore = bnCalcModuleScore(pDnode); + float vnodeScore = bnCalcVnodeScore(pDnode, extra); + + float score = systemScore + moduleScore + vnodeScore + pDnode->customScore; + return score; +} + +void bnInitDnodes() { + mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_SCORES, bnGetScoresMeta); + mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_SCORES, bnRetrieveScores); + mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_SCORES, mnodeCancelGetNextDnode); + + memset(&tsBnDnodes, 0, sizeof(SBnDnodes)); + tsBnDnodes.maxSize = 16; + tsBnDnodes.list = calloc(tsBnDnodes.maxSize, sizeof(SDnodeObj *)); +} + +void bnCleanupDnodes() { + if (tsBnDnodes.list != NULL) { + free(tsBnDnodes.list); + tsBnDnodes.list = NULL; + } +} + +static void bnCheckDnodesSize(int32_t dnodesNum) { + if (tsBnDnodes.maxSize <= dnodesNum) { + tsBnDnodes.maxSize = dnodesNum * 2; + tsBnDnodes.list = realloc(tsBnDnodes.list, tsBnDnodes.maxSize * sizeof(SDnodeObj *)); + } +} + +void bnAccquireDnodes() { + int32_t dnodesNum = mnodeGetDnodesNum(); + bnCheckDnodesSize(dnodesNum); + + void * pIter = NULL; + SDnodeObj *pDnode = NULL; + int32_t dnodeIndex = 0; + + while (1) { + if (dnodeIndex >= dnodesNum) { + mnodeCancelGetNextDnode(pIter); + break; + } + + pIter = mnodeGetNextDnode(pIter, &pDnode); + if (pDnode == NULL) break; + if (pDnode->status == TAOS_DN_STATUS_OFFLINE) { + mnodeDecDnodeRef(pDnode); + continue; + } + + bnCalcDnodeScore(pDnode); + + int32_t orderIndex = dnodeIndex; + for (; orderIndex > 0; --orderIndex) { + if (pDnode->score > tsBnDnodes.list[orderIndex - 1]->score) { + break; + } + tsBnDnodes.list[orderIndex] = tsBnDnodes.list[orderIndex - 1]; + } + tsBnDnodes.list[orderIndex] = pDnode; + dnodeIndex++; + } + + tsBnDnodes.size = dnodeIndex; +} + +void bnReleaseDnodes() { + for (int32_t i = 0; i < tsBnDnodes.size; ++i) { + SDnodeObj *pDnode = tsBnDnodes.list[i]; + if (pDnode != NULL) { + mnodeDecDnodeRef(pDnode); + } + } +} + +static int32_t bnGetScoresMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { + SUserObj *pUser = mnodeGetUserFromConn(pConn); + if (pUser == NULL) return 0; + + if (strcmp(pUser->pAcct->user, "root") != 0) { + mnodeDecUserRef(pUser); + return TSDB_CODE_MND_NO_RIGHTS; + } + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "id"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; + strcpy(pSchema[cols].name, "system scores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; + strcpy(pSchema[cols].name, "custom scores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; + strcpy(pSchema[cols].name, "module scores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; + strcpy(pSchema[cols].name, "vnode scores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_FLOAT; + strcpy(pSchema[cols].name, "total scores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "open vnodes"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "cpu cores"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 18 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "balance state"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = mnodeGetDnodesNum(); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + pShow->pIter = NULL; + + mnodeDecUserRef(pUser); + + return 0; +} + +static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + int32_t numOfRows = 0; + SDnodeObj *pDnode = NULL; + char * pWrite; + int32_t cols = 0; + + while (numOfRows < rows) { + pShow->pIter = mnodeGetNextDnode(pShow->pIter, &pDnode); + if (pDnode == NULL) break; + + int32_t systemScore = bnCalcCpuScore(pDnode) + bnCalcMemoryScore(pDnode) + bnCalcDiskScore(pDnode) + bnCalcBandScore(pDnode); + float moduleScore = bnCalcModuleScore(pDnode); + float vnodeScore = bnCalcVnodeScore(pDnode, 0); + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *)pWrite = pDnode->dnodeId; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(float *)pWrite = systemScore; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(float *)pWrite = pDnode->customScore; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(float *)pWrite = (int32_t)moduleScore; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(float *)pWrite = (int32_t)vnodeScore; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(float *)pWrite = (int32_t)(vnodeScore + moduleScore + pDnode->customScore + systemScore); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = pDnode->openVnodes; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = pDnode->numOfCores; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_TO_VARSTR(pWrite, mnodeGetDnodeStatusStr(pDnode->status)); + cols++; + + numOfRows++; + mnodeDecDnodeRef(pDnode); + } + + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + pShow->numOfReads += numOfRows; + return numOfRows; +} diff --git a/src/balance/src/bnThread.c b/src/balance/src/bnThread.c index 12cb2cf992..39eb6d0e31 100644 --- a/src/balance/src/bnThread.c +++ b/src/balance/src/bnThread.c @@ -34,14 +34,10 @@ #include "mnodeUser.h" #include "mnodeVgroup.h" -typedef struct { - bool stop; - pthread_mutex_t mutex; - pthread_cond_t cond; - pthread_t thread; -} SBalanceThread; -static SBalanceThread tsBnThread; +#include "bnThread.h" + +static SBnThread tsBnThread; static void *bnThreadFunc(void *arg) { while (1) { @@ -52,14 +48,16 @@ static void *bnThreadFunc(void *arg) { } pthread_cond_wait(&tsBnThread.cond, &tsBnThread.mutex); - + bool updateSoon = bnStart(); + bnStartTimer(updateSoon ? 1000 : -1); pthread_mutex_unlock(&(tsBnThread.mutex)); } return NULL; } -int32_t bnThreadInit() { +int32_t bnInitThread() { + memset(&tsBnThread, 0, sizeof(SBnThread)); tsBnThread.stop = false; pthread_mutex_init(&tsBnThread.mutex, NULL); pthread_cond_init(&tsBnThread.cond, NULL); @@ -75,13 +73,20 @@ int32_t bnThreadInit() { return -1; } + bnStartTimer(2000); mDebug("balance thread is created"); return 0; } -void bnThreadCleanup() { +void bnCleanupThread() { mDebug("balance thread will be cleanup"); + if (tsBnThread.timer != NULL) { + taosTmrStopA(&tsBnThread.timer); + tsBnThread.timer = NULL; + mDebug("stop balance timer"); + } + pthread_mutex_lock(&tsBnThread.mutex); tsBnThread.stop = true; pthread_cond_signal(&tsBnThread.cond); @@ -92,16 +97,47 @@ void bnThreadCleanup() { pthread_mutex_destroy(&tsBnThread.mutex); } -void bnThreadSyncNotify() { - mDebug("balance thread sync notify"); - pthread_mutex_lock(&tsBnThread.mutex); - pthread_cond_signal(&tsBnThread.cond); - pthread_mutex_unlock(&(tsBnThread.mutex)); -} - -void bnThreadAsyncNotify() { +static void bnPostSignal() { mDebug("balance thread async notify"); pthread_mutex_lock(&tsBnThread.mutex); pthread_cond_signal(&tsBnThread.cond); pthread_mutex_unlock(&(tsBnThread.mutex)); } + +/* + * once sdb work as mater, then tsAccessSquence reset to zero + * increase tsAccessSquence every balance interval + */ + +static void bnProcessTimer(void *handle, void *tmrId) { + if (!sdbIsMaster()) return; + + tsBnThread.timer = NULL; + tsAccessSquence++; + + bnCheckStatus(); + + if (handle == NULL) { + if (tsAccessSquence % tsBalanceInterval == 0) { + mDebug("balance function is scheduled by timer"); + bnPostSignal(); + } + } else { + int64_t mseconds = (int64_t)handle; + mDebug("balance function is scheduled by event for %" PRId64 " mseconds arrived", mseconds); + bnPostSignal(); + } +} + +void bnStartTimer(int64_t mseconds) { + bool updateSoon = (mseconds != -1); + if (updateSoon) { + taosTmrReset(bnProcessTimer, mseconds, (void *)mseconds, tsMnodeTmr, &tsBnThread.timer); + } else { + taosTmrReset(bnProcessTimer, tsStatusInterval * 1000, NULL, tsMnodeTmr, &tsBnThread.timer); + } +} + +void bnNotify() { + bnStartTimer(500); +} diff --git a/src/inc/tbalance.h b/src/inc/tbalance.h index f0da4a3747..b9f4e3c608 100644 --- a/src/inc/tbalance.h +++ b/src/inc/tbalance.h @@ -23,14 +23,14 @@ extern "C" { struct SVgObj; struct SDnodeObj; -int32_t balanceInit(); -void balanceCleanUp(); -void balanceAsyncNotify(); -void balanceSyncNotify(); -void balanceReset(); -int32_t balanceAllocVnodes(struct SVgObj *pVgroup); -int32_t balanceAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId); -int32_t balanceDropDnode(struct SDnodeObj *pDnode); +int32_t bnInit(); +void bnCleanUp(); +void bnNotify(); +void bnCheckModules(); +void bnReset(); +int32_t bnAllocVnodes(struct SVgObj *pVgroup); +int32_t bnAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId); +int32_t bnDropDnode(struct SDnodeObj *pDnode); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 69e8f076e9..63bd066c36 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -1004,7 +1004,7 @@ static int32_t mnodeAlterDbCb(SMnodeMsg *pMsg, int32_t code) { mDebug("db:%s, all vgroups is altered", pDb->name); mLInfo("db:%s, is alterd by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); - balanceAsyncNotify(); + bnNotify(); return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 65f4060392..0d429f5797 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -115,7 +115,7 @@ static int32_t mnodeDnodeActionDelete(SSdbRow *pRow) { mnodeDropAllDnodeVgroups(pDnode); #endif mnodeDropMnodeLocal(pDnode->dnodeId); - balanceAsyncNotify(); + bnNotify(); mnodeUpdateDnodeEps(); mDebug("dnode:%d, all vgroups is dropped from sdb", pDnode->dnodeId); @@ -347,7 +347,7 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) { return TSDB_CODE_MND_INVALID_DNODE_CFG_OPTION; } - int32_t code = balanceAlterDnode(pDnode, vnodeId, dnodeId); + int32_t code = bnAlterDnode(pDnode, vnodeId, dnodeId); mnodeDecDnodeRef(pDnode); return code; } else { @@ -591,8 +591,8 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { mInfo("dnode:%d, from offline to online", pDnode->dnodeId); pDnode->status = TAOS_DN_STATUS_READY; pDnode->offlineReason = TAOS_DN_OFF_ONLINE; - balanceSyncNotify(); - balanceAsyncNotify(); + bnCheckModules(); + bnNotify(); } if (openVnodes != pDnode->openVnodes) { @@ -708,7 +708,7 @@ static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) { #ifndef _SYNC int32_t code = mnodeDropDnode(pDnode, pMsg); #else - int32_t code = balanceDropDnode(pDnode); + int32_t code = bnDropDnode(pDnode); #endif mnodeDecDnodeRef(pDnode); return code; @@ -1182,12 +1182,12 @@ static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole) { #ifndef _SYNC -int32_t balanceInit() { return TSDB_CODE_SUCCESS; } -void balanceCleanUp() {} -void balanceAsyncNotify() {} -void balanceSyncNotify() {} -void balanceReset() {} -int32_t balanceAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId) { return TSDB_CODE_SYN_NOT_ENABLED; } +int32_t bnInit() { return TSDB_CODE_SUCCESS; } +void bnCleanUp() {} +void bnNotify() {} +void bnCheckModules() {} +void bnReset() {} +int32_t bnAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId) { return TSDB_CODE_SYN_NOT_ENABLED; } char* syncRole[] = { "offline", @@ -1197,7 +1197,7 @@ char* syncRole[] = { "master" }; -int32_t balanceAllocVnodes(SVgObj *pVgroup) { +int32_t bnAllocVnodes(SVgObj *pVgroup) { void * pIter = NULL; SDnodeObj *pDnode = NULL; SDnodeObj *pSelDnode = NULL; diff --git a/src/mnode/src/mnodeMain.c b/src/mnode/src/mnodeMain.c index d15b32da54..9c8bc6c35b 100644 --- a/src/mnode/src/mnodeMain.c +++ b/src/mnode/src/mnodeMain.c @@ -58,7 +58,7 @@ static const SMnodeComponent tsMnodeComponents[] = { {"tables", mnodeInitTables, mnodeCleanupTables}, {"mnodes", mnodeInitMnodes, mnodeCleanupMnodes}, {"sdb", sdbInit, sdbCleanUp}, - {"balance", balanceInit, balanceCleanUp}, + {"balance", bnInit, bnCleanUp}, {"grant", grantInit, grantCleanUp}, {"show", mnodeInitShow, mnodeCleanUpShow} }; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 40e2e1cfcc..07b3ef09ec 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -244,7 +244,7 @@ static void sdbNotifyRole(int32_t vgId, int8_t role) { sdbInfo("vgId:1, mnode role changed from %s to %s", syncRole[tsSdbMgmt.role], syncRole[role]); if (role == TAOS_SYNC_ROLE_MASTER && tsSdbMgmt.role != TAOS_SYNC_ROLE_MASTER) { - balanceReset(); + bnReset(); } tsSdbMgmt.role = role; diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index 3e974f417f..7cf9e35bd2 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -563,7 +563,7 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) { pVgroup->numOfVnodes = pDb->cfg.replications; pVgroup->createdTime = taosGetTimestampMs(); pVgroup->accessState = TSDB_VN_ALL_ACCCESS; - int32_t code = balanceAllocVnodes(pVgroup); + int32_t code = bnAllocVnodes(pVgroup); if (code != TSDB_CODE_SUCCESS) { mError("db:%s, no enough dnode to alloc %d vnodes to vgroup, reason:%s", pDb->name, pVgroup->numOfVnodes, tstrerror(code));