fix:[TS-4242] tmq status is always RECOVER

This commit is contained in:
wangmm0220 2023-11-23 20:44:10 +08:00
parent 92240d7f95
commit e1eda752e7
1 changed files with 11 additions and 3 deletions

View File

@ -1216,7 +1216,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) {
while (syncAskEp(tmq) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
@ -1454,6 +1454,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet);
if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
}
return false;
}
@ -1965,9 +1968,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
return NULL;
}
while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
while (1) {
if(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__RECOVER){
break;
}
tscInfo("consumer:0x%" PRIx64 " tmq status is recover", tmq->consumerId);
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) {
while (syncAskEp(tmq) != 0) {
if (retryCnt++ > 40) {
return NULL;
}