This commit is contained in:
Shengliang Guan 2022-03-16 20:25:37 +08:00
parent 43cf0a9928
commit 148af69ba6
5 changed files with 46 additions and 41 deletions

View File

@ -202,7 +202,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
} }
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
if (dmStart(pWrapper) != 0) { if (dmStart(pWrapper->pMgmt) != 0) {
dError("failed to start dnode worker since %s", terrstr()); dError("failed to start dnode worker since %s", terrstr());
return -1; return -1;
} }
@ -372,7 +372,7 @@ static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) { if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE), pEpSet); dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet);
} }
int32_t code = -1; int32_t code = -1;
@ -396,7 +396,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user); dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user);
if (pWrapper->procType == PROC_SINGLE) { if (pWrapper->procType == PROC_SINGLE) {
code = (*msgFp)(pWrapper, pMsg); code = (*msgFp)(pWrapper->pMgmt, pMsg);
} else if (pWrapper->procType == PROC_PARENT) { } else if (pWrapper->procType == PROC_PARENT) {
code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen); code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
} else { } else {

View File

@ -131,7 +131,7 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp
STransMgmt *pMgmt = &pDnode->trans; STransMgmt *pMgmt = &pDnode->trans;
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet); dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE)->pMgmt, &epSet);
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp); rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
} }
@ -297,14 +297,15 @@ int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
STransMgmt *pTrans = &pDnode->trans; STransMgmt *pTrans = &pDnode->trans;
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet); dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE)->pMgmt, &epSet);
return dndSendRpcReq(pTrans, &epSet, pReq); return dndSendRpcReq(pTrans, &epSet, pReq);
} }
} }
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) { if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) {
dmSendRedirectRsp(dndGetWrapper(pWrapper->pDnode, DNODE), pRsp); SMgmtWrapper *pDnodeWrapper = dndGetWrapper(pWrapper->pDnode, DNODE);
dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp);
} else { } else {
rpcSendResponse(pRsp); rpcSendResponse(pRsp);
} }

View File

@ -22,13 +22,15 @@
extern "C" { extern "C" {
#endif #endif
typedef struct SDnodeMgmt SDnodeMgmt;
void dmGetMgmtFp(SMgmtWrapper *pWrapper); void dmGetMgmtFp(SMgmtWrapper *pWrapper);
void dmInitMsgHandles(SMgmtWrapper *pWrapper); void dmInitMsgHandles(SMgmtWrapper *pWrapper);
void dmGetMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet); void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet); void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmStart(SMgmtWrapper *pWrapper); int32_t dmStart(SDnodeMgmt *pMgmt);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -16,19 +16,16 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmInt.h" #include "dmInt.h"
void dmGetMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet) { void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
*pEpSet = pMgmt->mnodeEpSet; *pEpSet = pMgmt->mnodeEpSet;
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
} }
void dmUpdateMnodeEpSet(SMgmtWrapper *pWrapper, SEpSet *pEpSet) { void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet) {
dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse);
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
taosWLockLatch(&pMgmt->latch); taosWLockLatch(&pMgmt->latch);
pMgmt->mnodeEpSet = *pEpSet; pMgmt->mnodeEpSet = *pEpSet;
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
@ -57,11 +54,11 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
} }
void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { void dmSendRedirectRsp(SDnodeMgmt *pMgmt, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pMgmt->pDnode;
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSet(pWrapper, &epSet); dmGetMnodeEpSet(pMgmt, &epSet);
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse); dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
@ -76,10 +73,7 @@ void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
rpcSendRedirectRsp(pReq->handle, &epSet); rpcSendRedirectRsp(pReq->handle, &epSet);
} }
int32_t dmStart(SMgmtWrapper *pWrapper) { int32_t dmStart(SDnodeMgmt *pMgmt) { return dmStartWorker(pMgmt); }
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
return dmStartWorker(pMgmt);
}
int32_t dmInit(SMgmtWrapper *pWrapper) { int32_t dmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;

View File

@ -52,47 +52,57 @@ static void *dmThreadRoutine(void *param) {
} }
} }
static void dmProcessQueue(SDnode *pDnode, SNodeMsg *pMsg) { static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t code = -1; int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType; tmsg_t msgType = pMsg->rpcMsg.msgType;
SDnode *pDnode = pMgmt->pDnode;
SMgmtWrapper *pWrapper = NULL;
dTrace("msg:%p, will be processed", pMsg); dTrace("msg:%p, will be processed", pMsg);
switch (msgType) { switch (msgType) {
case TDMT_DND_CREATE_MNODE: case TDMT_DND_CREATE_MNODE:
code = mmProcessCreateReq(dndGetWrapper(pDnode, MNODE)->pMgmt, pMsg); pWrapper = dndGetWrapper(pDnode, MNODE);
code = mmProcessCreateReq(pWrapper->pMgmt, pMsg);
break; break;
case TDMT_DND_DROP_MNODE: case TDMT_DND_DROP_MNODE:
code = mmProcessDropReq(dndGetWrapper(pDnode, MNODE)->pMgmt, pMsg); pWrapper = dndGetWrapper(pDnode, MNODE);
code = mmProcessDropReq(pWrapper->pMgmt, pMsg);
break; break;
case TDMT_DND_CREATE_QNODE: case TDMT_DND_CREATE_QNODE:
code = qmProcessCreateReq(dndGetWrapper(pDnode, QNODE)->pMgmt, pMsg); pWrapper = dndGetWrapper(pDnode, QNODE);
code = qmProcessCreateReq(pWrapper->pMgmt, pMsg);
break; break;
case TDMT_DND_DROP_QNODE: case TDMT_DND_DROP_QNODE:
code = qmProcessDropReq(dndGetWrapper(pDnode, QNODE)->pMgmt, pMsg); pWrapper = dndGetWrapper(pDnode, QNODE);
code = qmProcessDropReq(pWrapper->pMgmt, pMsg);
break; break;
case TDMT_DND_CREATE_SNODE: case TDMT_DND_CREATE_SNODE:
code = smProcessCreateReq(dndGetWrapper(pDnode, SNODE)->pMgmt, pMsg); pWrapper = dndGetWrapper(pDnode, SNODE);
code = smProcessCreateReq(pWrapper->pMgmt, pMsg);
break; break;
case TDMT_DND_DROP_SNODE: case TDMT_DND_DROP_SNODE:
code = smProcessDropReq(dndGetWrapper(pDnode, SNODE)->pMgmt, pMsg); pWrapper = dndGetWrapper(pDnode, SNODE);
code = smProcessDropReq(pWrapper->pMgmt, pMsg);
break; break;
case TDMT_DND_CREATE_BNODE: case TDMT_DND_CREATE_BNODE:
code = bmProcessCreateReq(dndGetWrapper(pDnode, BNODE)->pMgmt, pMsg); pWrapper = dndGetWrapper(pDnode, BNODE);
code = bmProcessCreateReq(pWrapper->pMgmt, pMsg);
break; break;
case TDMT_DND_DROP_BNODE: case TDMT_DND_DROP_BNODE:
code = bmProcessDropReq(dndGetWrapper(pDnode, BNODE)->pMgmt, pMsg); pWrapper = dndGetWrapper(pDnode, BNODE);
code = bmProcessDropReq(pWrapper->pMgmt, pMsg);
break; break;
case TDMT_DND_CONFIG_DNODE: case TDMT_DND_CONFIG_DNODE:
code = dmProcessConfigReq(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); code = dmProcessConfigReq(pMgmt, pMsg);
break; break;
case TDMT_MND_STATUS_RSP: case TDMT_MND_STATUS_RSP:
code = dmProcessStatusRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); code = dmProcessStatusRsp(pMgmt, pMsg);
break; break;
case TDMT_MND_AUTH_RSP: case TDMT_MND_AUTH_RSP:
code = dmProcessAuthRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); code = dmProcessAuthRsp(pMgmt, pMsg);
break; break;
case TDMT_MND_GRANT_RSP: case TDMT_MND_GRANT_RSP:
code = dmProcessGrantRsp(dndGetWrapper(pDnode, DNODE)->pMgmt, pMsg); code = dmProcessGrantRsp(pMgmt, pMsg);
break; break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
@ -114,14 +124,12 @@ static void dmProcessQueue(SDnode *pDnode, SNodeMsg *pMsg) {
} }
int32_t dmStartWorker(SDnodeMgmt *pMgmt) { int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
SDnode *pDnode = pMgmt->pDnode; if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) {
if (dndInitWorker(pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr()); dError("failed to start dnode mgmt worker since %s", terrstr());
return -1; return -1;
} }
if (dndInitWorker(pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) { if (dndInitWorker(pMgmt, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr()); dError("failed to start dnode mgmt worker since %s", terrstr());
return -1; return -1;
} }