fix(tmq): fix the error in tmq.
This commit is contained in:
parent
8a9746d2bf
commit
8f1f423c90
|
@ -24,8 +24,6 @@
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
#define VG_POLL_IGNORE_TICK 100
|
|
||||||
|
|
||||||
struct SMqMgmt {
|
struct SMqMgmt {
|
||||||
int8_t inited;
|
int8_t inited;
|
||||||
tmr_h timer;
|
tmr_h timer;
|
||||||
|
@ -134,7 +132,7 @@ typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t vgStatus;
|
int32_t vgStatus;
|
||||||
int32_t vgSkipCnt;
|
int32_t vgSkipCnt;
|
||||||
int32_t vgIgnoreCnt; // once empty block is received, idle for ignoreCnt then start to poll data
|
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
} SMqClientVg;
|
} SMqClientVg;
|
||||||
|
|
||||||
|
@ -1403,7 +1401,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
.epSet = pVgEp->epSet,
|
.epSet = pVgEp->epSet,
|
||||||
.vgStatus = TMQ_VG_STATUS__IDLE,
|
.vgStatus = TMQ_VG_STATUS__IDLE,
|
||||||
.vgSkipCnt = 0,
|
.vgSkipCnt = 0,
|
||||||
.vgIgnoreCnt = 0,
|
.emptyBlockReceiveTs = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
taosArrayPush(pTopic->vgs, &clientVg);
|
taosArrayPush(pTopic->vgs, &clientVg);
|
||||||
|
@ -1704,10 +1702,9 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
|
|
||||||
for (int j = 0; j < numOfVg; j++) {
|
for (int j = 0; j < numOfVg; j++) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
if (pVg->vgIgnoreCnt > 0) {
|
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < 100) { // less than 100ms
|
||||||
pVg->vgIgnoreCnt -= 1;
|
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 100ms before start next poll", tmq->consumerId, tmq->epoch,
|
||||||
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %d tick before poll", tmq->consumerId, tmq->epoch,
|
pVg->vgId);
|
||||||
pVg->vgId, pVg->vgIgnoreCnt);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1846,9 +1843,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
rspWrapper = NULL;
|
rspWrapper = NULL;
|
||||||
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
|
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
|
||||||
pollRspWrapper->reqId);
|
pollRspWrapper->reqId);
|
||||||
pVg->vgIgnoreCnt = VG_POLL_IGNORE_TICK;
|
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
continue;
|
continue;
|
||||||
|
} else {
|
||||||
|
pVg->emptyBlockReceiveTs = 0; // reset the ts
|
||||||
}
|
}
|
||||||
|
|
||||||
// build rsp
|
// build rsp
|
||||||
|
|
|
@ -735,9 +735,7 @@ void build_consumer(SThreadInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
|
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
|
|
||||||
tmq_conf_destroy(conf);
|
tmq_conf_destroy(conf);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue