Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0

This commit is contained in:
Hongze Cheng 2024-03-04 10:16:05 +08:00
commit aaa871c70f
1 changed files with 10 additions and 29 deletions

View File

@ -1010,19 +1010,8 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
}
taosSsleep(2); // sleep 2s for hb to send offset and rows to server
int32_t rsp;
int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new();
while (1) {
rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break;
} else {
retryCnt++;
taosMsleep(500);
}
}
int32_t rsp = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst);
return rsp;
}
@ -1272,10 +1261,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
int32_t retryCnt = 0;
while (syncAskEp(tmq) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT) {
while ((code = syncAskEp(tmq)) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto FAIL;
}
@ -2148,26 +2136,19 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
// if auto commit is set, commit before close consumer. Otherwise, do nothing.
if (tmq->autoCommit) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
return rsp;
int32_t code = tmq_commit_sync(tmq, NULL);
if (code != 0) {
return code;
}
}
taosSsleep(2); // sleep 2s for hb to send offset and rows to server
int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new();
while (1) {
int32_t rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break;
} else {
retryCnt++;
taosMsleep(500);
}
}
int32_t code = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst);
if (code != 0) {
return code;
}
} else {
tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
}