Merge pull request #23800 from taosdata/fix/TS-4242
fix:[TS-4242] tmq status is always RECOVER
This commit is contained in:
commit
372ac4ebce
|
@ -1216,7 +1216,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t retryCnt = 0;
|
int32_t retryCnt = 0;
|
||||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) {
|
while (syncAskEp(tmq) != 0) {
|
||||||
if (retryCnt++ > MAX_RETRY_COUNT) {
|
if (retryCnt++ > MAX_RETRY_COUNT) {
|
||||||
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
|
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
|
||||||
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||||
|
@ -1454,6 +1454,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
||||||
if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
|
if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
|
||||||
tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d",
|
tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d",
|
||||||
tmq->consumerId, tmq->epoch, epoch, topicNumGet);
|
tmq->consumerId, tmq->epoch, epoch, topicNumGet);
|
||||||
|
if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){
|
||||||
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1965,9 +1968,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
while (1) {
|
||||||
|
if(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__RECOVER){
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tscInfo("consumer:0x%" PRIx64 " tmq status is recover", tmq->consumerId);
|
||||||
|
|
||||||
int32_t retryCnt = 0;
|
int32_t retryCnt = 0;
|
||||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) {
|
while (syncAskEp(tmq) != 0) {
|
||||||
if (retryCnt++ > 40) {
|
if (retryCnt++ > 40) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue