Merge pull request #26497 from taosdata/fix/3.0/TD-30936

fix possible block of query worker pool
This commit is contained in:
dapan1121 2024-07-10 18:26:00 +08:00 committed by GitHub
commit 4bad97a261
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 13 additions and 8 deletions

View File

@ -577,7 +577,7 @@ static int64_t atomicCompareExchangeRunning(int64_t* ptr, int32_t* expectedVal,
}
}
static int64_t atomicComapreAndExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
static int64_t atomicCompareExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
int32_t *expectedRunning, int32_t newRunning) {
int64_t oldVal64 = *expectedActive, newVal64 = newActive;
oldVal64 <<= 32;
@ -683,7 +683,7 @@ static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) {
int64_t val64 = pPool->activeRunningN;
int32_t active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64);
while (active > minActive) {
if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1))
if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1))
return true;
}
atomicFetchSubRunning(&pPool->activeRunningN, 1);
@ -691,13 +691,18 @@ static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) {
}
static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) {
int32_t running = GET_RUNNING_N(pPool->activeRunningN);
while (running < pPool->num) {
if (atomicCompareExchangeRunning(&pPool->activeRunningN, &running, running + 1)) {
return TSDB_CODE_SUCCESS;
while (1) {
int64_t val64 = pPool->activeRunningN;
int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
while (running < pPool->num) {
if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active, &running, running + 1)) {
return TSDB_CODE_SUCCESS;
}
}
if (atomicCompareExchangeActive(&pPool->activeRunningN, &active, active - 1)) {
break;
}
}
atomicFetchSubActive(&pPool->activeRunningN, 1);
// to wait for process
taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
@ -976,7 +981,7 @@ static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
int64_t val64 = pPool->activeRunningN;
int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
while (running < pPool->num) {
if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
if (atomicCompareExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
return TSDB_CODE_SUCCESS;
}
}