fix:[TD-28869]get error in askEp because consumer is dropped when unsubscribe
This commit is contained in:
parent
eec6b66877
commit
e17832e27e
|
@ -2135,16 +2135,19 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
|
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
|
||||||
// if auto commit is set, commit before close consumer. Otherwise, do nothing.
|
// if auto commit is set, commit before close consumer. Otherwise, do nothing.
|
||||||
if (tmq->autoCommit) {
|
if (tmq->autoCommit) {
|
||||||
int32_t rsp = tmq_commit_sync(tmq, NULL);
|
int32_t code = tmq_commit_sync(tmq, NULL);
|
||||||
if (rsp != 0) {
|
if (code != 0) {
|
||||||
return rsp;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosSsleep(2); // sleep 2s for hb to send offset and rows to server
|
taosSsleep(2); // sleep 2s for hb to send offset and rows to server
|
||||||
|
|
||||||
tmq_list_t* lst = tmq_list_new();
|
tmq_list_t* lst = tmq_list_new();
|
||||||
tmq_subscribe(tmq, lst);
|
int32_t code = tmq_subscribe(tmq, lst);
|
||||||
tmq_list_destroy(lst);
|
tmq_list_destroy(lst);
|
||||||
|
if (code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
|
tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue