fix(tmq): diable commit when close conumer if auto commit is disabled.

This commit is contained in:
Haojun Liao 2023-03-20 18:34:01 +08:00
parent 2517f9bb83
commit e31796893d
1 changed files with 29 additions and 21 deletions

View File

@ -2017,37 +2017,43 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
} }
} }
static void displayConsumeStatistics(const tmq_t* pTmq) {
int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch);
tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);
for (int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i);
tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i);
int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
for (int32_t j = 0; j < numOfVgs; ++j) {
SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
}
}
tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
int32_t tmq_consumer_close(tmq_t* tmq) { int32_t tmq_consumer_close(tmq_t* tmq) {
tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status); tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status);
displayConsumeStatistics(tmq);
if (tmq->status == TMQ_CONSUMER_STATUS__READY) { if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
int32_t rsp = tmq_commit_sync(tmq, NULL); // if auto commit is set, commit before close consumer. Otherwise, do nothing.
if (rsp != 0) { if (tmq->autoCommit) {
return rsp; int32_t rsp = tmq_commit_sync(tmq, NULL);
} if (rsp != 0) {
return rsp;
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
tmq->consumerId, tmq->pollCnt, tmq->totalRows, numOfTopics, tmq->epoch);
tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", tmq->consumerId);
for (int32_t i = 0; i < numOfTopics; ++i) {
SMqClientTopic* pTopics = taosArrayGet(tmq->clientTopics, i);
tscDebug("consumer:0x%" PRIx64 " topic:%d", tmq->consumerId, i);
int32_t numOfVgs = taosArrayGetSize(pTopics->vgs);
for (int32_t j = 0; j < numOfVgs; ++j) {
SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j);
tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows);
} }
} }
tscDebug("consumer:0x%" PRIx64 " rows dist end", tmq->consumerId);
int32_t retryCnt = 0; int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new(); tmq_list_t* lst = tmq_list_new();
while (1) { while (1) {
rsp = tmq_subscribe(tmq, lst); int32_t rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) { if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break; break;
} else { } else {
@ -2057,6 +2063,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
} }
tmq_list_destroy(lst); tmq_list_destroy(lst);
} else {
tscWarn("consumer:0x%" PRIx64" not in ready state, close it directly", tmq->consumerId);
} }
taosRemoveRef(tmqMgmt.rsetId, tmq->refId); taosRemoveRef(tmqMgmt.rsetId, tmq->refId);