diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 79139a5fe5..af819bf20f 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2017,37 +2017,43 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { } } +static void displayConsumeStatistics(const tmq_t* pTmq) { + int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics); + tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", + pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch); + + tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId); + for (int32_t i = 0; i < numOfTopics; ++i) { + SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i); + + tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i); + int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); + for (int32_t j = 0; j < numOfVgs; ++j) { + SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); + tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); + } + } + + tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); +} + int32_t tmq_consumer_close(tmq_t* tmq) { tscDebug("consumer:0x%" PRIx64" start to close consumer, status:%d", tmq->consumerId, tmq->status); + displayConsumeStatistics(tmq); if (tmq->status == TMQ_CONSUMER_STATUS__READY) { - int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0) { - return rsp; - } - - int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", - tmq->consumerId, tmq->pollCnt, tmq->totalRows, numOfTopics, tmq->epoch); - - tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", tmq->consumerId); - for (int32_t i = 0; i < numOfTopics; ++i) { - SMqClientTopic* pTopics = taosArrayGet(tmq->clientTopics, i); - - tscDebug("consumer:0x%" PRIx64 " topic:%d", tmq->consumerId, i); - int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); - for (int32_t j = 0; j < numOfVgs; ++j) { - SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); - tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); + // if auto commit is set, commit before close consumer. Otherwise, do nothing. + if (tmq->autoCommit) { + int32_t rsp = tmq_commit_sync(tmq, NULL); + if (rsp != 0) { + return rsp; } } - tscDebug("consumer:0x%" PRIx64 " rows dist end", tmq->consumerId); - int32_t retryCnt = 0; tmq_list_t* lst = tmq_list_new(); while (1) { - rsp = tmq_subscribe(tmq, lst); + int32_t rsp = tmq_subscribe(tmq, lst); if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) { break; } else { @@ -2057,6 +2063,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) { } tmq_list_destroy(lst); + } else { + tscWarn("consumer:0x%" PRIx64" not in ready state, close it directly", tmq->consumerId); } taosRemoveRef(tmqMgmt.rsetId, tmq->refId);