shm
This commit is contained in:
parent
cebcfc9115
commit
c673ad9334
|
@ -13,8 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DND_MNODE_H_
|
||||
#define _TD_DND_MNODE_H_
|
||||
#ifndef _TD_DND_MNODE_MGMT_H_
|
||||
#define _TD_DND_MNODE_MGMT_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -24,6 +24,9 @@ extern "C" {
|
|||
int32_t mmInit(SDnode *pDnode);
|
||||
void mmCleanup(SDnode *pDnode);
|
||||
|
||||
int32_t mmReadFile(SDnode *pDnode);
|
||||
int32_t mmWriteFile(SDnode *pDnode);
|
||||
|
||||
////////////
|
||||
|
||||
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
|
||||
|
@ -34,11 +37,11 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
|
|||
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
|
||||
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
|
||||
|
||||
int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
|
||||
SMonGrantInfo *pGrantInfo);
|
||||
int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
|
||||
SMonGrantInfo *pGrantInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DND_MNODE_H_*/
|
||||
#endif /*_TD_DND_MNODE_MGMT_H_*/
|
|
@ -0,0 +1,167 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http:www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mm.h"
|
||||
#include "dndMgmt.h"
|
||||
#include "dndTransport.h"
|
||||
#include "dndWorker.h"
|
||||
|
||||
// int32_t mmReadFile(SMnodeMgmt *pMgmt) {
|
||||
// int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR;
|
||||
// int32_t len = 0;
|
||||
// int32_t maxLen = 4096;
|
||||
// char *content = calloc(1, maxLen + 1);
|
||||
// cJSON *root = NULL;
|
||||
|
||||
// char file[PATH_MAX] = {0};
|
||||
// snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
|
||||
|
||||
// TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
|
||||
// if (pFile == NULL) {
|
||||
// dDebug("file %s not exist", file);
|
||||
// code = 0;
|
||||
// goto PRASE_MNODE_OVER;
|
||||
// }
|
||||
|
||||
// len = (int32_t)taosReadFile(pFile, content, maxLen);
|
||||
// if (len <= 0) {
|
||||
// dError("failed to read %s since content is null", file);
|
||||
// goto PRASE_MNODE_OVER;
|
||||
// }
|
||||
|
||||
// content[len] = 0;
|
||||
// root = cJSON_Parse(content);
|
||||
// if (root == NULL) {
|
||||
// dError("failed to read %s since invalid json format", file);
|
||||
// goto PRASE_MNODE_OVER;
|
||||
// }
|
||||
|
||||
// cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
|
||||
// if (!deployed || deployed->type != cJSON_Number) {
|
||||
// dError("failed to read %s since deployed not found", file);
|
||||
// goto PRASE_MNODE_OVER;
|
||||
// }
|
||||
// pMgmt->deployed = deployed->valueint;
|
||||
|
||||
// cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
|
||||
// if (!dropped || dropped->type != cJSON_Number) {
|
||||
// dError("failed to read %s since dropped not found", file);
|
||||
// goto PRASE_MNODE_OVER;
|
||||
// }
|
||||
// pMgmt->dropped = dropped->valueint;
|
||||
|
||||
// cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
|
||||
// if (!mnodes || mnodes->type != cJSON_Array) {
|
||||
// dError("failed to read %s since nodes not found", file);
|
||||
// goto PRASE_MNODE_OVER;
|
||||
// }
|
||||
|
||||
// pMgmt->replica = cJSON_GetArraySize(mnodes);
|
||||
// if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
|
||||
// dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
|
||||
// goto PRASE_MNODE_OVER;
|
||||
// }
|
||||
|
||||
// for (int32_t i = 0; i < pMgmt->replica; ++i) {
|
||||
// cJSON *node = cJSON_GetArrayItem(mnodes, i);
|
||||
// if (node == NULL) break;
|
||||
|
||||
// SReplica *pReplica = &pMgmt->replicas[i];
|
||||
|
||||
// cJSON *id = cJSON_GetObjectItem(node, "id");
|
||||
// if (!id || id->type != cJSON_Number) {
|
||||
// dError("failed to read %s since id not found", file);
|
||||
// goto PRASE_MNODE_OVER;
|
||||
// }
|
||||
// pReplica->id = id->valueint;
|
||||
|
||||
// cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
|
||||
// if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
|
||||
// dError("failed to read %s since fqdn not found", file);
|
||||
// goto PRASE_MNODE_OVER;
|
||||
// }
|
||||
// tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
|
||||
|
||||
// cJSON *port = cJSON_GetObjectItem(node, "port");
|
||||
// if (!port || port->type != cJSON_Number) {
|
||||
// dError("failed to read %s since port not found", file);
|
||||
// goto PRASE_MNODE_OVER;
|
||||
// }
|
||||
// pReplica->port = port->valueint;
|
||||
// }
|
||||
|
||||
// code = 0;
|
||||
// dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
|
||||
|
||||
// PRASE_MNODE_OVER:
|
||||
// if (content != NULL) free(content);
|
||||
// if (root != NULL) cJSON_Delete(root);
|
||||
// if (pFile != NULL) taosCloseFile(&pFile);
|
||||
|
||||
// terrno = code;
|
||||
// return code;
|
||||
// }
|
||||
|
||||
// int32_t mmWriteFile(SMnodeMgmt *pMgmt) {
|
||||
// char file[PATH_MAX] = {0};
|
||||
// snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
|
||||
|
||||
// TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
// if (pFile == NULL) {
|
||||
// terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
|
||||
// dError("failed to write %s since %s", file, terrstr());
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
// int32_t len = 0;
|
||||
// int32_t maxLen = 4096;
|
||||
// char *content = calloc(1, maxLen + 1);
|
||||
|
||||
// len += snprintf(content + len, maxLen - len, "{\n");
|
||||
// len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
|
||||
|
||||
// len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
|
||||
// len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
|
||||
// for (int32_t i = 0; i < pMgmt->replica; ++i) {
|
||||
// SReplica *pReplica = &pMgmt->replicas[i];
|
||||
// len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
|
||||
// len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
|
||||
// len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
|
||||
// if (i < pMgmt->replica - 1) {
|
||||
// len += snprintf(content + len, maxLen - len, " },{\n");
|
||||
// } else {
|
||||
// len += snprintf(content + len, maxLen - len, " }]\n");
|
||||
// }
|
||||
// }
|
||||
// len += snprintf(content + len, maxLen - len, "}\n");
|
||||
|
||||
// taosWriteFile(pFile, content, len);
|
||||
// taosFsyncFile(pFile);
|
||||
// taosCloseFile(&pFile);
|
||||
// free(content);
|
||||
|
||||
// char realfile[PATH_MAX];
|
||||
// snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
|
||||
|
||||
// if (taosRenameFile(file, realfile) != 0) {
|
||||
// terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
|
||||
// dError("failed to rename %s since %s", file, terrstr());
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
// dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
|
||||
// return 0;
|
||||
// }
|
|
@ -14,10 +14,10 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mm.h"
|
||||
#include "dndMgmt.h"
|
||||
#include "dndTransport.h"
|
||||
#include "dndWorker.h"
|
||||
#include "mm.h"
|
||||
|
||||
static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg);
|
||||
static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg);
|
||||
|
@ -52,7 +52,7 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
|
|||
dTrace("release mnode, refCount:%d", refCount);
|
||||
}
|
||||
|
||||
static int32_t dndReadMnodeFile(SDnode *pDnode) {
|
||||
int32_t mmReadFile(SDnode *pDnode) {
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR;
|
||||
int32_t len = 0;
|
||||
|
@ -150,7 +150,7 @@ PRASE_MNODE_OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t dndWriteMnodeFile(SDnode *pDnode) {
|
||||
int32_t mmWriteFile(SDnode *pDnode) {
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
|
||||
char file[PATH_MAX + 20];
|
||||
|
@ -204,7 +204,7 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t dndStartMnodeWorker(SDnode *pDnode) {
|
||||
static int32_t mmStartWorker(SDnode *pDnode) {
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, dndProcessMnodeQueue) != 0) {
|
||||
dError("failed to start mnode read worker since %s", terrstr());
|
||||
|
@ -224,7 +224,7 @@ static int32_t dndStartMnodeWorker(SDnode *pDnode) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void dndStopMnodeWorker(SDnode *pDnode) {
|
||||
static void mmStopWorker(SDnode *pDnode) {
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
|
||||
taosWLockLatch(&pMgmt->latch);
|
||||
|
@ -240,7 +240,7 @@ static void dndStopMnodeWorker(SDnode *pDnode) {
|
|||
dndCleanupWorker(&pMgmt->syncWorker);
|
||||
}
|
||||
|
||||
static bool dndNeedDeployMnode(SDnode *pDnode) {
|
||||
static bool mmDeployRequired(SDnode *pDnode) {
|
||||
if (dndGetDnodeId(pDnode) > 0) {
|
||||
return false;
|
||||
}
|
||||
|
@ -261,12 +261,12 @@ static int32_t dndPutMsgToMWriteQ(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t dndPutMsgToMReadQ(SDnode *pDnode, SRpcMsg* pRpcMsg) {
|
||||
static int32_t dndPutMsgToMReadQ(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pRpcMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
static void mmInitOptionImp(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
pOption->pDnode = pDnode;
|
||||
pOption->sendReqToDnodeFp = dndSendReqToDnode;
|
||||
pOption->sendReqToMnodeFp = dndSendReqToMnode;
|
||||
|
@ -277,8 +277,8 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
|||
pOption->clusterId = dndGetClusterId(pDnode);
|
||||
}
|
||||
|
||||
static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
dndInitMnodeOption(pDnode, pOption);
|
||||
static void mmInitDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
mmInitOptionImp(pDnode, pOption);
|
||||
pOption->replica = 1;
|
||||
pOption->selfIndex = 0;
|
||||
SReplica *pReplica = &pOption->replicas[0];
|
||||
|
@ -292,16 +292,16 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
|||
memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
|
||||
}
|
||||
|
||||
static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
dndInitMnodeOption(pDnode, pOption);
|
||||
static void mmInitOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
mmInitOptionImp(pDnode, pOption);
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
pOption->selfIndex = pMgmt->selfIndex;
|
||||
pOption->replica = pMgmt->replica;
|
||||
memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
|
||||
}
|
||||
|
||||
static int32_t dndBuildMnodeOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
|
||||
dndInitMnodeOption(pDnode, pOption);
|
||||
static int32_t mmInitOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) {
|
||||
mmInitOptionImp(pDnode, pOption);
|
||||
pOption->dnodeId = dndGetDnodeId(pDnode);
|
||||
pOption->clusterId = dndGetClusterId(pDnode);
|
||||
|
||||
|
@ -329,7 +329,7 @@ static int32_t dndBuildMnodeOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SD
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
|
||||
SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption);
|
||||
|
@ -338,7 +338,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (dndStartMnodeWorker(pDnode) != 0) {
|
||||
if (mmStartWorker(pDnode) != 0) {
|
||||
dError("failed to start mnode worker since %s", terrstr());
|
||||
mndClose(pMnode);
|
||||
mndDestroy(pDnode->dir.mnode);
|
||||
|
@ -346,10 +346,10 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
|
|||
}
|
||||
|
||||
pMgmt->deployed = 1;
|
||||
if (dndWriteMnodeFile(pDnode) != 0) {
|
||||
if (mmWriteFile(pDnode) != 0) {
|
||||
dError("failed to write mnode file since %s", terrstr());
|
||||
pMgmt->deployed = 0;
|
||||
dndStopMnodeWorker(pDnode);
|
||||
mmStopWorker(pDnode);
|
||||
mndClose(pMnode);
|
||||
mndDestroy(pDnode->dir.mnode);
|
||||
return -1;
|
||||
|
@ -365,20 +365,18 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
|
|||
}
|
||||
|
||||
static void dndMnodeProcessChildQueue(SDnode *pDnode, SBlockItem *pBlock) {
|
||||
SRpcMsg *pMsg = (SRpcMsg*)pBlock->pCont;
|
||||
SRpcMsg *pMsg = (SRpcMsg *)pBlock->pCont;
|
||||
dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg);
|
||||
free(pBlock);
|
||||
}
|
||||
|
||||
static void dndMnodeProcessParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pItem) {
|
||||
|
||||
}
|
||||
static void dndMnodeProcessParentQueue(SMnodeMgmt *pMgmt, SBlockItem *pItem) {}
|
||||
|
||||
static int32_t dndMnodeOpen(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
static int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
pMgmt->multiProcess = true;
|
||||
|
||||
int32_t code = dndOpenMnode(pDnode, pOption);
|
||||
int32_t code = mmOpenImp(pDnode, pOption);
|
||||
|
||||
if (code == 0 && pMgmt->multiProcess) {
|
||||
SProcCfg cfg = {0};
|
||||
|
@ -394,7 +392,7 @@ static int32_t dndMnodeOpen(SDnode *pDnode, SMnodeOpt *pOption) {
|
|||
pMgmt->pProcess->pParent = pDnode;
|
||||
pMgmt->pProcess->testFlag = true;
|
||||
return taosProcStart(pMgmt->pProcess);
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -431,7 +429,7 @@ static int32_t dndDropMnode(SDnode *pDnode) {
|
|||
pMgmt->dropped = 1;
|
||||
taosRUnLockLatch(&pMgmt->latch);
|
||||
|
||||
if (dndWriteMnodeFile(pDnode) != 0) {
|
||||
if (mmWriteFile(pDnode) != 0) {
|
||||
taosRLockLatch(&pMgmt->latch);
|
||||
pMgmt->dropped = 0;
|
||||
taosRUnLockLatch(&pMgmt->latch);
|
||||
|
@ -442,9 +440,9 @@ static int32_t dndDropMnode(SDnode *pDnode) {
|
|||
}
|
||||
|
||||
dndReleaseMnode(pDnode, pMnode);
|
||||
dndStopMnodeWorker(pDnode);
|
||||
mmStopWorker(pDnode);
|
||||
pMgmt->deployed = 0;
|
||||
dndWriteMnodeFile(pDnode);
|
||||
mmWriteFile(pDnode);
|
||||
mndClose(pMnode);
|
||||
pMgmt->pMnode = NULL;
|
||||
mndDestroy(pDnode->dir.mnode);
|
||||
|
@ -452,7 +450,6 @@ static int32_t dndDropMnode(SDnode *pDnode) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||
SDCreateMnodeReq createReq = {0};
|
||||
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
|
||||
|
@ -467,7 +464,7 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
SMnodeOpt option = {0};
|
||||
if (dndBuildMnodeOptionFromReq(pDnode, &option, &createReq) != 0) {
|
||||
if (mmInitOptionFromReq(pDnode, &option, &createReq) != 0) {
|
||||
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
|
||||
dError("failed to create mnode since %s", terrstr());
|
||||
return -1;
|
||||
|
@ -482,7 +479,7 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
dDebug("start to create mnode");
|
||||
return dndOpenMnode(pDnode, &option);
|
||||
return mmOpenImp(pDnode, &option);
|
||||
}
|
||||
|
||||
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||
|
@ -499,7 +496,7 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
SMnodeOpt option = {0};
|
||||
if (dndBuildMnodeOptionFromReq(pDnode, &option, &alterReq) != 0) {
|
||||
if (mmInitOptionFromReq(pDnode, &option, &alterReq) != 0) {
|
||||
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
|
||||
dError("failed to alter mnode since %s", terrstr());
|
||||
return -1;
|
||||
|
@ -612,48 +609,62 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
int32_t mmInit(SDnode *pDnode) {
|
||||
dInfo("dnode-mnode start to init");
|
||||
dInfo("mnode mgmt start to init");
|
||||
int32_t code = -1;
|
||||
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
taosInitRWLatch(&pMgmt->latch);
|
||||
|
||||
if (dndReadMnodeFile(pDnode) != 0) {
|
||||
return -1;
|
||||
if (mmReadFile(pDnode) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (pMgmt->dropped) {
|
||||
dInfo("mnode has been deployed and needs to be deleted");
|
||||
dInfo("mnode has been dropped and needs to be deleted");
|
||||
mndDestroy(pDnode->dir.mnode);
|
||||
return 0;
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (!pMgmt->deployed) {
|
||||
bool needDeploy = dndNeedDeployMnode(pDnode);
|
||||
if (!needDeploy) {
|
||||
dDebug("mnode does not need to be deployed");
|
||||
return 0;
|
||||
bool required = mmDeployRequired(pDnode);
|
||||
if (!required) {
|
||||
dInfo("mnode does not need to be deployed");
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
dInfo("start to deploy mnode");
|
||||
dInfo("mnode start to deploy");
|
||||
SMnodeOpt option = {0};
|
||||
dndBuildMnodeDeployOption(pDnode, &option);
|
||||
return dndMnodeOpen(pDnode, &option);
|
||||
mmInitDeployOption(pDnode, &option);
|
||||
code = mmOpen(pDnode, &option);
|
||||
} else {
|
||||
dInfo("start to open mnode");
|
||||
dInfo("mnode start to open");
|
||||
SMnodeOpt option = {0};
|
||||
dndBuildMnodeOpenOption(pDnode, &option);
|
||||
return dndMnodeOpen(pDnode, &option);
|
||||
mmInitOpenOption(pDnode, &option);
|
||||
code = mmOpen(pDnode, &option);
|
||||
}
|
||||
|
||||
_OVER:
|
||||
if (code == 0) {
|
||||
dInfo("mnode mgmt init success");
|
||||
} else {
|
||||
dError("failed to init mnode mgmt since %s", terrstr());
|
||||
mmCleanup(pDnode);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void mmCleanup(SDnode *pDnode) {
|
||||
dInfo("dnode-mnode start to clean up");
|
||||
dInfo("mnode mgmt start to clean up");
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
if (pMgmt->pMnode) {
|
||||
dndStopMnodeWorker(pDnode);
|
||||
mmStopWorker(pDnode);
|
||||
mndClose(pMgmt->pMnode);
|
||||
pMgmt->pMnode = NULL;
|
||||
}
|
||||
dInfo("dnode-mnode is cleaned up");
|
||||
dInfo("mnode mgmt is cleaned up");
|
||||
}
|
||||
|
||||
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
|
||||
|
@ -673,8 +684,8 @@ int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *enc
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
|
||||
SMonGrantInfo *pGrantInfo) {
|
||||
int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
|
||||
SMonGrantInfo *pGrantInfo) {
|
||||
SMnode *pMnode = dndAcquireMnode(pDnode);
|
||||
if (pMnode == NULL) return -1;
|
||||
|
||||
|
|
|
@ -520,7 +520,7 @@ static void dndSendMonitorReport(SDnode *pDnode) {
|
|||
SMonClusterInfo clusterInfo = {0};
|
||||
SMonVgroupInfo vgroupInfo = {0};
|
||||
SMonGrantInfo grantInfo = {0};
|
||||
if (dndGetMnodeMonitorInfo(pDnode, &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
|
||||
if (mmGetMonitorInfo(pDnode, &clusterInfo, &vgroupInfo, &grantInfo) == 0) {
|
||||
monSetClusterInfo(pMonitor, &clusterInfo);
|
||||
monSetVgroupInfo(pMonitor, &vgroupInfo);
|
||||
monSetGrantInfo(pMonitor, &grantInfo);
|
||||
|
|
|
@ -126,20 +126,20 @@ static int32_t mndInitTimer(SMnode *pMnode) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
// if (taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) {
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
if (taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
// if (taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer)) {
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
if (taosTmrReset(mndPullupTelem, 60000, pMnode, pMnode->timer, &pMnode->telemTimer)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
// if (taosTmrReset(mndPullupTelem, 60000, pMnode, pMnode->timer, &pMnode->telemTimer)) {
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue