diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 02d4c2279c..7d1c6a91b3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2948,6 +2948,10 @@ typedef struct { STqOffsetVal reqOffset; } SMqPollReq; +int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq); +int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq); + + typedef struct { int32_t vgId; int64_t offset; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 28b41b97c5..1dd3174c29 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1461,12 +1461,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { return code; } -SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { - SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq)); - if (pReq == NULL) { - return NULL; - } - +void tmqBuildConsumeReqImpl(SMqPollReq *pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { /*strcpy(pReq->topic, pTopic->topicName);*/ /*strcpy(pReq->cgroup, tmq->groupId);*/ @@ -1485,9 +1480,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pReq->useSnapshot = tmq->useSnapshot; - pReq->head.vgId = htonl(pVg->vgId); - pReq->head.contLen = htonl(sizeof(SMqPollReq)); - return pReq; + pReq->head.vgId = pVg->vgId; } SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { @@ -1559,15 +1552,32 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { #endif } atomic_store_32(&pVg->vgSkipCnt, 0); - SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg); - if (pReq == NULL) { + + SMqPollReq req = {0}; + tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg); + int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); + if (msgSize < 0) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); tsem_post(&tmq->rspSem); return -1; } + char *msg = taosMemoryCalloc(1, msgSize); + if (NULL == msg) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + tsem_post(&tmq->rspSem); + return -1; + } + + if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { + taosMemoryFree(msg); + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + tsem_post(&tmq->rspSem); + return -1; + } + SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); if (pParam == NULL) { - taosMemoryFree(pReq); + taosMemoryFree(msg); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); tsem_post(&tmq->rspSem); return -1; @@ -1581,7 +1591,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { - taosMemoryFree(pReq); + taosMemoryFree(msg); taosMemoryFree(pParam); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); tsem_post(&tmq->rspSem); @@ -1589,11 +1599,11 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { } sendInfo->msgInfo = (SDataBuf){ - .pData = pReq, - .len = sizeof(SMqPollReq), + .pData = msg, + .len = msgSize, .handle = NULL, }; - sendInfo->requestId = pReq->reqId; + sendInfo->requestId = req.reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; sendInfo->fp = tmqPollCb; @@ -1605,7 +1615,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { char offsetFormatBuf[80]; tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset); tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64, - tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId); + tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/ asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 08123e6f4a..811a492b87 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4766,7 +4766,7 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { if (buf != NULL) { SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen); - pHead->vgId = htonl(pReq->header.vgId); + pHead->vgId = htonl(pReq->head.vgId); pHead->contLen = htonl(tlen + headLen); } @@ -4777,8 +4777,8 @@ int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t headLen = sizeof(SMsgHead); SMsgHead *pHead = buf; - pHead->vgId = pReq->header.vgId; - pHead->contLen = pReq->header.contLen; + pHead->vgId = pReq->head.vgId; + pHead->contLen = pReq->head.contLen; SDecoder decoder = {0}; tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 092128fe6e..1c543c434c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -458,20 +458,26 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { } int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { - SMqPollReq* pReq = pMsg->pCont; - int64_t consumerId = pReq->consumerId; - int32_t reqEpoch = pReq->epoch; + SMqPollReq req = {0}; int32_t code = 0; - STqOffsetVal reqOffset = pReq->reqOffset; STqOffsetVal fetchOffsetNew; SWalCkHead* pCkHead = NULL; + if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { + tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); + return -1; + } + + int64_t consumerId = req.consumerId; + int32_t reqEpoch = req.epoch; + STqOffsetVal reqOffset = req.reqOffset; + // 1.find handle - STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); /*ASSERT(pHandle);*/ if (pHandle == NULL) { tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId, - TD_VID(pTq->pVnode), pReq->subKey); + TD_VID(pTq->pVnode), req.subKey); return -1; } @@ -479,7 +485,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHandle->consumerId != consumerId) { tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64 ", in vgId:%d, subkey %s, handle consumer id %" PRId64, - consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId); + consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; return -1; } @@ -493,13 +499,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { char buf[80]; tFormatOffset(buf, 80, &reqOffset); tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId, - pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); + req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); // 2.reset offset if needed if (reqOffset.type > 0) { fetchOffsetNew = reqOffset; } else { - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey); + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); if (pOffset != NULL) { fetchOffsetNew = pOffset->val; char formatBuf[80]; @@ -508,7 +514,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { TD_VID(pTq->pVnode), formatBuf); } else { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { - if (pReq->useSnapshot) { + if (req.useSnapshot) { if (pHandle->fetchMeta) { tqOffsetResetToMeta(&fetchOffsetNew, 0); } else { @@ -520,21 +526,21 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); + tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { code = -1; } tDeleteSMqDataRsp(&dataRsp); return code; } else { STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, pReq); + tqInitTaosxRsp(&taosxRsp, &req); tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) { + if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) { code = -1; } tDeleteSTaosxRsp(&taosxRsp); @@ -543,7 +549,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey %s, no offset committed for consumer %" PRId64 " in vg %d, subkey %s, reset none failed", - pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pReq->subKey); + pHandle->subKey, consumerId, TD_VID(pTq->pVnode), req.subKey); terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; return -1; } @@ -552,7 +558,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, pReq, pHandle->execHandle.subType); + tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); // lock taosWLockLatch(&pTq->pushLock); tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); @@ -580,7 +586,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { #endif taosWUnLockLatch(&pTq->pushLock); - if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { + if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { code = -1; } @@ -599,13 +605,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, pReq); + tqInitTaosxRsp(&taosxRsp, &req); if (fetchOffsetNew.type != TMQ_OFFSET__LOG) { tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew); if (metaRsp.metaRspLen > 0) { - if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { + if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) { code = -1; } tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64 @@ -618,7 +624,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } if (taosxRsp.blockNum > 0) { - if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) { + if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) { code = -1; } tDeleteSTaosxRsp(&taosxRsp); @@ -648,13 +654,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (consumerEpoch > reqEpoch) { tqWarn("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, vg %d offset %" PRId64 ", found new consumer epoch %d, discard req epoch %d", - consumerId, pReq->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); + consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch); break; } if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) { + if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) { code = -1; } tDeleteSTaosxRsp(&taosxRsp); @@ -665,7 +671,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SWalCont* pHead = &pCkHead->head; tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, - pReq->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); + req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; @@ -675,7 +681,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } if (taosxRsp.blockNum > 0 /* threshold */) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - if (tqSendTaosxRsp(pTq, pMsg, pReq, &taosxRsp) < 0) { + if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) { code = -1; } tDeleteSTaosxRsp(&taosxRsp); @@ -693,7 +699,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) { + if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) { code = -1; taosMemoryFreeClear(pCkHead); tDeleteSTaosxRsp(&taosxRsp);