From d0bb671d7947c98836cbc294eaf0dd521eff54a5 Mon Sep 17 00:00:00 2001
From: wangjiaming0909 <604227650@qq.com>
Date: Mon, 1 Jul 2024 15:41:19 +0800
Subject: [PATCH] avoid locks in queryAutoQWorkerPool

---
 include/util/tworker.h    |   7 +--
 source/util/src/tworker.c | 116 +++++++++++++++++++++++++++----------
 2 files changed, 87 insertions(+), 36 deletions(-)

diff --git a/include/util/tworker.h b/include/util/tworker.h
index 5e5271d324..a3ba7dba6d 100644
--- a/include/util/tworker.h
+++ b/include/util/tworker.h
@@ -139,10 +139,9 @@ typedef struct SQueryAutoQWorkerPool {
   int32_t min;
   int32_t maxInUse;
 
-  int32_t activeN; // running workers and workers waiting at reading new queue msg
-  int32_t runningN; // workers processing queue msgs, not include blocking/waitingA/waitingB workers.
-  TdThreadMutex activeLock;
-
+  int64_t activeRunningN; // high 32 bits: activeN, low 32 bits: runningN
+  // activeN: running workers plus workers waiting at reading new queue msgs
+  // runningN: workers processing queue msgs; excludes blocking/waitingAfterBlock/waitingBeforeProcessMsg workers.
   int32_t waitingAfterBlockN; // workers that recovered from blocking but waiting for too many running workers
   TdThreadMutex waitingAfterBlockLock;
 
diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c
index 3bd4039a9c..bc5c46351a 100644
--- a/source/util/src/tworker.c
+++ b/source/util/src/tworker.c
@@ -525,6 +525,73 @@ static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
 static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool);
 static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker);
 
+// pool->activeRunningN packs two 32-bit counters into one int64_t so that both can be changed by one atomic op:
+// activeN occupies the high 32 bits and runningN the low 32 bits.
+#define GET_ACTIVE_N(int64_val)  ((int32_t)((int64_val) >> 32))
+#define GET_RUNNING_N(int64_val) ((int32_t)((int64_val) & 0xFFFFFFFF))
+// Packs the two counters; going through uint32_t keeps a negative counter from sign-extending into the other half.
+#define PACK_ACTIVE_RUNNING(active, running) ((int64_t)(((uint64_t)(uint32_t)(active) << 32) | (uint32_t)(running)))
+
+// Subtracts val from activeN in one atomic step. Returns the activeN half of whatever atomic_fetch_sub_64
+// returns (presumably the word before the update, per the fetch_* naming - confirm against osAtomic).
+static int32_t atomicFetchSubActive(int64_t *ptr, int32_t val) {
+  int64_t actualSubVal = val;
+  actualSubVal <<= 32;  // move the delta into the activeN half
+  int64_t fetched64 = atomic_fetch_sub_64(ptr, actualSubVal);
+  return GET_ACTIVE_N(fetched64);
+}
+
+// Subtracts val from runningN. runningN must be >= val, otherwise the borrow spills into activeN.
+static int32_t atomicFetchSubRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_sub_64(ptr, val)); }
+
+// Adds val to activeN in one atomic step; the return value mirrors atomicFetchSubActive.
+static int32_t atomicFetchAddActive(int64_t *ptr, int32_t val) {
+  int64_t actualAddVal = val;
+  actualAddVal <<= 32;  // move the delta into the activeN half
+  int64_t fetched64 = atomic_fetch_add_64(ptr, actualAddVal);
+  return GET_ACTIVE_N(fetched64);
+}
+
+// Adds val to runningN. A carry out of the low 32 bits would spill into activeN.
+static int32_t atomicFetchAddRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_add_64(ptr, val)); }
+
+// Tries to change activeN from *expectedVal to newVal while keeping the runningN observed at call time.
+// Returns true on success; otherwise stores the activeN actually found into *expectedVal and returns false.
+static bool atomicCompareExchangeActive(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
+  int32_t running = GET_RUNNING_N(*ptr);
+  int64_t oldVal64 = PACK_ACTIVE_RUNNING(*expectedVal, running);
+  int64_t newVal64 = PACK_ACTIVE_RUNNING(newVal, running);
+  int64_t foundVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
+  if (foundVal64 == oldVal64) return true;
+  *expectedVal = GET_ACTIVE_N(foundVal64);
+  return false;
+}
+
+// Tries to change runningN from *expectedVal to newVal while keeping the activeN observed at call time.
+// Returns true on success; otherwise stores the runningN actually found into *expectedVal and returns false.
+static bool atomicCompareExchangeRunning(int64_t *ptr, int32_t *expectedVal, int32_t newVal) {
+  int32_t active = GET_ACTIVE_N(*ptr);
+  int64_t oldVal64 = PACK_ACTIVE_RUNNING(active, *expectedVal);
+  int64_t newVal64 = PACK_ACTIVE_RUNNING(active, newVal);
+  int64_t foundVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
+  if (foundVal64 == oldVal64) return true;
+  *expectedVal = GET_RUNNING_N(foundVal64);
+  return false;
+}
+
+// Tries to replace the pair (*expectedActive, *expectedRunning) with (newActive, newRunning) in a single CAS.
+// Returns true on success; otherwise refreshes both expected values from the word actually found and returns false.
+static bool atomicComapreAndExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
+                                                     int32_t *expectedRunning, int32_t newRunning) {
+  int64_t oldVal64 = PACK_ACTIVE_RUNNING(*expectedActive, *expectedRunning);
+  int64_t newVal64 = PACK_ACTIVE_RUNNING(newActive, newRunning);
+  int64_t foundVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
+  if (foundVal64 == oldVal64) return true;
+  *expectedActive = GET_ACTIVE_N(foundVal64);
+  *expectedRunning = GET_RUNNING_N(foundVal64);
+  return false;
+}
+
 static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
   SQueryAutoQWorkerPool *pool = worker->pool;
   SQueueInfo qinfo = {0};
@@ -575,71 +642,60 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
 static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
   SQueryAutoQWorkerPool *pPool = p;
   bool ret = false;
-  taosThreadMutexLock(&pPool->waitingAfterBlockLock);
   int32_t waiting = pPool->waitingAfterBlockN;
   while (waiting > 0) {
     int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
     if (waitingNew == waiting) {
+      taosThreadMutexLock(&pPool->waitingAfterBlockLock);
       taosThreadCondSignal(&pPool->waitingAfterBlockCond);
+      taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
       ret = true;
       break;
     }
     waiting = waitingNew;
   }
-  taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
   return ret;
 }
 
 static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void* p) {
   SQueryAutoQWorkerPool *pPool = p;
   bool ret = false;
-  taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
   int32_t waiting = pPool->waitingBeforeProcessMsgN;
   while (waiting > 0) {
     int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
     if (waitingNew == waiting) {
+      taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
       taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
+      taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
       ret = true;
       break;
     }
     waiting = waitingNew;
   }
-  taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
   return ret;
 }
 
 static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) {
   SQueryAutoQWorkerPool *pPool = p;
   bool ret = false;
-  taosThreadMutexLock(&pPool->activeLock);
-  int32_t active = pPool->activeN;
+  int64_t val64 = pPool->activeRunningN;
+  int32_t active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64);
   while (active > minActive) {
-    int32_t activeNew = atomic_val_compare_exchange_32(&pPool->activeN, active, active - 1);
-    if (activeNew == active) {
-      ret = true;
-      break;
-    }
-    active = activeNew;
+    if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1))
+      return true;  // activeN and runningN were both decremented by the same CAS
   }
-  atomic_fetch_sub_32(&pPool->runningN, 1);
-  taosThreadMutexUnlock(&pPool->activeLock);
-  return ret;
+  atomicFetchSubRunning(&pPool->activeRunningN, 1);  // activeN is kept, only runningN drops
+  return false;
 }
 
 static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) {
-  taosThreadMutexLock(&pPool->activeLock);
-  int32_t running = pPool->runningN;
+  int32_t running = GET_RUNNING_N(pPool->activeRunningN);
   while (running < pPool->num) {
-    int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1);
-    if (runningNew == running) {
-      // to running
-      taosThreadMutexUnlock(&pPool->activeLock);
+    if (atomicCompareExchangeRunning(&pPool->activeRunningN, &running, running + 1)) {
       return TSDB_CODE_SUCCESS;
     }
-    running = runningNew;
   }
-  atomic_fetch_sub_32(&pPool->activeN, 1);
-  taosThreadMutexUnlock(&pPool->activeLock);
+  atomicFetchSubActive(&pPool->activeRunningN, 1);
   // to wait for process
   taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
   atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
@@ -712,7 +768,6 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
   (void)taosThreadMutexInit(&pool->backupLock, NULL);
   (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
   (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);
-  (void)taosThreadMutexInit(&pool->activeLock, NULL);
 
   (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
   (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
@@ -800,7 +855,6 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
   taosThreadMutexDestroy(&pPool->backupLock);
   taosThreadMutexDestroy(&pPool->waitingAfterBlockLock);
   taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock);
-  taosThreadMutexDestroy(&pPool->activeLock);
 
   taosThreadCondDestroy(&pPool->backupCond);
   taosThreadCondDestroy(&pPool->waitingAfterBlockCond);
@@ -846,7 +900,7 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand
 
       taosThreadAttrDestroy(&thAttr);
       pool->num++;
-      pool->activeN++;
+      atomicFetchAddActive(&pool->activeRunningN, 1);
       uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num);
     } while (pool->num < pool->min);
   }
@@ -917,14 +971,12 @@ static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) {
 
 static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
   SQueryAutoQWorkerPool* pPool = p;
-  int32_t running = pPool->runningN;
+  int64_t val64 = pPool->activeRunningN;
+  int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
   while (running < pPool->num) {
-    int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1);
-    if (runningNew == running) {
-      atomic_fetch_add_32(&pPool->activeN, 1);
+    if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
       return TSDB_CODE_SUCCESS;
     }
-    running = runningNew;
   }
   taosThreadMutexLock(&pPool->waitingAfterBlockLock);
   atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);