diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0a31eac09c..b5ae9116ef 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1830,7 +1830,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; + if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); @@ -1848,7 +1850,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; + if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->taosxRsp.blockNum == 0) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 1f06132d2f..1b5e498a7f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -211,6 +211,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); + qTaskInfo_t task = pHandle->execHandle.task; + if(qTaskIsExecuting(task)){ + code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); + tDeleteSTaosxRsp(&taosxRsp); + return code; + } if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {