Merge pull request #21578 from taosdata/fix/xftan/no-data-after-seek
fix: no data after seek
This commit is contained in:
commit
eff162d185
|
@ -1868,7 +1868,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
// update the local offset value only for the returned values, only when the local offset is NOT updated
|
// update the local offset value only for the returned values, only when the local offset is NOT updated
|
||||||
// by tmq_offset_seek function
|
// by tmq_offset_seek function
|
||||||
if (!pVg->seekUpdated) {
|
if (!pVg->seekUpdated) {
|
||||||
|
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
|
||||||
pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
|
pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
|
||||||
|
} else {
|
||||||
|
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the status
|
// update the status
|
||||||
|
@ -1952,8 +1955,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate
|
// update the local offset value only for the returned values, only when the local offset is NOT updated
|
||||||
pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
|
// by tmq_offset_seek function
|
||||||
|
if (!pVg->seekUpdated) {
|
||||||
|
if(pollRspWrapper->taosxRsp.rspOffset.type != 0) { // if offset is validate
|
||||||
|
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
|
||||||
|
pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
|
|
Loading…
Reference in New Issue