commit
5d8f6a0354
|
@ -1044,7 +1044,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
|||
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
|
||||
if (epStatus == 1) {
|
||||
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
|
||||
tscDebug("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
|
||||
tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
|
||||
if (epSkipCnt < 5000) return 0;
|
||||
}
|
||||
atomic_store_32(&tmq->epSkipCnt, 0);
|
||||
|
@ -1235,7 +1235,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
|||
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
||||
if (vgStatus != TMQ_VG_STATUS__IDLE) {
|
||||
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
|
||||
tscDebug("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
|
||||
tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
|
||||
continue;
|
||||
/*if (vgSkipCnt < 10000) continue;*/
|
||||
#if 0
|
||||
|
|
|
@ -316,7 +316,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
vDebug("poll topic %s from consumer %ld (epoch %d) %s", pTopic->topicName, consumerId, pReq->epoch, pTopic->topicName);
|
||||
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, pTq->pVnode->vgId);
|
||||
|
||||
rsp.reqOffset = pReq->currentOffset;
|
||||
rsp.skipLogNum = 0;
|
||||
|
@ -325,8 +325,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
|
||||
// TODO
|
||||
consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
||||
if (consumerEpoch > pReq->epoch) {
|
||||
// TODO: return
|
||||
if (consumerEpoch > reqEpoch) {
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch);
|
||||
break;
|
||||
}
|
||||
SWalReadHead* pHead;
|
||||
|
|
|
@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) {
|
|||
int32_t totalRows = 0;
|
||||
int32_t skipLogNum = 0;
|
||||
while (running) {
|
||||
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 4000);
|
||||
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 8000);
|
||||
if (tmqMsg) {
|
||||
totalMsgs++;
|
||||
|
||||
|
|
Loading…
Reference in New Issue