From 43f7e18c65dfc8a1f68031e9f15afb6b9197cd06 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Feb 2022 13:44:43 +0800 Subject: [PATCH] serialize timer msg --- include/common/tmsg.h | 11 +++++------ include/common/tmsgdef.h | 2 +- source/common/src/tmsg.c | 27 ++++++++++++++++++++++++++- source/dnode/mgmt/impl/src/dndMgmt.c | 2 +- source/dnode/mnode/impl/src/mnode.c | 23 +++++++++++++++++++---- 5 files changed, 52 insertions(+), 13 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6f35dd797e..e46bc24137 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -720,8 +720,11 @@ int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); int32_t tDeserializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); typedef struct { - int32_t reserve; -} STransReq; + int32_t reserved; +} SMTimerReq; + +int32_t tSerializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq); +int32_t tDeserializeSMTimerMsg(void* buf, int32_t bufLen, SMTimerReq* pReq); typedef struct { int32_t id; @@ -1195,10 +1198,6 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq return buf; } -typedef struct { - int32_t reserved; -} SMqTmrMsg; - typedef struct { const char* key; SArray* lostConsumers; // SArray diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 2f41e574bd..e5e6fbd0ca 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -142,7 +142,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) - TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMqTmrMsg, SMqTmrMsg) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) // Requests handled by VNODE diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 144054d078..8f4ebffd12 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1984,7 +1984,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1; - if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1; if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; @@ -1994,3 +1994,28 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { tCoderClear(&decoder); return 0; } + +int32_t tSerializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->reserved) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->reserved) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index ff8efb6a1d..93c2d4e3e1 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -394,7 +394,7 @@ void dndSendStatusReq(SDnode *pDnode) { static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; if (pMgmt->dnodeId == 0) { - dInfo("set dnodeId:%d clusterId:0x%" PRId64, pCfg->dnodeId, pCfg->clusterId); + dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); taosWLockLatch(&pMgmt->latch); pMgmt->dnodeId = pCfg->dnodeId; pMgmt->clusterId = pCfg->clusterId; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 211cfbd1f0..699ccab92c 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -60,11 +60,25 @@ void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) { } } +static void *mndBuildTimerMsg(int32_t *pContLen) { + SMTimerReq timerReq = {0}; + + int32_t contLen = tSerializeSMTimerMsg(NULL, 0, &timerReq); + if (contLen <= 0) return NULL; + void *pReq = rpcMallocCont(contLen); + if (pReq == NULL) return NULL; + + tSerializeSMTimerMsg(pReq, contLen, &timerReq); + *pContLen = contLen; + return pReq; +} + static void mndTransReExecute(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { - STransReq *pMsg = rpcMallocCont(sizeof(STransReq)); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransReq)}; + int32_t contLen = 0; + void *pReq = mndBuildTimerMsg(&contLen); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pReq, .contLen = contLen}; pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); } @@ -74,8 +88,9 @@ static void mndTransReExecute(void *param, void *tmrId) { static void mndCalMqRebalance(void *param, void *tmrId) { SMnode *pMnode = param; if (mndIsMaster(pMnode)) { - SMqTmrMsg *pMsg = rpcMallocCont(sizeof(SMqTmrMsg)); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pMsg, .contLen = sizeof(SMqTmrMsg)}; + int32_t contLen = 0; + void *pReq = mndBuildTimerMsg(&contLen); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); }