commit
a6044dc21d
|
@ -89,6 +89,9 @@ typedef struct {
|
|||
} SDnodeWorker;
|
||||
|
||||
typedef struct SMsgHandle {
|
||||
int32_t vgId;
|
||||
NodeMsgFp vgIdMsgFp;
|
||||
SMgmtWrapper *pVgIdWrapper; // Handle the case where the same message type is distributed to qnode or vnode
|
||||
NodeMsgFp msgFp;
|
||||
SMgmtWrapper *pWrapper;
|
||||
} SMsgHandle;
|
||||
|
@ -114,6 +117,7 @@ typedef struct SMgmtWrapper {
|
|||
void *pMgmt;
|
||||
SDnode *pDnode;
|
||||
NodeMsgFp msgFps[TDMT_MAX];
|
||||
int32_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode
|
||||
SMgmtFp fp;
|
||||
} SMgmtWrapper;
|
||||
|
||||
|
@ -149,7 +153,7 @@ typedef struct SDnode {
|
|||
EDndStatus dndGetStatus(SDnode *pDnode);
|
||||
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
|
||||
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
|
||||
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp);
|
||||
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId);
|
||||
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
|
||||
void dndSendMonitorReport(SDnode *pDnode);
|
||||
|
||||
|
|
|
@ -65,8 +65,9 @@ void dndCleanup() {
|
|||
dInfo("dnode env is cleaned up");
|
||||
}
|
||||
|
||||
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
|
||||
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId) {
|
||||
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
|
||||
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
|
||||
}
|
||||
|
||||
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
|
||||
|
|
|
@ -20,13 +20,26 @@
|
|||
#define INTERNAL_CKEY "_key"
|
||||
#define INTERNAL_SECRET "_pwd"
|
||||
|
||||
static inline void dndProcessQVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
int32_t vgId = htonl(pHead->vgId);
|
||||
|
||||
SMgmtWrapper *pWrapper = pHandle->pWrapper;
|
||||
if (vgId == pHandle->vgId && pHandle->pVgIdWrapper != NULL) {
|
||||
pWrapper = pHandle->pVgIdWrapper;
|
||||
}
|
||||
|
||||
dTrace("msg:%s will be processed by %s, handle:%p app:%p vgId:%d", TMSG_INFO(pMsg->msgType), pWrapper->name,
|
||||
pMsg->handle, pMsg->ahandle, vgId);
|
||||
dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
|
||||
}
|
||||
|
||||
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
|
||||
SDnode *pDnode = parent;
|
||||
STransMgmt *pMgmt = &pDnode->trans;
|
||||
tmsg_t msgType = pRsp->msgType;
|
||||
|
||||
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
|
||||
// if (pRsp == NULL || pRsp->pCont == NULL) return;
|
||||
dTrace("rsp:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
|
||||
rpcFreeCont(pRsp->pCont);
|
||||
return;
|
||||
|
@ -34,9 +47,13 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
|
|||
|
||||
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
|
||||
if (pHandle->msgFp != NULL) {
|
||||
dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType),
|
||||
pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
|
||||
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
|
||||
if (pHandle->vgId == 0) {
|
||||
dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType),
|
||||
pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
|
||||
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
|
||||
} else {
|
||||
dndProcessQVnodeRpcMsg(pHandle, pRsp, pEpSet);
|
||||
}
|
||||
} else {
|
||||
dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
|
||||
rpcFreeCont(pRsp->pCont);
|
||||
|
@ -110,9 +127,13 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
|
|||
|
||||
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
|
||||
if (pHandle->msgFp != NULL) {
|
||||
dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name,
|
||||
pReq->handle, pReq->ahandle);
|
||||
dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
|
||||
if (pHandle->vgId == 0) {
|
||||
dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name,
|
||||
pReq->handle, pReq->ahandle);
|
||||
dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
|
||||
} else {
|
||||
dndProcessQVnodeRpcMsg(pHandle, pReq, pEpSet);
|
||||
}
|
||||
} else {
|
||||
dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
|
||||
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
|
||||
|
@ -245,17 +266,24 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
|
|||
|
||||
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
|
||||
NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];
|
||||
int32_t vgId = pWrapper->msgVgIds[msgIndex];
|
||||
if (msgFp == NULL) continue;
|
||||
|
||||
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
|
||||
if (pHandle->msgFp != NULL) {
|
||||
dError("msg:%s has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex],
|
||||
pHandle->pWrapper->name, pWrapper->name);
|
||||
if (pHandle->msgFp != NULL && pHandle->vgId == vgId) {
|
||||
dError("msg:%s has multiple process nodes, prev node:%s:%d, curr node:%s:%d", tMsgInfo[msgIndex],
|
||||
pHandle->pWrapper->name, pHandle->pWrapper->msgVgIds[msgIndex], pWrapper->name, vgId);
|
||||
return -1;
|
||||
} else {
|
||||
dTrace("msg:%s will be processed by %s", tMsgInfo[msgIndex], pWrapper->name);
|
||||
pHandle->msgFp = msgFp;
|
||||
pHandle->pWrapper = pWrapper;
|
||||
dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId);
|
||||
if (vgId == 0) {
|
||||
pHandle->msgFp = msgFp;
|
||||
pHandle->pWrapper = pWrapper;
|
||||
} else {
|
||||
pHandle->vgId = vgId;
|
||||
pHandle->vgIdMsgFp = msgFp;
|
||||
pHandle->pVgIdWrapper = pWrapper;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,19 +114,19 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
|
||||
void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by DNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
|
||||
// Requests handled by MNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
}
|
||||
|
|
|
@ -75,78 +75,78 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
|
||||
void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by DNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
|
||||
// Requests handled by MNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
|
||||
// Requests handled by VNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
}
|
||||
|
|
|
@ -54,4 +54,16 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {}
|
||||
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by VNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)qmProcessQueryMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)qmProcessQueryMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
}
|
||||
|
|
|
@ -56,6 +56,6 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
|||
|
||||
void smInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by SNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg, 0);
|
||||
}
|
||||
|
|
|
@ -243,42 +243,42 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
|
||||
void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by VNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
|
||||
}
|
||||
|
|
|
@ -62,11 +62,10 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
|
|||
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
|
||||
if (code != 0) {
|
||||
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
||||
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
|
||||
|
@ -74,12 +73,10 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
|
|||
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
|
||||
if (code != 0) {
|
||||
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
||||
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||
// TODO: handle invalid write
|
||||
/*rpcFreeCont(pMsg->rpcMsg.pCont);*/
|
||||
/*taosFreeQitem(pMsg);*/
|
||||
}
|
||||
|
||||
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
|
||||
|
@ -163,7 +160,7 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
|
|||
|
||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||
if (pVnode == NULL) {
|
||||
dError("vgId:%d, failed to write msg:%p to queue since %s", pHead->vgId, pMsg, terrstr());
|
||||
dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue