From eec6b668772e32983d4b54e32c3c51400a2514cb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 28 Feb 2024 17:05:11 +0800 Subject: [PATCH 1/2] fix:[TD-28869]get error in askEp because consumer is dropped when unsubscribe topic --- source/client/src/clientTmq.c | 30 ++++-------------------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9b74456da2..3ce4e97e22 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1009,19 +1009,8 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { } taosSsleep(2); // sleep 2s for hb to send offset and rows to server - int32_t rsp; - int32_t retryCnt = 0; tmq_list_t* lst = tmq_list_new(); - while (1) { - rsp = tmq_subscribe(tmq, lst); - if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) { - break; - } else { - retryCnt++; - taosMsleep(500); - } - } - + int32_t rsp = tmq_subscribe(tmq, lst); tmq_list_destroy(lst); return rsp; } @@ -1271,10 +1260,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } int32_t retryCnt = 0; - while (syncAskEp(tmq) != 0) { - if (retryCnt++ > MAX_RETRY_COUNT) { + while ((code = syncAskEp(tmq)) != 0) { + if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId); - code = TSDB_CODE_MND_CONSUMER_NOT_READY; goto FAIL; } @@ -2154,18 +2142,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) { } taosSsleep(2); // sleep 2s for hb to send offset and rows to server - int32_t retryCnt = 0; tmq_list_t* lst = tmq_list_new(); - while (1) { - int32_t rsp = tmq_subscribe(tmq, lst); - if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) { - break; - } else { - retryCnt++; - taosMsleep(500); - } - } - + tmq_subscribe(tmq, lst); tmq_list_destroy(lst); } else { tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId); From e17832e27eb5cdb0860504111fba46d4ba28e6d7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 29 Feb 2024 14:38:09 +0800 Subject: [PATCH 2/2] fix:[TD-28869]get error in askEp because consumer is dropped when unsubscribe --- source/client/src/clientTmq.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3ce4e97e22..11e88dc306 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2135,16 +2135,19 @@ int32_t tmq_consumer_close(tmq_t* tmq) { if (tmq->status == TMQ_CONSUMER_STATUS__READY) { // if auto commit is set, commit before close consumer. Otherwise, do nothing. if (tmq->autoCommit) { - int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0) { - return rsp; + int32_t code = tmq_commit_sync(tmq, NULL); + if (code != 0) { + return code; } } taosSsleep(2); // sleep 2s for hb to send offset and rows to server tmq_list_t* lst = tmq_list_new(); - tmq_subscribe(tmq, lst); + int32_t code = tmq_subscribe(tmq, lst); tmq_list_destroy(lst); + if (code != 0) { + return code; + } } else { tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId); }