Merge pull request #22173 from taosdata/fix/TS-3728
fix:avoid request offset type is 0
This commit is contained in:
commit
c2588d71c3
|
@ -1859,8 +1859,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
|
||||||
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
|
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
|
||||||
if (!pVg->seekUpdated) {
|
if (!pVg->seekUpdated) {
|
||||||
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
|
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
|
||||||
pVg->offsetInfo.beginOffset = *reqOffset;
|
if(reqOffset->type != 0) pVg->offsetInfo.beginOffset = *reqOffset;
|
||||||
pVg->offsetInfo.endOffset = *rspOffset;
|
if(rspOffset->type != 0) pVg->offsetInfo.endOffset = *rspOffset;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
|
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -7207,11 +7207,6 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
|
||||||
return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
|
return pLeft->uid == pRight->uid && pLeft->ts == pRight->ts;
|
||||||
} else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) {
|
} else if (pLeft->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
return pLeft->uid == pRight->uid;
|
return pLeft->uid == pRight->uid;
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
/*ASSERT(pLeft->type == TMQ_OFFSET__RESET_NONE || pLeft->type == TMQ_OFFSET__RESET_EARLIEST ||*/
|
|
||||||
/*pLeft->type == TMQ_OFFSET__RESET_LATEST);*/
|
|
||||||
/*return true;*/
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -344,9 +344,11 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
|
||||||
if (blockReturned) {
|
if (blockReturned) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
} else { // use the consumer specified offset
|
} else if(reqOffset.type != 0){ // use the consumer specified offset
|
||||||
// the offset value can not be monotonious increase??
|
// the offset value can not be monotonious increase??
|
||||||
offset = reqOffset;
|
offset = reqOffset;
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is a normal subscribe requirement
|
// this is a normal subscribe requirement
|
||||||
|
|
Loading…
Reference in New Issue