This commit is contained in:
Shengliang Guan 2022-03-30 15:57:12 +08:00
parent f16e9970fd
commit 5bd37b75dc
7 changed files with 40 additions and 49 deletions

View File

@ -130,7 +130,7 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) {
} }
code = 0; code = 0;
dInfo("succcessed to read file %s", file); dDebug("succcessed to read file %s", file);
dmPrintDnodes(pMgmt); dmPrintDnodes(pMgmt);
PRASE_DNODE_OVER: PRASE_DNODE_OVER:

View File

@ -112,6 +112,16 @@ int32_t dmInit(SMgmtWrapper *pWrapper) {
return -1; return -1;
} }
if (dndInitServer(pDnode) != 0) {
dError("failed to init trans server since %s", terrstr());
return -1;
}
if (dndInitClient(pDnode) != 0) {
dError("failed to init trans client since %s", terrstr());
return -1;
}
pWrapper->pMgmt = pMgmt; pWrapper->pMgmt = pMgmt;
dInfo("dnode-mgmt is initialized"); dInfo("dnode-mgmt is initialized");
return 0; return 0;
@ -122,6 +132,7 @@ void dmCleanup(SMgmtWrapper *pWrapper) {
if (pMgmt == NULL) return; if (pMgmt == NULL) return;
dInfo("dnode-mgmt start to clean up"); dInfo("dnode-mgmt start to clean up");
SDnode *pDnode = pMgmt->pDnode;
dmStopWorker(pMgmt); dmStopWorker(pMgmt);
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
@ -140,6 +151,9 @@ void dmCleanup(SMgmtWrapper *pWrapper) {
taosMemoryFree(pMgmt); taosMemoryFree(pMgmt);
pWrapper->pMgmt = NULL; pWrapper->pMgmt = NULL;
dndCleanupServer(pDnode);
dndCleanupClient(pDnode);
dInfo("dnode-mgmt is cleaned up"); dInfo("dnode-mgmt is cleaned up");
} }

View File

@ -141,13 +141,17 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp no
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dndSendMonitorReport(SDnode *pDnode); void dndSendMonitorReport(SDnode *pDnode);
int32_t dndInitServer(SDnode *pDnode);
void dndCleanupServer(SDnode *pDnode);
int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed); int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed);

View File

@ -50,10 +50,6 @@ void dndClose(SDnode *pDnode);
void dndHandleEvent(SDnode *pDnode, EDndEvent event); void dndHandleEvent(SDnode *pDnode, EDndEvent event);
// dndTransport.c // dndTransport.c
int32_t dndInitServer(SDnode *pDnode);
void dndCleanupServer(SDnode *pDnode);
int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode);
int32_t dndInitMsgHandle(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);

View File

@ -16,15 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndInt.h" #include "dndInt.h"
static void dndResetLog(SMgmtWrapper *pMgmt) {
char logname[24] = {0};
snprintf(logname, sizeof(logname), "%slog", pMgmt->name);
dInfo("node:%s, reset log to %s in child process", pMgmt->name, logname);
taosCloseLog();
taosInitLog(logname, 1);
}
static bool dndRequireNode(SMgmtWrapper *pWrapper) { static bool dndRequireNode(SMgmtWrapper *pWrapper) {
bool required = false; bool required = false;
int32_t code =(*pWrapper->fp.requiredFp)(pWrapper, &required); int32_t code =(*pWrapper->fp.requiredFp)(pWrapper, &required);
@ -71,22 +62,21 @@ void dndCloseNode(SMgmtWrapper *pWrapper) {
} }
static int32_t dndRunInSingleProcess(SDnode *pDnode) { static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode run in single process mode"); dDebug("dnode run in single process mode");
SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]);
tmsgSetDefaultMsgCb(&msgCb);
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType n = DNODE; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper); pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
SMsgCb msgCb = dndCreateMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&msgCb);
if (taosMkDir(pWrapper->path) != 0) { if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
return -1; return -1;
} }
dInfo("node:%s, will start in single process", pWrapper->name);
pWrapper->procType = PROC_SINGLE; pWrapper->procType = PROC_SINGLE;
if (dndOpenNode(pWrapper) != 0) { if (dndOpenNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
@ -215,7 +205,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
if (taosProcIsChild(pProc)) { if (taosProcIsChild(pProc)) {
dInfo("node:%s, will start in child process", pWrapper->name); dInfo("node:%s, will start in child process", pWrapper->name);
pWrapper->procType = PROC_CHILD; pWrapper->procType = PROC_CHILD;
dndResetLog(pWrapper); // dndResetLog(pWrapper);
dInfo("node:%s, clean up resources inherited from parent", pWrapper->name); dInfo("node:%s, clean up resources inherited from parent", pWrapper->name);
dndClearNodesExecpt(pDnode, n); dndClearNodesExecpt(pDnode, n);

View File

@ -41,7 +41,7 @@ int32_t dndInit() {
return -1; return -1;
} }
dDebug("dnode env is initialized"); dInfo("dnode env is initialized");
return 0; return 0;
} }
@ -55,7 +55,7 @@ void dndCleanup() {
monCleanup(); monCleanup();
walCleanUp(); walCleanUp();
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
dDebug("dnode env is cleaned up"); dInfo("dnode env is cleaned up");
} }
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) { void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {

View File

@ -53,7 +53,7 @@ static void dndClearVars(SDnode *pDnode) {
} }
SDnode *dndCreate(const SDnodeOpt *pOption) { SDnode *dndCreate(const SDnodeOpt *pOption) {
dInfo("start to create dnode object"); dDebug("start to create dnode object");
int32_t code = -1; int32_t code = -1;
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
SDnode *pDnode = NULL; SDnode *pDnode = NULL;
@ -77,25 +77,6 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
smGetMgmtFp(&pDnode->wrappers[SNODE]); smGetMgmtFp(&pDnode->wrappers[SNODE]);
bmGetMgmtFp(&pDnode->wrappers[BNODE]); bmGetMgmtFp(&pDnode->wrappers[BNODE]);
if (dndOpenRuntimeFile(pDnode) != 0) {
dError("failed to open runtime file since %s", terrstr());
goto _OVER;
}
if (dndInitServer(pDnode) != 0) {
dError("failed to init trans server since %s", terrstr());
goto _OVER;
}
if (dndInitClient(pDnode) != 0) {
dError("failed to init trans client since %s", terrstr());
goto _OVER;
}
if (dndInitMsgHandle(pDnode) != 0) {
goto _OVER;
}
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
@ -110,6 +91,17 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
taosInitRWLatch(&pWrapper->latch); taosInitRWLatch(&pWrapper->latch);
} }
if (dndInitMsgHandle(pDnode) != 0) {
dError("failed to msg handles since %s", terrstr());
goto _OVER;
}
if (dndOpenRuntimeFile(pDnode) != 0) {
dError("failed to open runtime file since %s", terrstr());
goto _OVER;
}
dInfo("dnode object is created, data:%p", pDnode);
code = 0; code = 0;
_OVER: _OVER:
@ -117,8 +109,6 @@ _OVER:
dndClearVars(pDnode); dndClearVars(pDnode);
pDnode = NULL; pDnode = NULL;
dError("failed to create dnode object since %s", terrstr()); dError("failed to create dnode object since %s", terrstr());
} else {
dInfo("dnode object is created, data:%p", pDnode);
} }
return pDnode; return pDnode;
@ -135,9 +125,6 @@ void dndClose(SDnode *pDnode) {
dInfo("start to close dnode, data:%p", pDnode); dInfo("start to close dnode, data:%p", pDnode);
dndSetStatus(pDnode, DND_STAT_STOPPED); dndSetStatus(pDnode, DND_STAT_STOPPED);
dndCleanupServer(pDnode);
dndCleanupClient(pDnode);
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
dndCloseNode(pWrapper); dndCloseNode(pWrapper);