fix possible block of query worker pool

This commit is contained in:
wangjiaming0909 2024-07-10 09:47:59 +08:00
parent 92089e2d86
commit 1b6fa8544c
1 changed files with 13 additions and 8 deletions

View File

@ -577,7 +577,7 @@ static int64_t atomicCompareExchangeRunning(int64_t* ptr, int32_t* expectedVal,
} }
} }
static int64_t atomicComapreAndExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive, static int64_t atomicCompareExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
int32_t *expectedRunning, int32_t newRunning) { int32_t *expectedRunning, int32_t newRunning) {
int64_t oldVal64 = *expectedActive, newVal64 = newActive; int64_t oldVal64 = *expectedActive, newVal64 = newActive;
oldVal64 <<= 32; oldVal64 <<= 32;
@ -683,7 +683,7 @@ static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) {
int64_t val64 = pPool->activeRunningN; int64_t val64 = pPool->activeRunningN;
int32_t active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64); int32_t active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64);
while (active > minActive) { while (active > minActive) {
if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1)) if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1))
return true; return true;
} }
atomicFetchSubRunning(&pPool->activeRunningN, 1); atomicFetchSubRunning(&pPool->activeRunningN, 1);
@ -691,13 +691,18 @@ static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) {
} }
static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) { static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) {
int32_t running = GET_RUNNING_N(pPool->activeRunningN); while (1) {
while (running < pPool->num) { int64_t val64 = pPool->activeRunningN;
if (atomicCompareExchangeRunning(&pPool->activeRunningN, &running, running + 1)) { int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
return TSDB_CODE_SUCCESS; while (running < pPool->num) {
if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) {
return TSDB_CODE_SUCCESS;
}
}
if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) {
break;
} }
} }
atomicFetchSubActive(&pPool->activeRunningN, 1);
// to wait for process // to wait for process
taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1); atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
@ -976,7 +981,7 @@ static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
int64_t val64 = pPool->activeRunningN; int64_t val64 = pPool->activeRunningN;
int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64); int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
while (running < pPool->num) { while (running < pPool->num) {
if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) { if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }