Merge pull request #29612 from taosdata/fix/TD-33556
fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585
This commit is contained in:
commit
379071fef0
|
@ -253,7 +253,7 @@ void taos_cleanup(void) {
|
|||
taosCloseRef(id);
|
||||
|
||||
nodesDestroyAllocatorSet();
|
||||
// cleanupAppInfo();
|
||||
cleanupAppInfo();
|
||||
rpcCleanup();
|
||||
tscDebug("rpc cleanup");
|
||||
|
||||
|
|
|
@ -74,8 +74,9 @@ enum {
|
|||
};
|
||||
|
||||
typedef struct {
|
||||
tmr_h timer;
|
||||
int32_t rsetId;
|
||||
tmr_h timer;
|
||||
int32_t rsetId;
|
||||
TdThreadMutex lock;
|
||||
} SMqMgmt;
|
||||
|
||||
struct tmq_list_t {
|
||||
|
@ -1603,13 +1604,21 @@ static void tmqMgmtInit(void) {
|
|||
tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
|
||||
|
||||
if (tmqMgmt.timer == NULL) {
|
||||
tmqInitRes = terrno;
|
||||
goto END;
|
||||
}
|
||||
|
||||
tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
|
||||
if (tmqMgmt.rsetId < 0) {
|
||||
tmqInitRes = terrno;
|
||||
goto END;
|
||||
}
|
||||
|
||||
if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){
|
||||
goto END;
|
||||
}
|
||||
return;
|
||||
|
||||
END:
|
||||
tmqInitRes = terrno;
|
||||
}
|
||||
|
||||
void tmqMgmtClose(void) {
|
||||
|
@ -1618,10 +1627,28 @@ void tmqMgmtClose(void) {
|
|||
tmqMgmt.timer = NULL;
|
||||
}
|
||||
|
||||
(void) taosThreadMutexLock(&tmqMgmt.lock);
|
||||
if (tmqMgmt.rsetId >= 0) {
|
||||
tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
|
||||
int64_t refId = 0;
|
||||
|
||||
while (tmq) {
|
||||
refId = tmq->refId;
|
||||
if (refId == 0) {
|
||||
break;
|
||||
}
|
||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
|
||||
|
||||
if (taosRemoveRef(tmqMgmt.rsetId, tmq->refId) != 0) {
|
||||
qWarn("taosRemoveRef tmq refId:%" PRId64 " failed, error:%s", refId, tstrerror(terrno));
|
||||
}
|
||||
|
||||
tmq = taosIterateRef(tmqMgmt.rsetId, refId);
|
||||
}
|
||||
taosCloseRef(tmqMgmt.rsetId);
|
||||
tmqMgmt.rsetId = -1;
|
||||
}
|
||||
(void)taosThreadMutexUnlock(&tmqMgmt.lock);
|
||||
}
|
||||
|
||||
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||
|
@ -2617,8 +2644,13 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
|
|||
|
||||
int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||
if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
|
||||
int32_t code = 0;
|
||||
(void) taosThreadMutexLock(&tmqMgmt.lock);
|
||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
|
||||
goto end;
|
||||
}
|
||||
tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
|
||||
int32_t code = tmq_unsubscribe(tmq);
|
||||
code = tmq_unsubscribe(tmq);
|
||||
if (code == 0) {
|
||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
|
||||
code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
|
||||
|
@ -2626,6 +2658,9 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
|
|||
tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
|
||||
}
|
||||
}
|
||||
|
||||
end:
|
||||
(void)taosThreadMutexUnlock(&tmqMgmt.lock);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue