refactor: adjust msgcb
This commit is contained in:
parent
7e89f1c427
commit
0164158256
|
@ -60,9 +60,9 @@ typedef struct {
|
||||||
ReportStartup reportStartupFp;
|
ReportStartup reportStartupFp;
|
||||||
} SMsgCb;
|
} SMsgCb;
|
||||||
|
|
||||||
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
|
void tmsgSetDefault(const SMsgCb* msgcb);
|
||||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg);
|
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg);
|
||||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
|
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype);
|
||||||
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
|
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
|
||||||
void tmsgSendRsp(SRpcMsg* pMsg);
|
void tmsgSendRsp(SRpcMsg* pMsg);
|
||||||
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
|
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet);
|
||||||
|
|
|
@ -17,46 +17,46 @@
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
|
||||||
static SMsgCb tsDefaultMsgCb;
|
static SMsgCb defaultMsgCb;
|
||||||
|
|
||||||
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
|
void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; }
|
||||||
|
|
||||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pMsg) {
|
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
|
||||||
PutToQueueFp fp = pMsgCb->queueFps[qtype];
|
PutToQueueFp fp = msgcb->queueFps[qtype];
|
||||||
return (*fp)(pMsgCb->mgmt, pMsg);
|
return (*fp)(msgcb->mgmt, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
|
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) {
|
||||||
GetQueueSizeFp fp = pMsgCb->qsizeFp;
|
GetQueueSizeFp fp = msgcb->qsizeFp;
|
||||||
return (*fp)(pMsgCb->mgmt, vgId, qtype);
|
return (*fp)(msgcb->mgmt, vgId, qtype);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
|
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
|
||||||
SendReqFp fp = tsDefaultMsgCb.sendReqFp;
|
SendReqFp fp = defaultMsgCb.sendReqFp;
|
||||||
return (*fp)(epSet, pMsg);
|
return (*fp)(epSet, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgSendRsp(SRpcMsg* pMsg) {
|
void tmsgSendRsp(SRpcMsg* pMsg) {
|
||||||
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
|
SendRspFp fp = defaultMsgCb.sendRspFp;
|
||||||
return (*fp)(pMsg);
|
return (*fp)(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
|
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) {
|
||||||
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
|
SendRedirectRspFp fp = defaultMsgCb.sendRedirectRspFp;
|
||||||
(*fp)(pMsg, pNewEpSet);
|
(*fp)(pMsg, pNewEpSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) {
|
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) {
|
||||||
RegisterBrokenLinkArgFp fp = tsDefaultMsgCb.registerBrokenLinkArgFp;
|
RegisterBrokenLinkArgFp fp = defaultMsgCb.registerBrokenLinkArgFp;
|
||||||
(*fp)(pMsg);
|
(*fp)(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) {
|
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) {
|
||||||
ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp;
|
ReleaseHandleFp fp = defaultMsgCb.releaseHandleFp;
|
||||||
(*fp)(pHandle, type);
|
(*fp)(pHandle, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgReportStartup(const char* name, const char* desc) {
|
void tmsgReportStartup(const char* name, const char* desc) {
|
||||||
ReportStartup fp = tsDefaultMsgCb.reportStartupFp;
|
ReportStartup fp = defaultMsgCb.reportStartupFp;
|
||||||
(*fp)(name, desc);
|
(*fp)(name, desc);
|
||||||
}
|
}
|
|
@ -78,7 +78,7 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||||
|
|
||||||
if (pWrapper->ntype == DNODE || InChildProc(pWrapper)) {
|
if (pWrapper->ntype == DNODE || InChildProc(pWrapper)) {
|
||||||
tmsgSetDefaultMsgCb(&input.msgCb);
|
tmsgSetDefault(&input.msgCb);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OnlyInSingleProc(pWrapper)) {
|
if (OnlyInSingleProc(pWrapper)) {
|
||||||
|
|
|
@ -56,7 +56,7 @@ class MndTestTrans2 : public ::testing::Test {
|
||||||
msgCb.sendReqFp = sendReq;
|
msgCb.sendReqFp = sendReq;
|
||||||
msgCb.sendRspFp = sendRsp;
|
msgCb.sendRspFp = sendRsp;
|
||||||
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
||||||
tmsgSetDefaultMsgCb(&msgCb);
|
tmsgSetDefault(&msgCb);
|
||||||
|
|
||||||
SMnodeOpt opt = {0};
|
SMnodeOpt opt = {0};
|
||||||
opt.deploy = 1;
|
opt.deploy = 1;
|
||||||
|
|
Loading…
Reference in New Issue