This commit is contained in:
Shengliang Guan 2022-03-16 19:58:29 +08:00
parent 3487d4f855
commit 43cf0a9928
7 changed files with 45 additions and 43 deletions

View File

@ -147,11 +147,11 @@ void dndSetStatus(SDnode *pDnode, EDndStatus stat);
SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType); SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp); void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp);
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndSendMonitorReport(SDnode *pDnode); void dndSendMonitorReport(SDnode *pDnode);
int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pMsg); int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pMsg); int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); void dndSendRsp(void *pWrapper, SRpcMsg *pRsp);
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
int32_t maxNum, void *queueFp); int32_t maxNum, void *queueFp);

View File

@ -51,6 +51,7 @@ void dndCleanupServer(SDnode *pDnode);
int32_t dndInitClient(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode);
int32_t dndInitMsgHandle(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -219,34 +219,6 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
} }
} }
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) {
dmSendRedirectRsp(dndGetWrapper(pWrapper->pDnode, DNODE), pRsp);
} else {
rpcSendResponse(pRsp);
}
}
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
int32_t code = -1;
if (pWrapper->procType != PROC_CHILD) {
dndSendRpcRsp(pWrapper, pRsp);
} else {
do {
code = taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
}
}
void dndSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
pRsp->code = TSDB_CODE_APP_NOT_READY;
dndSendRsp(pWrapper, pRsp);
}
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from child queue", pMsg); dTrace("msg:%p, get from child queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;

View File

@ -269,7 +269,7 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
return 0; return 0;
} }
static int32_t dndSetReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) { static int32_t dndSendRpcReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) {
if (pMgmt->clientRpc == NULL) { if (pMgmt->clientRpc == NULL) {
terrno = TSDB_CODE_DND_OFFLINE; terrno = TSDB_CODE_DND_OFFLINE;
return -1; return -1;
@ -281,16 +281,47 @@ static int32_t dndSetReq(STransMgmt *pMgmt, SEpSet *pEpSet, SRpcMsg *pReq) {
int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pReq) { int32_t dndSendReqToDnode(void *wrapper, SEpSet *pEpSet, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = wrapper; SMgmtWrapper *pWrapper = wrapper;
if (pWrapper->procType == PROC_CHILD) {
} else {
STransMgmt *pTrans = &pWrapper->pDnode->trans; STransMgmt *pTrans = &pWrapper->pDnode->trans;
return dndSetReq(pTrans, pEpSet, pReq); return dndSendRpcReq(pTrans, pEpSet, pReq);
}
} }
int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pReq) { int32_t dndSendReqToMnode(void *wrapper, SRpcMsg *pReq) {
SMgmtWrapper *pWrapper = wrapper; SMgmtWrapper *pWrapper = wrapper;
if (pWrapper->procType == PROC_CHILD) {
} else {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
STransMgmt *pTrans = &pDnode->trans; STransMgmt *pTrans = &pDnode->trans;
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet); dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE), &epSet);
return dndSetReq(pTrans, &epSet, pReq); return dndSendRpcReq(pTrans, &epSet, pReq);
}
}
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) {
dmSendRedirectRsp(dndGetWrapper(pWrapper->pDnode, DNODE), pRsp);
} else {
rpcSendResponse(pRsp);
}
}
void dndSendRsp(void *wrapper, SRpcMsg *pRsp) {
SMgmtWrapper *pWrapper = wrapper;
if (pWrapper->procType == PROC_CHILD) {
int32_t code = -1;
do {
code = taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
} else {
dndSendRpcRsp(pWrapper, pRsp);
}
} }

View File

@ -59,12 +59,11 @@ void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqd
void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { void dmSendRedirectRsp(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
tmsg_t msgType = pReq->msgType;
SEpSet epSet = {0}; SEpSet epSet = {0};
dmGetMnodeEpSet(pWrapper, &epSet); dmGetMnodeEpSet(pWrapper, &epSet);
dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse); dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
if (strcmp(epSet.eps[i].fqdn, pDnode->localFqdn) == 0 && epSet.eps[i].port == pDnode->serverPort) { if (strcmp(epSet.eps[i].fqdn, pDnode->localFqdn) == 0 && epSet.eps[i].port == pDnode->serverPort) {

View File

@ -53,7 +53,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
pMgmt->statusSent = 1; pMgmt->statusSent = 1;
dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle); dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle);
dndSendReqToMnode(pMgmt->dnodeEps, &rpcMsg); dndSendReqToMnode(pMgmt->pWrapper, &rpcMsg);
} }
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
@ -115,7 +115,6 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
void dmInitMsgHandles(SMgmtWrapper *pWrapper) { void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE // Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg);