enh(tmq): commit when consumer_close called

This commit is contained in:
Liu Jicong 2022-05-18 15:31:05 +08:00
parent 8a2772f8a7
commit 9e71d03d37
2 changed files with 46 additions and 20 deletions

View File

@ -119,7 +119,7 @@ enum {
enum {
TMQ_CONSUMER_STATUS__INIT = 0,
TMQ_CONSUMER_STATUS__READY,
TMQ_CONSUMER_STATUS__NO_TOPIC,
/*TMQ_CONSUMER_STATUS__NO_TOPIC,*/
};
enum {
@ -753,12 +753,19 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
}
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if (topic_list == NULL) {
return TMQ_RESP_ERR__FAIL;
}
const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container);
void* buf = NULL;
SCMSubscribeReq req = {0};
int32_t code = -1;
if (sz == 0) {
return TMQ_RESP_ERR__FAIL;
}
req.consumerId = tmq->consumerId;
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
req.topicNames = taosArrayInit(sz, sizeof(void*));
@ -830,10 +837,12 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
// init hb timer
tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
if (tmq->hbTimer == NULL) {
tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
}
// init auto commit timer
if (tmq->autoCommit) {
if (tmq->autoCommit && tmq->commitTimer == NULL) {
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
}
@ -1074,10 +1083,13 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
taosHashCleanup(pHash);
tmq->clientTopics = newTopics;
if (taosArrayGetSize(tmq->clientTopics) == 0)
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
else
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
ASSERT(taosArrayGetSize(tmq->clientTopics) != 0);
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
/*if (taosArrayGetSize(tmq->clientTopics) == 0)*/
/*atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);*/
/*else*/
/*atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);*/
atomic_store_32(&tmq->epoch, epoch);
return set;
@ -1456,9 +1468,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
tmq_list_t* lst = tmq_list_new();
tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp == TMQ_RESP_ERR__SUCCESS) {
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;
} else {
return TMQ_RESP_ERR__FAIL;
}
tmq_list_t* lst = tmq_list_new();
rsp = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst);
if (rsp == TMQ_RESP_ERR__SUCCESS) {
// TODO: free resources
return TMQ_RESP_ERR__SUCCESS;

View File

@ -627,21 +627,26 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) {
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);
SArray *tmp = pOldConsumer->rebNewTopics;
pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
pNewConsumer->rebNewTopics = tmp;
tmp = pOldConsumer->rebRemovedTopics;
pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
pNewConsumer->rebRemovedTopics = tmp;
if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else {
SArray *tmp = pOldConsumer->rebNewTopics;
pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics;
pNewConsumer->rebNewTopics = tmp;
tmp = pOldConsumer->assignedTopics;
pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
pNewConsumer->assignedTopics = tmp;
tmp = pOldConsumer->rebRemovedTopics;
pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
pNewConsumer->rebRemovedTopics = tmp;
pOldConsumer->subscribeTime = pNewConsumer->upTime;
tmp = pOldConsumer->assignedTopics;
pOldConsumer->assignedTopics = pNewConsumer->assignedTopics;
pNewConsumer->assignedTopics = tmp;
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
pOldConsumer->subscribeTime = pNewConsumer->upTime;
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
}
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);
ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);