From 1b6fa8544cae3a7aebb33aadbd536436b9773091 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 10 Jul 2024 09:47:59 +0800 Subject: [PATCH] fix possible block of query worker pool --- source/util/src/tworker.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index bc5c46351a..4a8a0823b7 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -577,7 +577,7 @@ static int64_t atomicCompareExchangeRunning(int64_t* ptr, int32_t* expectedVal, } } -static int64_t atomicComapreAndExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive, +static int64_t atomicCompareExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive, int32_t *expectedRunning, int32_t newRunning) { int64_t oldVal64 = *expectedActive, newVal64 = newActive; oldVal64 <<= 32; @@ -683,7 +683,7 @@ static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) { int64_t val64 = pPool->activeRunningN; int32_t active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64); while (active > minActive) { - if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1)) + if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1)) return true; } atomicFetchSubRunning(&pPool->activeRunningN, 1); @@ -691,13 +691,18 @@ static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) { } static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) { - int32_t running = GET_RUNNING_N(pPool->activeRunningN); - while (running < pPool->num) { - if (atomicCompareExchangeRunning(&pPool->activeRunningN, &running, running + 1)) { - return TSDB_CODE_SUCCESS; + while (1) { + int64_t val64 = pPool->activeRunningN; + int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64); + while (running < pPool->num) { + if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) { + return TSDB_CODE_SUCCESS; + } + } + if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) { + break; } } - atomicFetchSubActive(&pPool->activeRunningN, 1); // to wait for process taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1); @@ -976,7 +981,7 @@ static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) { int64_t val64 = pPool->activeRunningN; int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64); while (running < pPool->num) { - if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) { + if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) { return TSDB_CODE_SUCCESS; } }