fix: no data after seek

This commit is contained in:
t_max 2023-06-02 17:09:03 +08:00
parent 6a9c03309e
commit 27d8870e18
1 changed files with 12 additions and 2 deletions

View File

@ -1868,7 +1868,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// update the local offset value only for the returned values, only when the local offset is NOT updated // update the local offset value only for the returned values, only when the local offset is NOT updated
// by tmq_offset_seek function // by tmq_offset_seek function
if (!pVg->seekUpdated) { if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
} }
// update the status // update the status
@ -1952,9 +1955,16 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate // update the local offset value only for the returned values, only when the local offset is NOT updated
// by tmq_offset_seek function
if (!pVg->seekUpdated) {
if(pollRspWrapper->taosxRsp.rspOffset.type != 0) { // if offset is validate
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset; pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
} }
} else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
}
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);