Merge pull request #29613 from taosdata/fix/TD-33556-main

fix:[TD-33556] tmq close elegantly to avoid invalid read in TD-32585
This commit is contained in:
Shengliang Guan 2025-01-22 13:37:36 +08:00 committed by GitHub
commit 8e0dcd07c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 41 additions and 6 deletions

View File

@ -253,7 +253,7 @@ void taos_cleanup(void) {
taosCloseRef(id); taosCloseRef(id);
nodesDestroyAllocatorSet(); nodesDestroyAllocatorSet();
// cleanupAppInfo(); cleanupAppInfo();
rpcCleanup(); rpcCleanup();
tscDebug("rpc cleanup"); tscDebug("rpc cleanup");

View File

@ -74,8 +74,9 @@ enum {
}; };
typedef struct { typedef struct {
tmr_h timer; tmr_h timer;
int32_t rsetId; int32_t rsetId;
TdThreadMutex lock;
} SMqMgmt; } SMqMgmt;
struct tmq_list_t { struct tmq_list_t {
@ -1603,13 +1604,21 @@ static void tmqMgmtInit(void) {
tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ"); tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
if (tmqMgmt.timer == NULL) { if (tmqMgmt.timer == NULL) {
tmqInitRes = terrno; goto END;
} }
tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl); tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
if (tmqMgmt.rsetId < 0) { if (tmqMgmt.rsetId < 0) {
tmqInitRes = terrno; goto END;
} }
if (taosThreadMutexInit(&tmqMgmt.lock, NULL) != 0){
goto END;
}
return;
END:
tmqInitRes = terrno;
} }
void tmqMgmtClose(void) { void tmqMgmtClose(void) {
@ -1618,10 +1627,28 @@ void tmqMgmtClose(void) {
tmqMgmt.timer = NULL; tmqMgmt.timer = NULL;
} }
(void) taosThreadMutexLock(&tmqMgmt.lock);
if (tmqMgmt.rsetId >= 0) { if (tmqMgmt.rsetId >= 0) {
tmq_t *tmq = taosIterateRef(tmqMgmt.rsetId, 0);
int64_t refId = 0;
while (tmq) {
refId = tmq->refId;
if (refId == 0) {
break;
}
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
if (taosRemoveRef(tmqMgmt.rsetId, tmq->refId) != 0) {
qWarn("taosRemoveRef tmq refId:%" PRId64 " failed, error:%s", refId, tstrerror(terrno));
}
tmq = taosIterateRef(tmqMgmt.rsetId, refId);
}
taosCloseRef(tmqMgmt.rsetId); taosCloseRef(tmqMgmt.rsetId);
tmqMgmt.rsetId = -1; tmqMgmt.rsetId = -1;
} }
(void)taosThreadMutexUnlock(&tmqMgmt.lock);
} }
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
@ -2617,8 +2644,13 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
int32_t tmq_consumer_close(tmq_t* tmq) { int32_t tmq_consumer_close(tmq_t* tmq) {
if (tmq == NULL) return TSDB_CODE_INVALID_PARA; if (tmq == NULL) return TSDB_CODE_INVALID_PARA;
int32_t code = 0;
code = taosThreadMutexLock(&tmqMgmt.lock);
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__CLOSED){
goto end;
}
tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
int32_t code = tmq_unsubscribe(tmq); code = tmq_unsubscribe(tmq);
if (code == 0) { if (code == 0) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId); code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
@ -2626,6 +2658,9 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code); tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
} }
} }
end:
code = taosThreadMutexUnlock(&tmqMgmt.lock);
return code; return code;
} }