[TD-17] fix error while alloc vnodes
This commit is contained in:
parent
dffade9650
commit
7330301f5b
|
@ -128,7 +128,9 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
|
||||
rpcSendRequest(tsDnodeMClientRpc, &tsDnodeMnodeIpList, rpcMsg);
|
||||
if (tsDnodeMClientRpc) {
|
||||
rpcSendRequest(tsDnodeMClientRpc, &tsDnodeMnodeIpList, rpcMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static bool dnodeReadMnodeIpList() {
|
||||
|
|
|
@ -178,9 +178,9 @@ static void dnodeCleanUpSystem() {
|
|||
tclearModuleStatus(TSDB_MOD_MGMT);
|
||||
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
|
||||
dnodeCleanupShell();
|
||||
dnodeCleanupMClient();
|
||||
dnodeCleanupMnode();
|
||||
dnodeCleanupMgmt();
|
||||
dnodeCleanupMClient();
|
||||
dnodeCleanupWrite();
|
||||
dnodeCleanupRead();
|
||||
dnodeCleanUpModules();
|
||||
|
|
|
@ -24,6 +24,7 @@ extern "C" {
|
|||
int32_t mgmtInitDnodes();
|
||||
void mgmtCleanUpDnodes();
|
||||
int32_t mgmtGetDnodesNum();
|
||||
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode);
|
||||
SDnodeObj* mgmtGetDnode(int32_t dnodeId);
|
||||
SDnodeObj* mgmtGetDnodeByIp(uint32_t ip);
|
||||
|
||||
|
|
|
@ -18,47 +18,37 @@
|
|||
#include "mgmtBalance.h"
|
||||
#include "mgmtDnode.h"
|
||||
|
||||
int32_t (*mgmtInitBalanceFp)() = NULL;
|
||||
void (*mgmtCleanupBalanceFp)() = NULL;
|
||||
void (*mgmtStartBalanceTimerFp)(int32_t afterMs) = NULL;
|
||||
int32_t (*mgmtAllocVnodesFp)(SVgObj *pVgroup) = NULL;
|
||||
|
||||
int32_t mgmtInitBalance() {
|
||||
if (mgmtInitBalanceFp) {
|
||||
return (*mgmtInitBalanceFp)();
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtCleanupBalance() {
|
||||
if (mgmtCleanupBalanceFp) {
|
||||
(*mgmtCleanupBalanceFp)();
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtStartBalanceTimer(int32_t afterMs) {
|
||||
if (mgmtStartBalanceTimerFp) {
|
||||
(*mgmtStartBalanceTimerFp)(afterMs);
|
||||
}
|
||||
}
|
||||
int32_t mgmtInitBalance() { return 0; }
|
||||
void mgmtCleanupBalance() {}
|
||||
void mgmtStartBalanceTimer(int32_t afterMs) {}
|
||||
|
||||
int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
|
||||
if (mgmtAllocVnodesFp) {
|
||||
return (*mgmtAllocVnodesFp)(pVgroup);
|
||||
void * pNode = NULL;
|
||||
SDnodeObj *pDnode = NULL;
|
||||
SDnodeObj *pSelDnode = NULL;
|
||||
float vnodeUsage = 1.0;
|
||||
|
||||
while (1) {
|
||||
pNode = mgmtGetNextDnode(pNode, &pDnode);
|
||||
if (pDnode == NULL) break;
|
||||
if (pDnode->numOfTotalVnodes <= 0) continue;
|
||||
if (pDnode->openVnodes == pDnode->numOfTotalVnodes) continue;
|
||||
|
||||
float usage = (float)pDnode->openVnodes / pDnode->numOfTotalVnodes;
|
||||
if (usage <= vnodeUsage) {
|
||||
pSelDnode = pDnode;
|
||||
vnodeUsage = usage;
|
||||
}
|
||||
}
|
||||
|
||||
SDnodeObj *pDnode = mgmtGetDnode(1);
|
||||
if (pDnode == NULL) return TSDB_CODE_OTHERS;
|
||||
|
||||
if (pDnode->openVnodes < pDnode->numOfTotalVnodes) {
|
||||
pVgroup->vnodeGid[0].dnodeId = pDnode->dnodeId;
|
||||
pVgroup->vnodeGid[0].privateIp = pDnode->privateIp;
|
||||
pVgroup->vnodeGid[0].publicIp = pDnode->publicIp;
|
||||
mTrace("dnode:%d, alloc one vnode to vgroup", pDnode->dnodeId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
mError("dnode:%d, failed to alloc vnode to vgroup", pDnode->dnodeId);
|
||||
if (pSelDnode == NULL) {
|
||||
mError("failed to alloc vnode to vgroup", pDnode->dnodeId);
|
||||
return TSDB_CODE_NO_ENOUGH_DNODES;
|
||||
}
|
||||
|
||||
pVgroup->vnodeGid[0].dnodeId = pSelDnode->dnodeId;
|
||||
pVgroup->vnodeGid[0].privateIp = pSelDnode->privateIp;
|
||||
pVgroup->vnodeGid[0].publicIp = pSelDnode->publicIp;
|
||||
mTrace("dnode:%d, alloc one vnode to vgroup", pSelDnode->dnodeId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ static void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg);
|
|||
extern int32_t clusterInit();
|
||||
extern void clusterCleanUp();
|
||||
extern int32_t clusterGetDnodesNum();
|
||||
extern void * clusterGetNextDnode(void *pNode, SDnodeObj **pDnode);
|
||||
extern SDnodeObj* clusterGetDnode(int32_t dnodeId);
|
||||
extern SDnodeObj* clusterGetDnodeByIp(uint32_t ip);
|
||||
static SDnodeObj tsDnodeObj = {0};
|
||||
|
@ -98,6 +99,19 @@ int32_t mgmtGetDnodesNum() {
|
|||
#endif
|
||||
}
|
||||
|
||||
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode) {
|
||||
#ifdef _CLUSTER
|
||||
return (*clusterGetNextDnode)(pNode, pDnode);
|
||||
#else
|
||||
if (*pDnode == NULL) {
|
||||
*pDnode = &tsDnodeObj;
|
||||
} else {
|
||||
*pDnode = NULL;
|
||||
}
|
||||
return *pDnode;
|
||||
#endif
|
||||
}
|
||||
|
||||
void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||
if (mgmtCheckRedirect(pMsg->thandle)) return;
|
||||
|
|
Loading…
Reference in New Issue