From ba469cc9672c77b39a1a7e28cd1344fa575569c3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 20 Jan 2025 19:28:03 +0800 Subject: [PATCH 1/6] fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585 --- source/client/src/clientMain.c | 2 +- source/client/src/clientTmq.c | 28 +++++++++++++++++++++++++++- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 83aff351dd..190a724151 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -253,7 +253,7 @@ void taos_cleanup(void) { taosCloseRef(id); nodesDestroyAllocatorSet(); - // cleanupAppInfo(); + cleanupAppInfo(); rpcCleanup(); tscDebug("rpc cleanup"); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 78bceb4ef8..be3bcf25b3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1619,6 +1619,24 @@ void tmqMgmtClose(void) { } if (tmqMgmt.rsetId >= 0) { + tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0); + int64_t refId = 0; + + while (tmq) { + refId = tmq->refId; + if (refId == 0) { + break; + } + taosWLockLatch(&tmq->lock); + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); + taosWUnLockLatch(&tmq->lock); + + if (taosRemoveRef(tmqMgmt.rsetId, tmq->refId) != 0) { + qWarn("taosRemoveRef tmq refId:%" PRId64 " failed, error:%s", refId, tstrerror(terrno)); + } + + tmq = taosIterateRef(tmqMgmt.rsetId, refId); + } taosCloseRef(tmqMgmt.rsetId); tmqMgmt.rsetId = -1; } @@ -2617,8 +2635,13 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { int32_t tmq_consumer_close(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; + int32_t code = 0; + taosWLockLatch(&tmq->lock); + if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){ + goto end; + } tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); - int32_t code = tmq_unsubscribe(tmq); + code = tmq_unsubscribe(tmq); if (code == 0) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId); @@ -2626,6 +2649,9 @@ int32_t tmq_consumer_close(tmq_t* tmq) { tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code); } } + + end: + taosWUnLockLatch(&tmq->lock); return code; } From 19f059212dd4f0aed8c7adee54b54cc4d5813438 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 21 Jan 2025 11:15:26 +0800 Subject: [PATCH 2/6] fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585 --- source/client/src/clientTmq.c | 38 +++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index be3bcf25b3..fb1052b7a8 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -74,8 +74,9 @@ enum { }; typedef struct { - tmr_h timer; - int32_t rsetId; + tmr_h timer; + int32_t rsetId; + TdThreadMutex lock; } SMqMgmt; struct tmq_list_t { @@ -1603,13 +1604,33 @@ static void tmqMgmtInit(void) { tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ"); if (tmqMgmt.timer == NULL) { - tmqInitRes = terrno; + goto END; } tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl); if (tmqMgmt.rsetId < 0) { - tmqInitRes = terrno; + goto END; } + + TdThreadMutexAttr attr = {0}; + if (taosThreadMutexAttrInit(&attr) != 0){ + goto END; + } + + if (taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE) != 0){ + goto END; + } + + if (taosThreadMutexInit(&tmqMgmt.lock, &attr) != 0){ + goto END; + } + + if (taosThreadMutexAttrDestroy(&attr) != 0){ + goto END; + } + +END: + tmqInitRes = terrno; } void tmqMgmtClose(void) { @@ -1618,6 +1639,7 @@ void tmqMgmtClose(void) { tmqMgmt.timer = NULL; } + (void) taosThreadMutexLock(&tmqMgmt.lock); if (tmqMgmt.rsetId >= 0) { tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0); int64_t refId = 0; @@ -1627,9 +1649,7 @@ void tmqMgmtClose(void) { if (refId == 0) { break; } - taosWLockLatch(&tmq->lock); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); - taosWUnLockLatch(&tmq->lock); if (taosRemoveRef(tmqMgmt.rsetId, tmq->refId) != 0) { qWarn("taosRemoveRef tmq refId:%" PRId64 " failed, error:%s", refId, tstrerror(terrno)); @@ -1640,6 +1660,8 @@ void tmqMgmtClose(void) { taosCloseRef(tmqMgmt.rsetId); tmqMgmt.rsetId = -1; } + (void)taosThreadMutexUnlock(&tmqMgmt.lock); + (void)taosThreadMutexDestroy(&tmqMgmt.lock); } tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { @@ -2636,7 +2658,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { int32_t tmq_consumer_close(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; int32_t code = 0; - taosWLockLatch(&tmq->lock); + (void) taosThreadMutexLock(&tmqMgmt.lock); if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){ goto end; } @@ -2651,7 +2673,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { } end: - taosWUnLockLatch(&tmq->lock); + (void)taosThreadMutexLock(&tmqMgmt.lock); return code; } From 650aca8e32203a77b8e69c8b792af9939bd88f1b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 21 Jan 2025 11:39:10 +0800 Subject: [PATCH 3/6] fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585 --- source/client/src/clientTmq.c | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index fb1052b7a8..4662c31fb1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1617,10 +1617,6 @@ static void tmqMgmtInit(void) { goto END; } - if (taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE) != 0){ - goto END; - } - if (taosThreadMutexInit(&tmqMgmt.lock, &attr) != 0){ goto END; } @@ -2658,7 +2654,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { int32_t tmq_consumer_close(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; int32_t code = 0; - (void) taosThreadMutexLock(&tmqMgmt.lock); + code = taosThreadMutexLock(&tmqMgmt.lock); if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){ goto end; } @@ -2673,7 +2669,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { } end: - (void)taosThreadMutexLock(&tmqMgmt.lock); + code = taosThreadMutexUnlock(&tmqMgmt.lock); return code; } From b763ed1e49d30a4ccc982353115258387f301459 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 21 Jan 2025 11:43:28 +0800 Subject: [PATCH 4/6] fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585 --- source/client/src/clientTmq.c | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 4662c31fb1..a2c8bfa351 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1612,16 +1612,7 @@ static void tmqMgmtInit(void) { goto END; } - TdThreadMutexAttr attr = {0}; - if (taosThreadMutexAttrInit(&attr) != 0){ - goto END; - } - - if (taosThreadMutexInit(&tmqMgmt.lock, &attr) != 0){ - goto END; - } - - if (taosThreadMutexAttrDestroy(&attr) != 0){ + if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){ goto END; } From 6b7df77061c39458065b5924e49f09e5aea39570 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 21 Jan 2025 11:44:13 +0800 Subject: [PATCH 5/6] fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585 --- source/client/src/clientTmq.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a2c8bfa351..603f8368aa 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1616,6 +1616,7 @@ static void tmqMgmtInit(void) { goto END; } + return; END: tmqInitRes = terrno; } From 94f6de8adcdb1033eb3998f1b742932968daadd1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 22 Jan 2025 11:01:41 +0800 Subject: [PATCH 6/6] fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585 --- source/client/src/clientTmq.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 603f8368aa..93117c934f 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1649,7 +1649,6 @@ void tmqMgmtClose(void) { tmqMgmt.rsetId = -1; } (void)taosThreadMutexUnlock(&tmqMgmt.lock); - (void)taosThreadMutexDestroy(&tmqMgmt.lock); } tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {