fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585
This commit is contained in:
parent
3f17782cfc
commit
ba469cc967
|
@ -253,7 +253,7 @@ void taos_cleanup(void) {
|
||||||
taosCloseRef(id);
|
taosCloseRef(id);
|
||||||
|
|
||||||
nodesDestroyAllocatorSet();
|
nodesDestroyAllocatorSet();
|
||||||
// cleanupAppInfo();
|
cleanupAppInfo();
|
||||||
rpcCleanup();
|
rpcCleanup();
|
||||||
tscDebug("rpc cleanup");
|
tscDebug("rpc cleanup");
|
||||||
|
|
||||||
|
|
|
@ -1619,6 +1619,24 @@ void tmqMgmtClose(void) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tmqMgmt.rsetId >= 0) {
|
if (tmqMgmt.rsetId >= 0) {
|
||||||
|
tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
|
||||||
|
int64_t refId = 0;
|
||||||
|
|
||||||
|
while (tmq) {
|
||||||
|
refId = tmq->refId;
|
||||||
|
if (refId == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
taosWLockLatch(&tmq->lock);
|
||||||
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
|
||||||
|
taosWUnLockLatch(&tmq->lock);
|
||||||
|
|
||||||
|
if (taosRemoveRef(tmqMgmt.rsetId, tmq->refId) != 0) {
|
||||||
|
qWarn("taosRemoveRef tmq refId:%" PRId64 " failed, error:%s", refId, tstrerror(terrno));
|
||||||
|
}
|
||||||
|
|
||||||
|
tmq = taosIterateRef(tmqMgmt.rsetId, refId);
|
||||||
|
}
|
||||||
taosCloseRef(tmqMgmt.rsetId);
|
taosCloseRef(tmqMgmt.rsetId);
|
||||||
tmqMgmt.rsetId = -1;
|
tmqMgmt.rsetId = -1;
|
||||||
}
|
}
|
||||||
|
@ -2617,8 +2635,13 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
|
||||||
|
|
||||||
int32_t tmq_consumer_close(tmq_t* tmq) {
|
int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
|
if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
|
||||||
|
int32_t code = 0;
|
||||||
|
taosWLockLatch(&tmq->lock);
|
||||||
|
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
|
tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
|
||||||
int32_t code = tmq_unsubscribe(tmq);
|
code = tmq_unsubscribe(tmq);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
|
||||||
code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
|
code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
|
||||||
|
@ -2626,6 +2649,9 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
|
tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
end:
|
||||||
|
taosWUnLockLatch(&tmq->lock);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue