refactor: node mgmt

This commit is contained in:
Shengliang Guan 2022-05-12 15:53:54 +08:00
parent d33c4173ad
commit bf2d1a7989
6 changed files with 25 additions and 14 deletions

View File

@ -42,7 +42,7 @@ typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp);
typedef void (*SendMnodeRecvFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq, const SRpcMsg* pRsp);
typedef void (*SendMnodeRecvFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq, SRpcMsg* pRsp);
typedef void (*SendRedirectRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type);

View File

@ -274,7 +274,7 @@ int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SNodeMsg *pMsg) {
SArray *vmGetMsgHandles() {
int32_t code = -1;
SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle));
SArray *pArray = taosArrayInit(32, sizeof(SMgmtHandle));
if (pArray == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MON_VM_INFO, vmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;

View File

@ -116,8 +116,6 @@ void dmCleanupClient(SDnode *pDnode);
SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper);
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper);
int32_t dmInitMsgHandle(SDnode *pDnode);
void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp);
// mgmt nodes
SMgmtFunc dmGetMgmtFunc();

View File

@ -114,8 +114,10 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
return -1;
}
SMgmtInputOpt *pInput = &pWrapper->pDnode->input;
SMgmtOutputOpt output = {0};
SMgmtInputOpt *pInput = &pWrapper->pDnode->input;
pInput->name = pWrapper->name;
pInput->path = pWrapper->path;
pInput->msgCb = dmGetMsgcb(pWrapper);
if (pWrapper->nodeType == DNODE) {
tmsgSetDefaultMsgCb(&pInput->msgCb);

View File

@ -71,8 +71,12 @@ static void dmClearVars(SDnode *pDnode) {
}
static bool dmRequireNode(SMgmtWrapper *pWrapper) {
SMgmtInputOpt *pInput = &pWrapper->pDnode->input;
pInput->name = pWrapper->name;
pInput->path = pWrapper->path;
bool required = false;
int32_t code = (*pWrapper->func.requiredFp)(&pWrapper->pDnode->input, &required);
int32_t code = (*pWrapper->func.requiredFp)(pInput, &required);
if (!required) {
dDebug("node:%s, does not require startup", pWrapper->name);
}
@ -80,7 +84,7 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) {
}
SDnode *dmCreate(const SDnodeOpt *pOption) {
dDebug("start to create dnode");
dInfo("start to create dnode");
int32_t code = -1;
char path[PATH_MAX + 100] = {0};
SDnode *pDnode = NULL;
@ -91,6 +95,10 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto _OVER;
}
if (dmInitVars(pDnode, pOption) != 0) {
goto _OVER;
}
dmSetStatus(pDnode, DND_STAT_INIT);
pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
pDnode->wrappers[MNODE].func = mmGetMgmtFunc();

View File

@ -256,14 +256,14 @@ static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) {
}
}
void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
static inline void dmSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
}
void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
static inline void dmSendToMnodeRecv(SMgmtWrapper *pWrapper, SRpcMsg *pReq, SRpcMsg *pRsp) {
SEpSet epSet = {0};
dmGetMnodeEpSet(pDnode, &epSet);
rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp);
dmGetMnodeEpSet(pWrapper->pDnode, &epSet);
dmSendRecv(pWrapper->pDnode, &epSet, pReq, pRsp);
}
static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
@ -485,8 +485,10 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
SRpcMsg rpcRsp = {0};
SEpSet epSet = {0};
dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
dmSendToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
dmGetMnodeEpSet(pDnode, &epSet);
dmSendRecv(pDnode, &epSet, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
terrno = rpcRsp.code;
@ -543,14 +545,15 @@ void dmCleanupServer(SDnode *pDnode) {
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb msgCb = {
.pWrapper = pWrapper,
.clientRpc = pWrapper->pDnode->trans.clientRpc,
.sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp,
.sendMnodeRecvFp = dmSendToMnodeRecv,
.sendRedirectRspFp = dmSendRedirectRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle,
.reportStartupFp = dmReportStartupByWrapper,
.clientRpc = pWrapper->pDnode->trans.clientRpc,
.pWrapper = pWrapper,
};
return msgCb;
}