From 6a6930cb3153d2c316f0bd6c7162608152d5d80f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 23 Nov 2023 16:25:11 +0800 Subject: [PATCH] fix:[TS-4242] tmq status is always RECOVER --- source/client/src/clientTmq.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 6aebba7c02..d05cdc0156 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1216,7 +1216,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } int32_t retryCnt = 0; - while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) { + while (syncAskEp(tmq) != 0) { if (retryCnt++ > MAX_RETRY_COUNT) { tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId); code = TSDB_CODE_MND_CONSUMER_NOT_READY; @@ -1454,6 +1454,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) { tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId, tmq->epoch, epoch, topicNumGet); + if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){ + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); + } return false; } @@ -1965,9 +1968,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { return NULL; } - while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { + while (1) { + if(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__RECOVER){ + break; + } + tscInfo("consumer:0x%" PRIx64 " tmq status is recover", tmq->consumerId); + int32_t retryCnt = 0; - while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) { + while (syncAskEp(tmq) != 0) { if (retryCnt++ > 40) { return NULL; }