fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585
This commit is contained in:
parent
ba469cc967
commit
19f059212d
|
@ -74,8 +74,9 @@ enum {
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
tmr_h timer;
|
tmr_h timer;
|
||||||
int32_t rsetId;
|
int32_t rsetId;
|
||||||
|
TdThreadMutex lock;
|
||||||
} SMqMgmt;
|
} SMqMgmt;
|
||||||
|
|
||||||
struct tmq_list_t {
|
struct tmq_list_t {
|
||||||
|
@ -1603,13 +1604,33 @@ static void tmqMgmtInit(void) {
|
||||||
tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
|
tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
|
||||||
|
|
||||||
if (tmqMgmt.timer == NULL) {
|
if (tmqMgmt.timer == NULL) {
|
||||||
tmqInitRes = terrno;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
|
tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
|
||||||
if (tmqMgmt.rsetId < 0) {
|
if (tmqMgmt.rsetId < 0) {
|
||||||
tmqInitRes = terrno;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TdThreadMutexAttr attr = {0};
|
||||||
|
if (taosThreadMutexAttrInit(&attr) != 0){
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE) != 0){
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosThreadMutexInit(&tmqMgmt.lock, &attr) != 0){
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosThreadMutexAttrDestroy(&attr) != 0){
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
|
END:
|
||||||
|
tmqInitRes = terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmqMgmtClose(void) {
|
void tmqMgmtClose(void) {
|
||||||
|
@ -1618,6 +1639,7 @@ void tmqMgmtClose(void) {
|
||||||
tmqMgmt.timer = NULL;
|
tmqMgmt.timer = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(void) taosThreadMutexLock(&tmqMgmt.lock);
|
||||||
if (tmqMgmt.rsetId >= 0) {
|
if (tmqMgmt.rsetId >= 0) {
|
||||||
tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
|
tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
|
||||||
int64_t refId = 0;
|
int64_t refId = 0;
|
||||||
|
@ -1627,9 +1649,7 @@ void tmqMgmtClose(void) {
|
||||||
if (refId == 0) {
|
if (refId == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
taosWLockLatch(&tmq->lock);
|
|
||||||
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
|
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
|
||||||
taosWUnLockLatch(&tmq->lock);
|
|
||||||
|
|
||||||
if (taosRemoveRef(tmqMgmt.rsetId, tmq->refId) != 0) {
|
if (taosRemoveRef(tmqMgmt.rsetId, tmq->refId) != 0) {
|
||||||
qWarn("taosRemoveRef tmq refId:%" PRId64 " failed, error:%s", refId, tstrerror(terrno));
|
qWarn("taosRemoveRef tmq refId:%" PRId64 " failed, error:%s", refId, tstrerror(terrno));
|
||||||
|
@ -1640,6 +1660,8 @@ void tmqMgmtClose(void) {
|
||||||
taosCloseRef(tmqMgmt.rsetId);
|
taosCloseRef(tmqMgmt.rsetId);
|
||||||
tmqMgmt.rsetId = -1;
|
tmqMgmt.rsetId = -1;
|
||||||
}
|
}
|
||||||
|
(void)taosThreadMutexUnlock(&tmqMgmt.lock);
|
||||||
|
(void)taosThreadMutexDestroy(&tmqMgmt.lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
|
@ -2636,7 +2658,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
|
||||||
int32_t tmq_consumer_close(tmq_t* tmq) {
|
int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
|
if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
taosWLockLatch(&tmq->lock);
|
(void) taosThreadMutexLock(&tmqMgmt.lock);
|
||||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
|
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -2651,7 +2673,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
end:
|
end:
|
||||||
taosWUnLockLatch(&tmq->lock);
|
(void)taosThreadMutexLock(&tmqMgmt.lock);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue