From 9f3ef6cd7947070fb49a131f75fe77d3cb7f387d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 24 Dec 2021 01:35:39 -0800 Subject: [PATCH] adjust rpc code after change msgtype from int8 to int16 --- include/common/tmsg.h | 1 + source/dnode/mgmt/impl/src/dndDnode.c | 4 +- source/dnode/mgmt/impl/src/dndTransport.c | 170 +++++++++++----------- source/dnode/mnode/impl/inc/mndInt.h | 2 +- source/dnode/mnode/impl/inc/mndTrans.h | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 8 +- source/dnode/mnode/impl/src/mnode.c | 13 +- source/libs/transport/inc/rpcHead.h | 2 +- source/libs/transport/src/rpcMain.c | 20 +-- 9 files changed, 112 insertions(+), 110 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6dda3b195e..2c5f7346ca 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -48,6 +48,7 @@ extern int tMsgDict[]; #define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8) #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) #define TMSG_INFO(TYPE) tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] +#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) typedef uint16_t tmsg_t; diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index df82855c36..6b5aeb078a 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -63,7 +63,7 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { } void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg) { - int32_t msgType = pMsg->msgType; + tmsg_t msgType = pMsg->msgType; SEpSet epSet = {0}; dndGetMnodeEpSet(pDnode, &epSet); @@ -369,7 +369,7 @@ void dndSendStatusMsg(SDnode *pDnode) { dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); - SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS}; + SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = 9527}; pMgmt->statusSent = 1; dTrace("pDnode:%p, send status msg to mnode", pDnode); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index ac00e1e4f2..b8ebe3f884 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -31,104 +31,104 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { // msg from client to dnode - pMgmt->msgFp[TDMT_VND_SUBMIT] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_QUERY] = dndProcessVnodeQueryMsg; - pMgmt->msgFp[TDMT_VND_FETCH] = dndProcessVnodeFetchMsg; - pMgmt->msgFp[TDMT_MND_CREATE_TABLE] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_DROP_TABLE] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_ALTER_TABLE] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_UPDATE_TAG_VAL] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_TABLE_META] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TDMT_VND_TABLES_META] = dndProcessVnodeQueryMsg; - pMgmt->msgFp[TDMT_VND_MQ_QUERY] = dndProcessVnodeQueryMsg; - pMgmt->msgFp[TDMT_VND_MQ_CONSUME] = dndProcessVnodeQueryMsg; - pMgmt->msgFp[TDMT_VND_MQ_CONNECT] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_MQ_DISCONNECT] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_MQ_SET_CUR] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_RES_READY] = dndProcessVnodeFetchMsg; - pMgmt->msgFp[TDMT_VND_TASKS_STATUS] = dndProcessVnodeFetchMsg; - pMgmt->msgFp[TDMT_VND_CANCEL_TASK] = dndProcessVnodeFetchMsg; - pMgmt->msgFp[TDMT_VND_DROP_TASK] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TABLE)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TABLE)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_DISCONNECT)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_RES_READY)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASKS_STATUS)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CANCEL_TASK)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TASK)] = dndProcessVnodeFetchMsg; // msg from client to mnode - pMgmt->msgFp[TDMT_MND_CONNECT] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TDMT_MND_CREATE_ACCT] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_ALTER_ACCT] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_DROP_ACCT] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_CREATE_USER] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_ALTER_USER] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_DROP_USER] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_CREATE_DNODE] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_CONFIG_DNODE] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_DROP_DNODE] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_CREATE_MNODE] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_DROP_MNODE] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_CREATE_DB] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_DROP_DB] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_USE_DB] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_ALTER_DB] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_SYNC_DB] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_COMPACT_DB] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_CREATE_FUNCTION] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_RETRIEVE_FUNCTION] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_DROP_FUNCTION] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_CREATE_STB] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_ALTER_STB] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_DROP_STB] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_VGROUP_LIST] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TDMT_MND_KILL_QUERY] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_KILL_CONN] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_HEARTBEAT] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TDMT_MND_SHOW] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TDMT_MND_SHOW_RETRIEVE] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_ACCT)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_USER)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_USER)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_USER)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DNODE)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONFIG_DNODE)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DNODE)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_MNODE)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_MNODE)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_USE_DB)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_DB)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYNC_DB)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_COMPACT_DB)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_FUNCTION)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNCTION)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_FUNCTION)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; // message from client to dnode - pMgmt->msgFp[TDMT_DND_NETWORK_TEST] = dndProcessDnodeReq; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessDnodeReq; // message from mnode to vnode - pMgmt->msgFp[TDMT_VND_CREATE_STB] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_CREATE_STB_RSP] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_ALTER_STB] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_ALTER_STB_RSP] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_DROP_STB] = dndProcessVnodeWriteMsg; - pMgmt->msgFp[TDMT_VND_DROP_STB_RSP] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = dndProcessMnodeWriteMsg; // message from mnode to dnode - pMgmt->msgFp[TDMT_DND_CREATE_VNODE] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TDMT_DND_CREATE_VNODE_RSP] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_DND_ALTER_VNODE] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TDMT_DND_ALTER_VNODE_RSP] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_DND_DROP_VNODE] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TDMT_DND_DROP_VNODE_RSP] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_DND_SYNC_VNODE] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TDMT_DND_SYNC_VNODE_RSP] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_DND_AUTH_VNODE] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TDMT_DND_AUTH_VNODE_RSP] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_DND_COMPACT_VNODE] = dndProcessVnodeMgmtMsg; - pMgmt->msgFp[TDMT_DND_COMPACT_VNODE_RSP] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_DND_CREATE_MNODE] = dndProcessMnodeMgmtMsg; - pMgmt->msgFp[TDMT_DND_CREATE_MNODE_RSP] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_DND_ALTER_MNODE] = dndProcessMnodeMgmtMsg; - pMgmt->msgFp[TDMT_DND_ALTER_MNODE_RSP] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_DND_DROP_MNODE] = dndProcessMnodeMgmtMsg; - pMgmt->msgFp[TDMT_DND_DROP_MNODE_RSP] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_DND_CONFIG_DNODE] = dndProcessDnodeReq; - pMgmt->msgFp[TDMT_DND_CONFIG_DNODE_RSP] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessVnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessDnodeReq; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg; // message from dnode to mnode - pMgmt->msgFp[TDMT_MND_GRANT] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_GRANT_RSP] = dndProcessDnodeRsp; - pMgmt->msgFp[TDMT_MND_STATUS] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TDMT_MND_STATUS_RSP] = dndProcessDnodeRsp; - pMgmt->msgFp[TDMT_MND_AUTH] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TDMT_MND_AUTH_RSP] = dndProcessDnodeRsp; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessDnodeRsp; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessDnodeRsp; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessDnodeRsp; } static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { SDnode *pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; - int32_t msgType = pMsg->msgType; + tmsg_t msgType = pMsg->msgType; if (dndGetStat(pDnode) == DND_STAT_STOPPED) { if (pMsg == NULL || pMsg->pCont == NULL) return; @@ -137,7 +137,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } - DndMsgFp fp = pMgmt->msgFp[msgType]; + DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)]; if (fp != NULL) { (*fp)(pDnode, pMsg, pEpSet); dTrace("RPC %p, rsp:%s is processed, code:0x%x", pMsg->handle, TMSG_INFO(msgType), pMsg->code & 0XFFFF); @@ -186,7 +186,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { SDnode *pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; - int32_t msgType = pMsg->msgType; + tmsg_t msgType = pMsg->msgType; if (msgType == TDMT_DND_NETWORK_TEST) { dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code); dndProcessDnodeReq(pDnode, pMsg, pEpSet); @@ -214,7 +214,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } - DndMsgFp fp = pMgmt->msgFp[msgType]; + DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)]; if (fp != NULL) { dTrace("RPC %p, req:%s app:%p will be processed", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); (*fp)(pDnode, pMsg, pEpSet); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 5c78289102..ba8746c009 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -87,7 +87,7 @@ typedef struct SMnode { void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); void mndSendMsgToMnode(SMnode *pMnode, SRpcMsg *pMsg); void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg); -void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp); +void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); uint64_t mndGenerateUid(char *name, int32_t len) ; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index a8d37ba655..a424c315f9 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -24,7 +24,7 @@ extern "C" { typedef struct { SEpSet epSet; - int8_t msgType; + tmsg_t msgType; int8_t msgSent; int8_t msgReceived; int32_t errCode; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 1c0a9ad071..354d456cc4 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -132,7 +132,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); - SDB_SET_INT8(pRaw, dataPos, pAction->msgType) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType) SDB_SET_INT32(pRaw, dataPos, pAction->contLen) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen); } @@ -140,7 +140,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < undoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); - SDB_SET_INT8(pRaw, dataPos, pAction->msgType) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType) SDB_SET_INT32(pRaw, dataPos, pAction->contLen) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen); } @@ -243,7 +243,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { for (int32_t i = 0; i < redoActionNum; ++i) { STransAction action = {0}; SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); - SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType) + SDB_GET_INT16(pRaw, pRow, dataPos, &action.msgType) SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen) action.pCont = malloc(action.contLen); if (action.pCont == NULL) { @@ -262,7 +262,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { for (int32_t i = 0; i < undoActionNum; ++i) { STransAction action = {0}; SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); - SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType) + SDB_GET_INT16(pRaw, pRow, dataPos, &action.msgType) SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen) action.pCont = malloc(action.contLen); if (action.pCont == NULL) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 0199b0c583..26b1c71a10 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -374,7 +374,7 @@ void mndSendRsp(SMnodeMsg *pMsg, int32_t code) { static void mndProcessRpcMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; int32_t code = 0; - int32_t msgType = pMsg->rpcMsg.msgType; + tmsg_t msgType = pMsg->rpcMsg.msgType; void *ahandle = pMsg->rpcMsg.ahandle; bool isReq = (msgType & 1U); @@ -392,10 +392,10 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) { goto PROCESS_RPC_END; } - MndMsgFp fp = pMnode->msgFp[msgType]; + MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)]; if (fp == NULL) { code = TSDB_CODE_MSG_NOT_PROCESSED; - mError("msg:%p, app:%p failed to process since not handle", pMsg, ahandle); + mError("msg:%p, app:%p failed to process since no handle", pMsg, ahandle); goto PROCESS_RPC_END; } @@ -425,9 +425,10 @@ PROCESS_RPC_END: } } -void mndSetMsgHandle(SMnode *pMnode, int32_t msgType, MndMsgFp fp) { - if (msgType >= 0 && msgType < TDMT_MAX) { - pMnode->msgFp[msgType] = fp; +void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { + tmsg_t type = TMSG_INDEX(msgType); + if (type >= 0 && type < TDMT_MAX) { + pMnode->msgFp[type] = fp; } } diff --git a/source/libs/transport/inc/rpcHead.h b/source/libs/transport/inc/rpcHead.h index 5b401ac54b..6e98bbd563 100644 --- a/source/libs/transport/inc/rpcHead.h +++ b/source/libs/transport/inc/rpcHead.h @@ -52,7 +52,7 @@ typedef struct { char user[TSDB_UNI_LEN]; // user ID uint16_t port; // for UDP only, port may be changed char empty[1]; // reserved - uint8_t msgType; // message type + uint16_t msgType; // message type int32_t msgLen; // message length including the header iteslf uint32_t msgVer; int32_t code; // code in response message diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 71304fcca8..a9098b5053 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -74,7 +74,7 @@ typedef struct { SEpSet epSet; // ip list provided by app void *ahandle; // handle provided by app struct SRpcConn *pConn; // pConn allocated - char msgType; // message type + tmsg_t msgType; // message type uint8_t *pCont; // content provided by app int32_t contLen; // content length int32_t code; // error code @@ -108,8 +108,8 @@ typedef struct SRpcConn { uint16_t tranId; // outgoing transcation ID, for build message uint16_t outTranId; // outgoing transcation ID uint16_t inTranId; // transcation ID for incoming msg - uint8_t outType; // message type for outgoing request - uint8_t inType; // message type for incoming request + tmsg_t outType; // message type for outgoing request + tmsg_t inType; // message type for incoming request void *chandle; // handle passed by TCP/UDP connection layer void *ahandle; // handle provided by upper app layter int retry; // number of retry for sending request @@ -409,7 +409,7 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t // connection type is application specific. // for TDengine, all the query, show commands shall have TCP connection - char type = pMsg->msgType; + tmsg_t type = pMsg->msgType; if (type == TDMT_VND_QUERY || type == TDMT_MND_SHOW_RETRIEVE || type == TDMT_VND_FETCH || type == TDMT_MND_VGROUP_LIST || type == TDMT_VND_TABLES_META || type == TDMT_VND_TABLE_META @@ -957,7 +957,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont sid = htonl(pHead->destId); *ppContext = NULL; - if (pHead->msgType >= TDMT_MAX || pHead->msgType <= 0) { + if (TMSG_INDEX(pHead->msgType) >= TDMT_MAX || TMSG_INDEX(pHead->msgType) <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); terrno = TSDB_CODE_RPC_INVALID_MSG_TYPE; return NULL; } @@ -1094,7 +1094,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { SRpcReqContext *pContext; pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); - if (pHead->msgType >= 1 && pHead->msgType < TDMT_MAX) { + if (TMSG_INDEX(pHead->msgType) >= 1 && TMSG_INDEX(pHead->msgType) < TDMT_MAX) { tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), pRecv->ip, pRecv->port, terrno, pRecv->msgLen, pHead->sourceId, pHead->destId, pHead->tranId, pHead->code); @@ -1112,11 +1112,11 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE) { rpcCloseConn(pConn); } - if (pHead->msgType + 1 > 1 && pHead->msgType+1 < TDMT_MAX) { + if (TMSG_INDEX(pHead->msgType) + 1 > 1 && TMSG_INDEX(pHead->msgType) + 1 < TDMT_MAX) { tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType+1), code); } else { tError("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, TMSG_INFO(pHead->msgType), code); - } + } } } else { // msg is passed to app only parsing is ok rpcProcessIncomingMsg(pConn, pHead, pContext); @@ -1262,7 +1262,7 @@ static void rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code) { memset(msg, 0, sizeof(SRpcHead)); pReplyHead->version = pRecvHead->version; - pReplyHead->msgType = (char)(pRecvHead->msgType + 1); + pReplyHead->msgType = (tmsg_t)(pRecvHead->msgType + 1); pReplyHead->spi = 0; pReplyHead->encrypt = pRecvHead->encrypt; pReplyHead->tranId = pRecvHead->tranId; @@ -1292,7 +1292,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { SRpcHead *pHead = rpcHeadFromCont(pContext->pCont); char *msg = (char *)pHead; int msgLen = rpcMsgLenFromCont(pContext->contLen); - char msgType = pContext->msgType; + tmsg_t msgType = pContext->msgType; pContext->numOfTry++; SRpcConn *pConn = rpcSetupConnToServer(pContext);