fix:[TS-4716]wait too long if change system time
This commit is contained in:
parent
3c0dbacf37
commit
07a0d21ff0
|
@ -1806,14 +1806,14 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
for (int j = 0; j < numOfVg; j++) {
|
for (int j = 0; j < numOfVg; j++) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
|
int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
|
||||||
if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed > 0) { // less than 10ms
|
if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms
|
||||||
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
|
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
|
||||||
tmq->epoch, pVg->vgId);
|
tmq->epoch, pVg->vgId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
|
elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
|
||||||
if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed > 0) {
|
if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
|
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
|
||||||
tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
|
tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
|
||||||
continue;
|
continue;
|
||||||
|
|
Loading…
Reference in New Issue