fix:[TS-4716]wait too long if change system time
This commit is contained in:
parent
b299ff0449
commit
b0cdf887b4
|
@ -1805,14 +1805,15 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
}
|
}
|
||||||
for (int j = 0; j < numOfVg; j++) {
|
for (int j = 0; j < numOfVg; j++) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms
|
int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs;
|
||||||
|
if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed > 0) { // less than 10ms
|
||||||
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
|
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
|
||||||
tmq->epoch, pVg->vgId);
|
tmq->epoch, pVg->vgId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tmq->replayEnable &&
|
elapsed = taosGetTimestampMs() - pVg->blockReceiveTs;
|
||||||
taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms
|
if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed > 0) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
|
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
|
||||||
tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
|
tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
|
||||||
continue;
|
continue;
|
||||||
|
@ -2127,7 +2128,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
if (timeout >= 0) {
|
if (timeout >= 0) {
|
||||||
int64_t currentTime = taosGetTimestampMs();
|
int64_t currentTime = taosGetTimestampMs();
|
||||||
int64_t elapsedTime = currentTime - startTime;
|
int64_t elapsedTime = currentTime - startTime;
|
||||||
if (elapsedTime > timeout) {
|
if (elapsedTime > timeout || elapsedTime < 0) {
|
||||||
tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
|
tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
|
||||||
tmq->consumerId, tmq->epoch, startTime, currentTime);
|
tmq->consumerId, tmq->epoch, startTime, currentTime);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -385,7 +385,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
||||||
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
||||||
SWalReader* pWalReader = pReader->pWalReader;
|
SWalReader* pWalReader = pReader->pWalReader;
|
||||||
|
|
||||||
uint64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||||
while (pReader->nextBlk < numOfBlocks) {
|
while (pReader->nextBlk < numOfBlocks) {
|
||||||
|
@ -413,7 +413,8 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
||||||
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
|
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
|
||||||
pReader->msg.msgStr = NULL;
|
pReader->msg.msgStr = NULL;
|
||||||
|
|
||||||
if (taosGetTimestampMs() - st > 1000) {
|
int64_t elapsed = taosGetTimestampMs() - st;
|
||||||
|
if(elapsed > 1000 && elapsed < 0){
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue