multi process

This commit is contained in:
Shengliang Guan 2022-04-01 19:52:14 +08:00
parent 8176ff04d1
commit 44311425db
10 changed files with 5 additions and 42 deletions

View File

@ -50,7 +50,6 @@ typedef struct {
PutToQueueFp queueFps[QUEUE_MAX]; PutToQueueFp queueFps[QUEUE_MAX];
GetQueueSizeFp qsizeFp; GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp; SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp; SendRspFp sendRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp; ReleaseHandleFp releaseHandleFp;
@ -60,7 +59,6 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
void tmsgSendRsp(const SRpcMsg* pRsp); void tmsgSendRsp(const SRpcMsg* pRsp);
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
void tmsgReleaseHandle(void* handle, int8_t type); void tmsgReleaseHandle(void* handle, int8_t type);

View File

@ -32,10 +32,6 @@ int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
} }
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) {
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
}
void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); } void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); }
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {

View File

@ -22,7 +22,6 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper; msgCb.pWrapper = pMgmt->pWrapper;
msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
pOption->msgCb = msgCb; pOption->msgCb = msgCb;

View File

@ -398,7 +398,6 @@ SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
.pWrapper = pWrapper, .pWrapper = pWrapper,
.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg, .registerBrokenLinkArgFp = dndRegisterBrokenLinkArg,
.releaseHandleFp = dndReleaseHandle, .releaseHandleFp = dndReleaseHandle,
.sendMnodeReqFp = dndSendReqToMnode,
.sendReqFp = dndSendReqToDnode, .sendReqFp = dndSendReqToDnode,
.sendRspFp = dndSendRsp, .sendRspFp = dndSendRsp,
}; };

View File

@ -39,16 +39,11 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) {
} }
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SMsgCb msgCb = {0}; SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue; msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue;
msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue; msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue;
msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue; msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
msgCb.queueFps[SYNC_QUEUE] = mmPutMsgToWriteQueue; msgCb.queueFps[SYNC_QUEUE] = mmPutMsgToWriteQueue;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
pOption->msgCb = msgCb; pOption->msgCb = msgCb;
} }

View File

@ -19,15 +19,10 @@
static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
SMsgCb msgCb = {0}; SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue; msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue; msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
msgCb.qsizeFp = qmGetQueueSize; msgCb.qsizeFp = qmGetQueueSize;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
pOption->msgCb = msgCb; pOption->msgCb = msgCb;
} }

View File

@ -19,12 +19,7 @@
static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
SMsgCb msgCb = {0}; SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
pOption->msgCb = msgCb; pOption->msgCb = msgCb;
} }

View File

@ -128,16 +128,12 @@ static void *vmOpenVnodeFunc(void *param) {
pMgmt->state.openVnodes, pMgmt->state.totalVnodes); pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc); dndReportStartup(pDnode, "open-vnodes", stepDesc);
SMsgCb msgCb = {0}; SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
msgCb.pWrapper = pMgmt->pWrapper; msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.qsizeFp = vmGetQueueSize; msgCb.qsizeFp = vmGetQueueSize;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) { if (pImpl == NULL) {

View File

@ -76,16 +76,12 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return -1; return -1;
} }
SMsgCb msgCb = {0}; SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper);
msgCb.pWrapper = pMgmt->pWrapper; msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
msgCb.qsizeFp = vmGetQueueSize; msgCb.qsizeFp = vmGetQueueSize;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
vnodeCfg.msgCb = msgCb; vnodeCfg.msgCb = msgCb;
vnodeCfg.pTfs = pMgmt->pTfs; vnodeCfg.pTfs = pMgmt->pTfs;

View File

@ -433,12 +433,6 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) {
pHeartbeat->connId = htonl(pHeartbeat->connId); pHeartbeat->connId = htonl(pHeartbeat->connId);
pHeartbeat->pid = htonl(pHeartbeat->pid); pHeartbeat->pid = htonl(pHeartbeat->pid);
SRpcConnInfo info = {0};
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr());
return -1;
}
SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId); SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId);
if (pConn == NULL) { if (pConn == NULL) {
pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0); pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0);