refact(cluster): node mgmt
This commit is contained in:
parent
e770a6e974
commit
1dd4984eeb
|
@ -54,7 +54,7 @@ int32_t dmReadEps(SDnode *pDnode) {
|
|||
goto PRASE_DNODE_OVER;
|
||||
}
|
||||
|
||||
snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->data.path, TD_DIRSEP);
|
||||
snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->data.dataDir, TD_DIRSEP);
|
||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
// dDebug("file %s not exist", file);
|
||||
|
@ -176,7 +176,7 @@ PRASE_DNODE_OVER:
|
|||
|
||||
int32_t dmWriteEps(SDnode *pDnode) {
|
||||
char file[PATH_MAX];
|
||||
snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->data.path, TD_DIRSEP);
|
||||
snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->data.dataDir, TD_DIRSEP);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
|
@ -216,7 +216,7 @@ int32_t dmWriteEps(SDnode *pDnode) {
|
|||
taosMemoryFree(content);
|
||||
|
||||
char realfile[PATH_MAX];
|
||||
snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pDnode->data.path, TD_DIRSEP);
|
||||
snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pDnode->data.dataDir, TD_DIRSEP);
|
||||
|
||||
if (taosRenameFile(file, realfile) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
|
|
@ -130,6 +130,10 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
|||
|
||||
static void dmCloseNodeImp(SMgmtWrapper *pWrapper) {
|
||||
dDebug("node:%s, mgmt start to close", pWrapper->name);
|
||||
if (pWrapper->fp.stopFp != NULL) {
|
||||
(*pWrapper->fp.stopFp)(pWrapper);
|
||||
}
|
||||
|
||||
pWrapper->required = false;
|
||||
taosWLockLatch(&pWrapper->latch);
|
||||
if (pWrapper->deployed) {
|
||||
|
@ -185,7 +189,7 @@ static int32_t dmRunInSingleProcess(SDnode *pDnode) {
|
|||
|
||||
dmSetStatus(pDnode, DND_STAT_RUNNING);
|
||||
|
||||
for (EDndNodeType n = 0; n < NODE_END; ++n) {
|
||||
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||
if (!pWrapper->required) continue;
|
||||
if (pWrapper->fp.startFp == NULL) continue;
|
||||
|
|
|
@ -53,7 +53,7 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
|
|||
}
|
||||
|
||||
static void dmClearVars(SDnode *pDnode) {
|
||||
for (EDndNodeType n = 0; n < NODE_END; ++n) {
|
||||
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
|
||||
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
|
||||
taosMemoryFreeClear(pMgmt->path);
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ _OVER:
|
|||
void dmClose(SDnode *pDnode) {
|
||||
if (pDnode == NULL) return;
|
||||
|
||||
for (EDndNodeType n = 0; n < NODE_END; ++n) {
|
||||
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||
dmCloseNode(pWrapper);
|
||||
}
|
||||
|
|
|
@ -154,6 +154,7 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
rpcSendResponse(&rspMsg);
|
||||
}
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pHandle->pMndWrapper != NULL || pHandle->pQndWrapper != NULL) {
|
||||
|
@ -174,7 +175,7 @@ static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||
SDnodeTrans *pTrans = &pDnode->trans;
|
||||
|
||||
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
|
||||
for (EDndNodeType n = DNODE; n < NODE_END; ++n) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||
|
||||
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
|
||||
|
|
|
@ -119,7 +119,6 @@ typedef struct {
|
|||
SSingleWorker mgmtWorker;
|
||||
SMsgCb msgCb;
|
||||
SDnode *pDnode;
|
||||
const char *path;
|
||||
TdFilePtr lockfile;
|
||||
char *localEp;
|
||||
char *localFqdn;
|
||||
|
|
Loading…
Reference in New Issue