fix:[TD-28869]get error in askEp because consumer is dropped when unsubscribe topic

This commit is contained in:
wangmm0220 2024-02-28 17:05:11 +08:00
parent 09a04ed0a4
commit eec6b66877
1 changed files with 4 additions and 26 deletions

View File

@ -1009,19 +1009,8 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
} }
taosSsleep(2); // sleep 2s for hb to send offset and rows to server taosSsleep(2); // sleep 2s for hb to send offset and rows to server
int32_t rsp;
int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new(); tmq_list_t* lst = tmq_list_new();
while (1) { int32_t rsp = tmq_subscribe(tmq, lst);
rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break;
} else {
retryCnt++;
taosMsleep(500);
}
}
tmq_list_destroy(lst); tmq_list_destroy(lst);
return rsp; return rsp;
} }
@ -1271,10 +1260,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
} }
int32_t retryCnt = 0; int32_t retryCnt = 0;
while (syncAskEp(tmq) != 0) { while ((code = syncAskEp(tmq)) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT) { if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId); tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto FAIL; goto FAIL;
} }
@ -2154,18 +2142,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
} }
taosSsleep(2); // sleep 2s for hb to send offset and rows to server taosSsleep(2); // sleep 2s for hb to send offset and rows to server
int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new(); tmq_list_t* lst = tmq_list_new();
while (1) { tmq_subscribe(tmq, lst);
int32_t rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break;
} else {
retryCnt++;
taosMsleep(500);
}
}
tmq_list_destroy(lst); tmq_list_destroy(lst);
} else { } else {
tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId); tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);