From fb861a3935fb4fb0348ac2366e715097032e5687 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 17 Nov 2022 14:09:15 +0800 Subject: [PATCH] enh: refactor tmq messages --- include/common/tmsg.h | 6 +++ source/client/src/clientTmq.c | 58 ++++++++++++++------ source/common/src/tmsg.c | 65 +++++++++++++++++++++++ source/dnode/mnode/impl/src/mndConsumer.c | 23 +++++--- source/dnode/vnode/src/vnd/vnodeQuery.c | 8 ++- 5 files changed, 135 insertions(+), 25 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7d0b57e310..8066694709 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3198,6 +3198,12 @@ static FORCE_INLINE void tFreeSBatchRspMsg(void* p) { taosMemoryFree(pRsp->msg); } +int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq); +int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq); +int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq); +int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq); + + #pragma pack(pop) #ifdef __cplusplus diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ab44236d96..1fe89bcafd 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -728,12 +728,26 @@ void tmqSendHbReq(void* param, void* tmrId) { taosMemoryFree(param); return; } - int64_t consumerId = tmq->consumerId; - int32_t epoch = tmq->epoch; - SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq)); - if (pReq == NULL) goto OVER; - pReq->consumerId = htobe64(consumerId); - pReq->epoch = epoch; + + SMqHbReq req = {0}; + req.consumerId = tmq->consumerId; + req.epoch = tmq->epoch; + + int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); + if (tlen < 0) { + tscError("tSerializeSMqHbReq failed"); + return; + } + void *pReq = taosMemoryCalloc(1, tlen); + if (tlen < 0) { + tscError("failed to malloc MqHbReq msg, size:%d", tlen); + return; + } + if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) { + tscError("tSerializeSMqHbReq %d failed", tlen); + taosMemoryFree(pReq); + return; + } SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { @@ -1378,21 +1392,31 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { } atomic_store_32(&tmq->epSkipCnt, 0); #endif - int32_t tlen = sizeof(SMqAskEpReq); - SMqAskEpReq* req = taosMemoryCalloc(1, tlen); - if (req == NULL) { - tscError("failed to malloc get subscribe ep buf"); - /*atomic_store_8(&tmq->epStatus, 0);*/ + SMqAskEpReq req = {0}; + req.consumerId = tmq->consumerId; + req.epoch = tmq->epoch; + strcpy(req.cgroup, tmq->groupId); + + int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); + if (tlen < 0) { + tscError("tSerializeSMqAskEpReq failed"); + return -1; + } + void *pReq = taosMemoryCalloc(1, tlen); + if (tlen < 0) { + tscError("failed to malloc askEpReq msg, size:%d", tlen); + return -1; + } + if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { + tscError("tSerializeSMqAskEpReq %d failed", tlen); + taosMemoryFree(pReq); return -1; } - req->consumerId = htobe64(tmq->consumerId); - req->epoch = htonl(tmq->epoch); - strcpy(req->cgroup, tmq->groupId); SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); if (pParam == NULL) { tscError("failed to malloc subscribe param"); - taosMemoryFree(req); + taosMemoryFree(pReq); /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } @@ -1405,13 +1429,13 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { if (sendInfo == NULL) { tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); - taosMemoryFree(req); + taosMemoryFree(pReq); /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } sendInfo->msgInfo = (SDataBuf){ - .pData = req, + .pData = pReq, .len = tlen, .handle = NULL, }; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1471fb0f59..63a2b712fc 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4579,6 +4579,71 @@ int32_t tDeserializeSBatchRsp(void *buf, int32_t bufLen, SBatchRsp *pRsp) { } +int32_t tSerializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->cgroup) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + return tlen; +} + +int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->cgroup) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->epoch) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + + return tlen; +} + +int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + + int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) { int32_t headLen = sizeof(SMsgHead); if (buf != NULL) { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 62ad5bae15..300251d64d 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -325,9 +325,14 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; - SMqHbReq *pReq = (SMqHbReq *)pMsg->pCont; - int64_t consumerId = be64toh(pReq->consumerId); + SMqHbReq req = {0}; + if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int64_t consumerId = req.consumerId; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { mError("consumer %" PRId64 " not exist", consumerId); @@ -359,10 +364,16 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; - SMqAskEpReq *pReq = (SMqAskEpReq *)pMsg->pCont; + SMqAskEpReq req = {0}; SMqAskEpRsp rsp = {0}; - int64_t consumerId = be64toh(pReq->consumerId); - int32_t epoch = ntohl(pReq->epoch); + + if (tDeserializeSMqAskEpReq(pMsg->pCont, pMsg->contLen, &req) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int64_t consumerId = req.consumerId; + int32_t epoch = req.epoch; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { @@ -370,7 +381,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { return -1; } - ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); + ASSERT(strcmp(req.cgroup, pConsumer->cgroup) == 0); atomic_store_32(&pConsumer->hbStatus, 0); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index bc7c645999..8d6ebe5c14 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -301,6 +301,7 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { batchRsp.pRsps = taosArrayInit(msgNum, sizeof(SBatchRspMsg)); if (NULL == batchRsp.pRsps) { code = TSDB_CODE_OUT_OF_MEMORY; + qError("taosArrayInit %d SBatchRspMsg failed", msgNum); goto _exit; } @@ -337,15 +338,18 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp); if (rspSize < 0) { + qError("tSerializeSBatchRsp failed"); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } pRsp = rpcMallocCont(rspSize); if (pRsp == NULL) { + qError("rpcMallocCont %d failed", rspSize); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp)) { + if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp) < 0) { + qError("tSerializeSBatchRsp %d failed", rspSize); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -362,7 +366,7 @@ _exit: qError("vnd get batch meta failed cause of %s", tstrerror(code)); } - taosArrayDestroyEx(batchRsp.pRsps, vnodeFreeSBatchRspMsg); + taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg); tmsgSendRsp(&rspMsg);