From 9e71d03d37dc1a3de0bcd8061bfb6bf7d5e6f705 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 18 May 2022 15:31:05 +0800 Subject: [PATCH] enh(tmq): commit when consumer_close called --- source/client/src/tmq.c | 39 +++++++++++++++++------ source/dnode/mnode/impl/src/mndConsumer.c | 27 +++++++++------- 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index ecfc991331..f7ddca8f69 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -119,7 +119,7 @@ enum { enum { TMQ_CONSUMER_STATUS__INIT = 0, TMQ_CONSUMER_STATUS__READY, - TMQ_CONSUMER_STATUS__NO_TOPIC, + /*TMQ_CONSUMER_STATUS__NO_TOPIC,*/ }; enum { @@ -753,12 +753,19 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in } tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { + if (topic_list == NULL) { + return TMQ_RESP_ERR__FAIL; + } const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); void* buf = NULL; SCMSubscribeReq req = {0}; int32_t code = -1; + if (sz == 0) { + return TMQ_RESP_ERR__FAIL; + } + req.consumerId = tmq->consumerId; tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); @@ -830,10 +837,12 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } // init hb timer - tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer); + if (tmq->hbTimer == NULL) { + tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer); + } // init auto commit timer - if (tmq->autoCommit) { + if (tmq->autoCommit && tmq->commitTimer == NULL) { tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer); } @@ -1074,10 +1083,13 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { taosHashCleanup(pHash); tmq->clientTopics = newTopics; - if (taosArrayGetSize(tmq->clientTopics) == 0) - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC); - else - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); + ASSERT(taosArrayGetSize(tmq->clientTopics) != 0); + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); + + /*if (taosArrayGetSize(tmq->clientTopics) == 0)*/ + /*atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);*/ + /*else*/ + /*atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);*/ atomic_store_32(&tmq->epoch, epoch); return set; @@ -1456,9 +1468,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { if (tmq->status == TMQ_CONSUMER_STATUS__READY) { - tmq_list_t* lst = tmq_list_new(); - tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); + tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL); + if (rsp == TMQ_RESP_ERR__SUCCESS) { + // TODO: free resources + return TMQ_RESP_ERR__SUCCESS; + } else { + return TMQ_RESP_ERR__FAIL; + } + + tmq_list_t* lst = tmq_list_new(); + rsp = tmq_subscribe(tmq, lst); tmq_list_destroy(lst); + if (rsp == TMQ_RESP_ERR__SUCCESS) { // TODO: free resources return TMQ_RESP_ERR__SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 503c0f404a..274876567e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -627,21 +627,26 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) { ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0); - SArray *tmp = pOldConsumer->rebNewTopics; - pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics; - pNewConsumer->rebNewTopics = tmp; - tmp = pOldConsumer->rebRemovedTopics; - pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics; - pNewConsumer->rebRemovedTopics = tmp; + if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) { + pOldConsumer->status = MQ_CONSUMER_STATUS__READY; + } else { + SArray *tmp = pOldConsumer->rebNewTopics; + pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics; + pNewConsumer->rebNewTopics = tmp; - tmp = pOldConsumer->assignedTopics; - pOldConsumer->assignedTopics = pNewConsumer->assignedTopics; - pNewConsumer->assignedTopics = tmp; + tmp = pOldConsumer->rebRemovedTopics; + pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics; + pNewConsumer->rebRemovedTopics = tmp; - pOldConsumer->subscribeTime = pNewConsumer->upTime; + tmp = pOldConsumer->assignedTopics; + pOldConsumer->assignedTopics = pNewConsumer->assignedTopics; + pNewConsumer->assignedTopics = tmp; - pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + pOldConsumer->subscribeTime = pNewConsumer->upTime; + + pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + } } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);