TD-11265 refact dndMnode

This commit is contained in:
Shengliang Guan 2021-11-26 16:28:38 +08:00
parent 7906e78257
commit aad478d34f
7 changed files with 91 additions and 78 deletions

View File

@ -121,10 +121,10 @@ typedef struct {
/** /**
* @brief Initialize and start the dnode. * @brief Initialize and start the dnode.
* *
* @param pOptions Options of the dnode. * @param pOption Option of the dnode.
* @return SDnode* The dnode object. * @return SDnode* The dnode object.
*/ */
SDnode *dndInit(SDnodeOpt *pOptions); SDnode *dndInit(SDnodeOpt *pOption);
/** /**
* @brief Stop and cleanup the dnode. * @brief Stop and cleanup the dnode.

View File

@ -53,17 +53,17 @@ typedef struct {
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectMsgFp sendRedirectMsgFp;
} SMnodeOptions; } SMnodeOpt;
/* ------------------------ SMnode ------------------------ */ /* ------------------------ SMnode ------------------------ */
/** /**
* @brief Open a mnode. * @brief Open a mnode.
* *
* @param path Path of the mnode * @param path Path of the mnode
* @param pOptions Options of the mnode * @param pOption Option of the mnode
* @return SMnode* The mnode object * @return SMnode* The mnode object
*/ */
SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions); SMnode *mnodeOpen(const char *path, const SMnodeOpt *pOption);
/** /**
* @brief Close a mnode * @brief Close a mnode
@ -76,10 +76,10 @@ void mnodeClose(SMnode *pMnode);
* @brief Close a mnode * @brief Close a mnode
* *
* @param pMnode The mnode object to close * @param pMnode The mnode object to close
* @param pOptions Options of the mnode * @param pOption Options of the mnode
* @return int32_t 0 for success, -1 for failure * @return int32_t 0 for success, -1 for failure
*/ */
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions); int32_t mnodeAlter(SMnode *pMnode, const SMnodeOpt *pOption);
/** /**
* @brief Drop a mnode. * @brief Drop a mnode.

View File

@ -58,8 +58,8 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode);
static int32_t dndReadMnodeFile(SDnode *pDnode); static int32_t dndReadMnodeFile(SDnode *pDnode);
static int32_t dndWriteMnodeFile(SDnode *pDnode); static int32_t dndWriteMnodeFile(SDnode *pDnode);
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions); static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions); static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption);
static int32_t dndDropMnode(SDnode *pDnode); static int32_t dndDropMnode(SDnode *pDnode);
static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
@ -243,6 +243,7 @@ static bool dndNeedDeployMnode(SDnode *pDnode) {
if (dndGetClusterId(pDnode) > 0) { if (dndGetClusterId(pDnode) > 0) {
return false; return false;
} }
if (strcmp(pDnode->opt.localEp, pDnode->opt.firstEp) != 0) { if (strcmp(pDnode->opt.localEp, pDnode->opt.firstEp) != 0) {
return false; return false;
} }
@ -250,43 +251,49 @@ static bool dndNeedDeployMnode(SDnode *pDnode) {
return true; return true;
} }
static void dndInitMnodeOptions(SDnode *pDnode, SMnodeOptions *pOptions) { static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOptions->pDnode = pDnode; pOption->pDnode = pDnode;
pOptions->sendMsgToDnodeFp = dndSendMsgToDnode; pOption->sendMsgToDnodeFp = dndSendMsgToDnode;
pOptions->sendMsgToMnodeFp = dndSendMsgToMnode; pOption->sendMsgToMnodeFp = dndSendMsgToMnode;
pOptions->sendRedirectMsgFp = dndSendRedirectMsg; pOption->sendRedirectMsgFp = dndSendRedirectMsg;
pOptions->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue; pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue;
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
} }
static int32_t dndBuildMnodeOptions(SDnode *pDnode, SMnodeOptions *pOptions, SCreateMnodeMsg *pMsg) { static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
dndInitMnodeOptions(pDnode, pOptions); dndInitMnodeOption(pDnode, pOption);
pOption->replica = 1;
if (pMsg == NULL) { pOption->selfIndex = 0;
pOptions->dnodeId = 1; SReplica *pReplica = &pOption->replicas[0];
pOptions->clusterId = 1234;
pOptions->replica = 1;
pOptions->selfIndex = 0;
SReplica *pReplica = &pOptions->replicas[0];
pReplica->id = 1; pReplica->id = 1;
pReplica->port = pDnode->opt.serverPort; pReplica->port = pDnode->opt.serverPort;
tstrncpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); tstrncpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);
} else { }
pOptions->dnodeId = dndGetDnodeId(pDnode);
pOptions->clusterId = dndGetClusterId(pDnode); static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOptions->selfIndex = -1; dndInitMnodeOption(pDnode, pOption);
pOptions->replica = pMsg->replica; pOption->replica = 0;
}
static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeMsg *pMsg) {
dndInitMnodeOption(pDnode, pOption);
pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode);
pOption->replica = pMsg->replica;
pOption->selfIndex = -1;
for (int32_t index = 0; index < pMsg->replica; ++index) { for (int32_t index = 0; index < pMsg->replica; ++index) {
SReplica *pReplica = &pOptions->replicas[index]; SReplica *pReplica = &pOption->replicas[index];
pReplica->id = pMsg->replicas[index].id; pReplica->id = pMsg->replicas[index].id;
pReplica->port = pMsg->replicas[index].port; pReplica->port = pMsg->replicas[index].port;
tstrncpy(pReplica->fqdn, pMsg->replicas[index].fqdn, TSDB_FQDN_LEN); tstrncpy(pReplica->fqdn, pMsg->replicas[index].fqdn, TSDB_FQDN_LEN);
if (pReplica->id == pOptions->dnodeId) { if (pReplica->id == pOption->dnodeId) {
pOptions->selfIndex = index; pOption->selfIndex = index;
}
} }
} }
if (pOptions->selfIndex == -1) { if (pOption->selfIndex == -1) {
terrno = TSDB_CODE_DND_MNODE_ID_NOT_FOUND; terrno = TSDB_CODE_DND_MNODE_ID_NOT_FOUND;
dError("failed to build mnode options since %s", terrstr()); dError("failed to build mnode options since %s", terrstr());
return -1; return -1;
@ -295,7 +302,7 @@ static int32_t dndBuildMnodeOptions(SDnode *pDnode, SMnodeOptions *pOptions, SCr
return 0; return 0;
} }
static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) { static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = dndStartMnodeWorker(pDnode); int32_t code = dndStartMnodeWorker(pDnode);
@ -304,7 +311,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) {
return code; return code;
} }
SMnode *pMnode = mnodeOpen(pDnode->dir.mnode, pOptions); SMnode *pMnode = mnodeOpen(pDnode->dir.mnode, pOption);
if (pMnode == NULL) { if (pMnode == NULL) {
dError("failed to open mnode since %s", terrstr()); dError("failed to open mnode since %s", terrstr());
code = terrno; code = terrno;
@ -331,7 +338,7 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOptions *pOptions) {
return 0; return 0;
} }
static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions) { static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode); SMnode *pMnode = dndAcquireMnode(pDnode);
@ -340,7 +347,7 @@ static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOptions *pOptions) {
return -1; return -1;
} }
if (mnodeAlter(pMnode, pOptions) != 0) { if (mnodeAlter(pMnode, pOption) != 0) {
dError("failed to alter mnode since %s", terrstr()); dError("failed to alter mnode since %s", terrstr());
dndReleaseMnode(pDnode, pMnode); dndReleaseMnode(pDnode, pMnode);
return -1; return -1;
@ -399,8 +406,8 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID; terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
return -1; return -1;
} else { } else {
SMnodeOptions option = {0}; SMnodeOpt option = {0};
if (dndBuildMnodeOptions(pDnode, &option, pMsg) != 0) { if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
return -1; return -1;
} }
return dndOpenMnode(pDnode, &option); return dndOpenMnode(pDnode, &option);
@ -414,8 +421,8 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
terrno = TSDB_CODE_DND_MNODE_ID_INVALID; terrno = TSDB_CODE_DND_MNODE_ID_INVALID;
return -1; return -1;
} else { } else {
SMnodeOptions option = {0}; SMnodeOpt option = {0};
if (dndBuildMnodeOptions(pDnode, &option, pMsg) != 0) { if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) {
return -1; return -1;
} }
return dndAlterMnode(pDnode, &option); return dndAlterMnode(pDnode, &option);
@ -625,7 +632,6 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) {
static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) { static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
;
tWorkerCleanup(&pMgmt->mgmtPool); tWorkerCleanup(&pMgmt->mgmtPool);
} }
@ -737,7 +743,12 @@ static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) {
pPool->name = "mnode-sync"; pPool->name = "mnode-sync";
pPool->min = 0; pPool->min = 0;
pPool->max = 1; pPool->max = 1;
return tWorkerInit(pPool); if (tWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
} }
static void dndCleanupMnodeSyncWorker(SDnode *pDnode) { static void dndCleanupMnodeSyncWorker(SDnode *pDnode) {
@ -781,13 +792,15 @@ int32_t dndInitMnode(SDnode *pDnode) {
} }
dInfo("start to deploy mnode"); dInfo("start to deploy mnode");
SMnodeOpt option = {0};
dndBuildMnodeDeployOption(pDnode, &option);
return dndOpenMnode(pDnode, &option);
} else { } else {
dInfo("start to open mnode"); dInfo("start to open mnode");
} SMnodeOpt option = {0};
dndBuildMnodeOpenOption(pDnode, &option);
SMnodeOptions option = {0};
dndInitMnodeOptions(pDnode, &option);
return dndOpenMnode(pDnode, &option); return dndOpenMnode(pDnode, &option);
}
} }
void dndCleanupMnode(SDnode *pDnode) { void dndCleanupMnode(SDnode *pDnode) {

View File

@ -239,7 +239,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_
snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes); snprintf(file, PATH_MAX + 20, "%s/vnodes.json", pDnode->dir.vnodes);
fp = fopen(file, "r"); fp = fopen(file, "r");
if (!fp) { if (fp == NULL) {
dDebug("file %s not exist", file); dDebug("file %s not exist", file);
code = 0; code = 0;
goto PRASE_VNODE_OVER; goto PRASE_VNODE_OVER;

View File

@ -77,19 +77,19 @@ static int32_t dndCheckRunning(char *dataDir) {
return 0; return 0;
} }
static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) { static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOption) {
if (dndCheckRunning(pOptions->dataDir) != 0) { if (dndCheckRunning(pOption->dataDir) != 0) {
return -1; return -1;
} }
char path[PATH_MAX + 100]; char path[PATH_MAX + 100];
snprintf(path, sizeof(path), "%s%smnode", pOptions->dataDir, TD_DIRSEP); snprintf(path, sizeof(path), "%s%smnode", pOption->dataDir, TD_DIRSEP);
pDnode->dir.mnode = tstrdup(path); pDnode->dir.mnode = tstrdup(path);
snprintf(path, sizeof(path), "%s%svnode", pOptions->dataDir, TD_DIRSEP); snprintf(path, sizeof(path), "%s%svnode", pOption->dataDir, TD_DIRSEP);
pDnode->dir.vnodes = tstrdup(path); pDnode->dir.vnodes = tstrdup(path);
snprintf(path, sizeof(path), "%s%sdnode", pOptions->dataDir, TD_DIRSEP); snprintf(path, sizeof(path), "%s%sdnode", pOption->dataDir, TD_DIRSEP);
pDnode->dir.dnode = tstrdup(path); pDnode->dir.dnode = tstrdup(path);
if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) { if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) {
@ -116,7 +116,7 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) {
return -1; return -1;
} }
memcpy(&pDnode->opt, pOptions, sizeof(SDnodeOpt)); memcpy(&pDnode->opt, pOption, sizeof(SDnodeOpt));
return 0; return 0;
} }
@ -136,7 +136,7 @@ static void dndCleanupEnv(SDnode *pDnode) {
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
} }
SDnode *dndInit(SDnodeOpt *pOptions) { SDnode *dndInit(SDnodeOpt *pOption) {
taosIgnSIGPIPE(); taosIgnSIGPIPE();
taosBlockSIGPIPE(); taosBlockSIGPIPE();
taosResolveCRC(); taosResolveCRC();
@ -151,7 +151,7 @@ SDnode *dndInit(SDnodeOpt *pOptions) {
dInfo("start to initialize TDengine"); dInfo("start to initialize TDengine");
dndSetStat(pDnode, DND_STAT_INIT); dndSetStat(pDnode, DND_STAT_INIT);
if (dndInitEnv(pDnode, pOptions) != 0) { if (dndInitEnv(pDnode, pOption) != 0) {
dError("failed to init env"); dError("failed to init env");
dndCleanup(pDnode); dndCleanup(pDnode);
return NULL; return NULL;

View File

@ -32,7 +32,7 @@ typedef struct SMnodeBak {
tmr_h timer; tmr_h timer;
SSteps *pInitSteps; SSteps *pInitSteps;
SSteps *pStartSteps; SSteps *pStartSteps;
SMnodeOptions para; SMnodeOpt para;
MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX]; MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX];
} SMnodeBak; } SMnodeBak;

View File

@ -77,17 +77,17 @@ static void mnodeCleanupTimer() {
tmr_h mnodeGetTimer() { return tsMint.timer; } tmr_h mnodeGetTimer() { return tsMint.timer; }
static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOptions *pOptions) { static int32_t mnodeSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->dnodeId = pOptions->dnodeId; pMnode->dnodeId = pOption->dnodeId;
pMnode->clusterId = pOptions->clusterId; pMnode->clusterId = pOption->clusterId;
pMnode->replica = pOptions->replica; pMnode->replica = pOption->replica;
pMnode->selfIndex = pOptions->selfIndex; pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOptions->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pServer = pOptions->pDnode; pMnode->pServer = pOption->pDnode;
pMnode->putMsgToApplyMsgFp = pOptions->putMsgToApplyMsgFp; pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp;
pMnode->sendMsgToDnodeFp = pOptions->sendMsgToDnodeFp; pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOptions->sendMsgToMnodeFp; pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
pMnode->sendRedirectMsgFp = pOptions->sendRedirectMsgFp; pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) { pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
@ -136,10 +136,10 @@ static int32_t mnodeAllocStartSteps() {
return 0; return 0;
} }
SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions) { SMnode *mnodeOpen(const char *path, const SMnodeOpt *pOption) {
SMnode *pMnode = calloc(1, sizeof(SMnode)); SMnode *pMnode = calloc(1, sizeof(SMnode));
if (mnodeSetOptions(pMnode, pOptions) != 0) { if (mnodeSetOptions(pMnode, pOption) != 0) {
free(pMnode); free(pMnode);
mError("failed to init mnode options since %s", terrstr()); mError("failed to init mnode options since %s", terrstr());
return NULL; return NULL;
@ -173,7 +173,7 @@ SMnode *mnodeOpen(const char *path, const SMnodeOptions *pOptions) {
void mnodeClose(SMnode *pMnode) { free(pMnode); } void mnodeClose(SMnode *pMnode) { free(pMnode); }
int32_t mnodeAlter(SMnode *pMnode, const SMnodeOptions *pOptions) { return 0; } int32_t mnodeAlter(SMnode *pMnode, const SMnodeOpt *pOption) { return 0; }
void mnodeDestroy(const char *path) { sdbUnDeploy(); } void mnodeDestroy(const char *path) { sdbUnDeploy(); }