diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fa2e250b2b..b2c901c800 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1859,8 +1859,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){ if (!pVg->seekUpdated) { tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); - pVg->offsetInfo.beginOffset = *reqOffset; - pVg->offsetInfo.endOffset = *rspOffset; + if(reqOffset->type != 0) pVg->offsetInfo.beginOffset = *reqOffset; + if(rspOffset->type != 0) pVg->offsetInfo.endOffset = *rspOffset; } else { tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index fc99202bce..8051f4d0bd 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7207,11 +7207,6 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts; } else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) { return pLeft->uid == pRight->uid; - } else { - ASSERT(0); - /*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEST ||*/ - /*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/ - /*return true;*/ } } return false; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 7d632f44bc..4320995306 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -344,9 +344,11 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ if (blockReturned) { return 0; } - } else { // use the consumer specified offset + } else if(reqOffset.type != 0){ // use the consumer specified offset // the offset value can not be monotonious increase?? offset = reqOffset; + } else { + return TSDB_CODE_TMQ_INVALID_MSG; } // this is a normal subscribe requirement