This commit is contained in:
Shengliang Guan 2022-03-17 16:58:01 +08:00
parent c018ea8040
commit 5f0069e3d3
7 changed files with 105 additions and 105 deletions

View File

@ -53,7 +53,7 @@ static bool bmRequire(SMgmtWrapper *pWrapper) {
if (mgmt.dropped) { if (mgmt.dropped) {
dInfo("bnode has been dropped and needs to be deleted"); dInfo("bnode has been dropped and needs to be deleted");
mndDestroy(mgmt.path); taosRemoveDir(mgmt.path);
return false; return false;
} }
@ -66,7 +66,6 @@ static bool bmRequire(SMgmtWrapper *pWrapper) {
void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode; SDnode *pDnode = pMgmt->pDnode;
pOption->pWrapper = pMgmt->pWrapper; pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode; pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode; pOption->sendMnodeReqFp = dndSendReqToMnode;

View File

@ -162,8 +162,7 @@ int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, cons
void dndCleanupWorker(SDnodeWorker *pWorker); void dndCleanupWorker(SDnodeWorker *pWorker);
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen); int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
int32_t dndProcessCreateNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndProcessDropNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -41,8 +41,8 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
// dndExec.c // dndExec.c
int32_t dndOpenNode(SMgmtWrapper *pWrapper); int32_t dndOpenNode(SDnode *pDnode, ENodeType nodeType);
void dndCloseNode(SMgmtWrapper *pWrapper); int32_t dndCloseNode(SDnode *pDnode, ENodeType nodeType);
int32_t dndRun(SDnode *pDnode); int32_t dndRun(SDnode *pDnode);
// dndObj.c // dndObj.c
@ -51,6 +51,7 @@ void dndClose(SDnode *pDnode);
void dndHandleEvent(SDnode *pDnode, EDndEvent event); void dndHandleEvent(SDnode *pDnode, EDndEvent event);
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
void dndReleaseWrapper(SMgmtWrapper *pWrapper); void dndReleaseWrapper(SMgmtWrapper *pWrapper);
// dndTransport.c // dndTransport.c

View File

@ -35,37 +35,51 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) {
return required; return required;
} }
int32_t dndOpenNode(SMgmtWrapper *pWrapper) { int32_t dndOpenNode(SDnode *pDnode, ENodeType ntype) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) {
dndReleaseWrapper(pWrapper);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
return -1;
}
pWrapper = &pDnode->wrappers[ntype];
int32_t code = (*pWrapper->fp.openFp)(pWrapper); int32_t code = (*pWrapper->fp.openFp)(pWrapper);
if (code != 0) { if (code != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
} else { } else {
dDebug("node:%s, has been opened", pWrapper->name); dDebug("node:%s, has been opened", pWrapper->name);
pWrapper->deployed = true;
} }
pWrapper->deployed = true; return code;
return 0;
} }
void dndCloseNode(SMgmtWrapper *pWrapper) { int32_t dndCloseNode(SDnode *pDnode, ENodeType ntype) {
taosWLockLatch(&pWrapper->latch); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper->deployed) { if (pWrapper == NULL) {
(*pWrapper->fp.closeFp)(pWrapper); terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
pWrapper->deployed = false; return -1;
} }
taosWLockLatch(&pWrapper->latch);
(*pWrapper->fp.closeFp)(pWrapper);
pWrapper->deployed = false;
if (pWrapper->pProc) { if (pWrapper->pProc) {
taosProcCleanup(pWrapper->pProc); taosProcCleanup(pWrapper->pProc);
pWrapper->pProc = NULL; pWrapper->pProc = NULL;
} }
taosWUnLockLatch(&pWrapper->latch); taosWUnLockLatch(&pWrapper->latch);
dndReleaseWrapper(pWrapper);
return 0;
} }
static int32_t dndRunInSingleProcess(SDnode *pDnode) { static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode run in single process mode"); dInfo("dnode run in single process mode");
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
pWrapper->required = dndRequireNode(pWrapper); pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
@ -77,14 +91,14 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("node:%s, will start in single process", pWrapper->name); dInfo("node:%s, will start in single process", pWrapper->name);
pWrapper->procType = PROC_SINGLE; pWrapper->procType = PROC_SINGLE;
if (dndOpenNode(pWrapper) != 0) { if (dndOpenNode(pDnode, ntype) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1; return -1;
} }
} }
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
int32_t code = dmStart(pWrapper->pMgmt); int32_t code = dmStart(pWrapper->pMgmt);
if (code != 0) { if (code != 0) {
dError("failed to start dnode worker since %s", terrstr()); dError("failed to start dnode worker since %s", terrstr());
} }
@ -95,10 +109,9 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
dndCleanupServer(pDnode); dndCleanupServer(pDnode);
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
if (except == n) continue; if (except == ntype) continue;
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; (void)dndCloseNode(pDnode, ntype);
dndCloseNode(pWrapper);
} }
} }
@ -111,8 +124,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t
int32_t code = (*msgFp)(pWrapper, pMsg); int32_t code = (*msgFp)(pWrapper, pMsg);
if (code != 0) { if (code != 0) {
bool isReq = (pRpc->msgType & 1U); if (pRpc->msgType & 1U) {
if (isReq) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
dndSendRsp(pWrapper, &rsp); dndSendRsp(pWrapper, &rsp);
} }
@ -133,8 +145,8 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t
static int32_t dndRunInMultiProcess(SDnode *pDnode) { static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dInfo("dnode run in multi process mode"); dInfo("dnode run in multi process mode");
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
pWrapper->required = dndRequireNode(pWrapper); pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
@ -144,10 +156,10 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
return -1; return -1;
} }
if (n == DNODE) { if (ntype == DNODE) {
dInfo("node:%s, will start in parent process", pWrapper->name); dInfo("node:%s, will start in parent process", pWrapper->name);
pWrapper->procType = PROC_SINGLE; pWrapper->procType = PROC_SINGLE;
if (dndOpenNode(pWrapper) != 0) { if (dndOpenNode(pDnode, ntype) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1; return -1;
} }
@ -183,10 +195,10 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dndResetLog(pWrapper); dndResetLog(pWrapper);
dInfo("node:%s, clean up resources inherited from parent", pWrapper->name); dInfo("node:%s, clean up resources inherited from parent", pWrapper->name);
dndClearNodesExecpt(pDnode, n); dndClearNodesExecpt(pDnode, ntype);
dInfo("node:%s, will be initialized in child process", pWrapper->name); dInfo("node:%s, will be initialized in child process", pWrapper->name);
dndOpenNode(pWrapper); dndOpenNode(pDnode, ntype);
} else { } else {
dInfo("node:%s, will not start in parent process", pWrapper->name); dInfo("node:%s, will not start in parent process", pWrapper->name);
pWrapper->procType = PROC_PARENT; pWrapper->procType = PROC_PARENT;

View File

@ -16,7 +16,24 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndInt.h" #include "dndInt.h"
static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) { static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) {
dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet);
}
dndReleaseWrapper(pWrapper);
}
static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
if (msgFp == NULL) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
return msgFp;
}
static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0}; SRpcConnInfo connInfo = {0};
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
@ -25,47 +42,34 @@ static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
} }
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
pMsg->rpcMsg = *pRpc; memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
return 0; return 0;
} }
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) { if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dmUpdateMnodeEpSet(dndAcquireWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet); dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
} }
int32_t code = -1; int32_t code = -1;
SNodeMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
NodeMsgFp msgFp = NULL;
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; if (dndMarkWrapper(pWrapper) != 0) goto _OVER;
if (msgFp == NULL) { if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
terrno = TSDB_CODE_MSG_NOT_PROCESSED; if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
goto _OVER; if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;
}
pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) {
goto _OVER;
}
if (dndBuildMsg(pMsg, pRpc, pEpSet) != 0) {
goto _OVER;
}
dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user); dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user);
if (pWrapper->procType == PROC_SINGLE) { if (pWrapper->procType == PROC_SINGLE) {
code = (*msgFp)(pWrapper->pMgmt, pMsg); code = (*msgFp)(pWrapper->pMgmt, pMsg);
} else if (pWrapper->procType == PROC_PARENT) { } else if (pWrapper->procType == PROC_PARENT) {
code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen); code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
} else { } else {
terrno = TSDB_CODE_MEMORY_CORRUPTED;
dError("msg:%p, won't be processed for it is child process", pMsg);
} }
_OVER: _OVER:
if (code == 0) { if (code == 0) {
if (pWrapper->procType == PROC_PARENT) { if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
@ -74,8 +78,7 @@ _OVER:
} }
} else { } else {
dError("msg:%p, failed to process since %s", pMsg, terrstr()); dError("msg:%p, failed to process since %s", pMsg, terrstr());
bool isReq = (pRpc->msgType & 1U); if (pRpc->msgType & 1U) {
if (isReq) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
dndSendRsp(pWrapper, &rsp); dndSendRsp(pWrapper, &rsp);
} }
@ -83,57 +86,29 @@ _OVER:
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont); rpcFreeCont(pRpc->pCont);
} }
dndReleaseWrapper(pWrapper);
} }
static SMgmtWrapper *dndGetWrapperFromMsg(SDnode *pDnode, SNodeMsg *pMsg) { int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = NULL;
switch (pMsg->rpcMsg.msgType) { switch (pMsg->rpcMsg.msgType) {
case TDMT_DND_CREATE_MNODE: case TDMT_DND_CREATE_MNODE:
return dndAcquireWrapper(pDnode, MNODE); return dndOpenNode(pDnode, MNODE);
case TDMT_DND_CREATE_QNODE:
return dndAcquireWrapper(pDnode, QNODE);
case TDMT_DND_CREATE_SNODE:
return dndAcquireWrapper(pDnode, SNODE);
case TDMT_DND_CREATE_BNODE:
return dndAcquireWrapper(pDnode, BNODE);
default:
return NULL;
}
}
int32_t dndProcessCreateNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndGetWrapperFromMsg(pDnode, pMsg);
if (pWrapper->procType == PROC_SINGLE) {
switch (pMsg->rpcMsg.msgType) {
case TDMT_DND_CREATE_MNODE:
return mmProcessCreateReq(pWrapper->pMgmt, pMsg);
case TDMT_DND_CREATE_QNODE:
return qmProcessCreateReq(pWrapper->pMgmt, pMsg);
case TDMT_DND_CREATE_SNODE:
return smProcessCreateReq(pWrapper->pMgmt, pMsg);
case TDMT_DND_CREATE_BNODE:
return bmProcessCreateReq(pWrapper->pMgmt, pMsg);
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1;
}
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1;
}
}
int32_t dndProcessDropNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndGetWrapperFromMsg(pDnode, pMsg);
switch (pMsg->rpcMsg.msgType) {
case TDMT_DND_DROP_MNODE: case TDMT_DND_DROP_MNODE:
return mmProcessDropReq(pWrapper->pMgmt, pMsg); return dndCloseNode(pDnode, MNODE);
case TDMT_DND_CREATE_QNODE:
return dndOpenNode(pDnode, QNODE);
case TDMT_DND_DROP_QNODE: case TDMT_DND_DROP_QNODE:
return qmProcessDropReq(pWrapper->pMgmt, pMsg); return dndCloseNode(pDnode, QNODE);
case TDMT_DND_CREATE_SNODE:
return dndOpenNode(pDnode, SNODE);
case TDMT_DND_DROP_SNODE: case TDMT_DND_DROP_SNODE:
return smProcessDropReq(pWrapper->pMgmt, pMsg); return dndCloseNode(pDnode, MNODE);
case TDMT_DND_CREATE_BNODE:
return dndOpenNode(pDnode, BNODE);
case TDMT_DND_DROP_BNODE: case TDMT_DND_DROP_BNODE:
return bmProcessDropReq(pWrapper->pMgmt, pMsg); return dndCloseNode(pDnode, BNODE);
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1; return -1;

View File

@ -124,9 +124,8 @@ void dndClose(SDnode *pDnode) {
dndCleanupServer(pDnode); dndCleanupServer(pDnode);
dndCleanupClient(pDnode); dndCleanupClient(pDnode);
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType ntype = 0; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; (void)dndCloseNode(pDnode, ntype);
dndCloseNode(pWrapper);
} }
dndClearMemory(pDnode); dndClearMemory(pDnode);
@ -138,8 +137,8 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
pDnode->event = event; pDnode->event = event;
} }
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType) { SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
SMgmtWrapper *pRetWrapper = pWrapper; SMgmtWrapper *pRetWrapper = pWrapper;
taosRLockLatch(&pWrapper->latch); taosRLockLatch(&pWrapper->latch);
@ -155,6 +154,22 @@ SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType) {
return pRetWrapper; return pRetWrapper;
} }
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
code = -1;
}
taosRUnLockLatch(&pWrapper->latch);
return code;
}
void dndReleaseWrapper(SMgmtWrapper *pWrapper) { void dndReleaseWrapper(SMgmtWrapper *pWrapper) {
if (pWrapper == NULL) return; if (pWrapper == NULL) return;

View File

@ -63,13 +63,12 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
case TDMT_DND_CREATE_QNODE: case TDMT_DND_CREATE_QNODE:
case TDMT_DND_CREATE_SNODE: case TDMT_DND_CREATE_SNODE:
case TDMT_DND_CREATE_BNODE: case TDMT_DND_CREATE_BNODE:
code = dndProcessCreateNodeMsg(pMgmt->pDnode, pMsg);
case TDMT_DND_DROP_MNODE: case TDMT_DND_DROP_MNODE:
case TDMT_DND_DROP_QNODE: case TDMT_DND_DROP_QNODE:
case TDMT_DND_DROP_SNODE: case TDMT_DND_DROP_SNODE:
case TDMT_DND_DROP_BNODE: case TDMT_DND_DROP_BNODE:
code = dndProcessDropNodeMsg(pMgmt->pDnode, pMsg); code = dndProcessNodeMsg(pMgmt->pDnode, pMsg);
break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1; code = -1;