diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 4cef25b9ce..ec2ebdfae9 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1124,10 +1124,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { pRspWrapper->topicHandle = pTopic; if (rspType == TMQ_MSG_TYPE__POLL_RSP) { - memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp); + memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); /*tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);*/ } else { ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP); @@ -1138,7 +1138,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld, type %d", tmq->consumerId, pVg->vgId, - pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset, rspType); + pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version, rspType); taosWriteQitem(tmq->mqueue, pRspWrapper); tsem_post(&tmq->rspSem); @@ -1203,17 +1203,17 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { for (int32_t j = 0; j < vgNumGet; j++) { SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); - int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); - int64_t offset = tmq->resetOffsetCfg; + STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); + STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg}; if (pOffset != NULL) { - offset = *pOffset; + offsetNew = *pOffset; } - tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld, vgKey is %s", tmq->consumerId, epoch, - pVgEp->vgId, offset, vgKey); + /*tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld, vgKey is %s", tmq->consumerId, epoch,*/ + /*pVgEp->vgId, offset, vgKey);*/ SMqClientVg clientVg = { .pollCnt = 0, - .currentOffsetNew.type = tmq->resetOffsetCfg, + .currentOffsetNew = offsetNew, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, .vgStatus = TMQ_VG_STATUS__IDLE, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ac36dbf774..aadacf1e72 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -142,7 +142,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); + tEncoderInit(&encoder, abuf, len); tEncodeSMqDataRsp(&encoder, pRsp); /*tEncodeSMqDataBlkRsp(&abuf, pRsp);*/ @@ -332,7 +332,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (tqFetchLog(pTq, pHandle, &fetchVer, &pHeadWithCkSum) < 0) { // TODO add push mgr - tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer); + tqOffsetResetToLog(&dataRsp.rspOffset, fetchVer - 1); if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; } diff --git a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim index 480cf520d9..d796d00f03 100644 --- a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim +++ b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim @@ -134,6 +134,8 @@ endi $totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb $sumOfMsgCnt = $data[0][2] + $data[1][2] if $sumOfMsgCnt != $totalMsgCons then + print total: $totalMsgCons + print sum: $sumOfMsgCnt return -1 endi