diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fad542ce0b..8758cec2ec 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -99,7 +99,7 @@ struct tmq_t { // poll info int64_t pollCnt; int64_t totalRows; - bool needReportOffsetRows; +// bool needReportOffsetRows; // timer tmr_h hbLiveTimer; @@ -797,7 +797,7 @@ void tmqSendHbReq(void* param, void* tmrId) { SMqHbReq req = {0}; req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; - if(tmq->needReportOffsetRows){ +// if(tmq->needReportOffsetRows){ req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows)); for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){ SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); @@ -816,8 +816,8 @@ void tmqSendHbReq(void* param, void* tmrId) { tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows); } } - tmq->needReportOffsetRows = false; - } +// tmq->needReportOffsetRows = false; +// } int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); if (tlen < 0) { @@ -1094,7 +1094,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->status = TMQ_CONSUMER_STATUS__INIT; pTmq->pollCnt = 0; pTmq->epoch = 0; - pTmq->needReportOffsetRows = true; +// pTmq->needReportOffsetRows = true; // set conf strcpy(pTmq->clientId, conf->clientId); @@ -2452,7 +2452,7 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { // if no more waiting rsp pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam); taosMemoryFree(pParamSet); - tmq->needReportOffsetRows = true; +// tmq->needReportOffsetRows = true; taosReleaseRef(tmqMgmt.rsetId, refId); return 0;