avoid locks in queryAutoQWorkerPool

wangjiaming0909 2024-07-01 15:41:19 +08:00
parent c3deebf5b6
commit d0bb671d79
2 changed files with 89 additions and 36 deletions


@@ -139,10 +139,9 @@ typedef struct SQueryAutoQWorkerPool {
   int32_t min;
   int32_t maxInUse;
-  int32_t activeN;   // running workers and workers waiting at reading new queue msg
-  int32_t runningN;  // workers processing queue msgs, not include blocking/waitingA/waitingB workers.
-  TdThreadMutex activeLock;
+  int64_t activeRunningN;  // 4 bytes for activeN, 4 bytes for runningN
+  // activeN are running workers and workers waiting at reading new queue msgs
+  // runningN are workers processing queue msgs, not include blocking/waitingAfterBlock/waitingBeforeProcessMsg workers.
   int32_t waitingAfterBlockN;  // workers that recovered from blocking but waiting for too many running workers
   TdThreadMutex waitingAfterBlockLock;

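The struct change above packs the two 32-bit counters into one 64-bit word: activeN occupies the high 32 bits and runningN the low 32 bits, so both counters can be read or updated with a single 64-bit atomic, which is what lets the commit drop activeLock. A minimal standalone sketch of that layout, using C11 stdatomic instead of the taos atomic wrappers (packActiveRunning and the main routine are illustrative, not part of the commit):

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/* same layout as the commit: activeN in the high half, runningN in the low half */
#define GET_ACTIVE_N(v)  ((int32_t)((v) >> 32))
#define GET_RUNNING_N(v) ((int32_t)((v) & 0xFFFFFFFF))

static inline int64_t packActiveRunning(int32_t active, int32_t running) {
  return ((int64_t)active << 32) | (uint32_t)running;
}

int main(void) {
  _Atomic int64_t activeRunningN = packActiveRunning(4, 2); /* 4 active, 2 running */

  /* one atomic load observes a consistent snapshot of both counters */
  int64_t v = atomic_load(&activeRunningN);
  printf("active=%d running=%d\n", GET_ACTIVE_N(v), GET_RUNNING_N(v));

  /* touching only activeN means shifting the delta into the high half */
  atomic_fetch_add(&activeRunningN, (int64_t)1 << 32);

  v = atomic_load(&activeRunningN);
  printf("active=%d running=%d\n", GET_ACTIVE_N(v), GET_RUNNING_N(v)); /* active=5 running=2 */
  return 0;
}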

@@ -525,6 +525,75 @@ static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p);
 static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool);
 static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker);
+
+#define GET_ACTIVE_N(int64_val) (int32_t)((int64_val) >> 32)
+#define GET_RUNNING_N(int64_val) (int32_t)(int64_val & 0xFFFFFFFF)
+
+static int32_t atomicFetchSubActive(int64_t *ptr, int32_t val) {
+  int64_t acutalSubVal = val;
+  acutalSubVal <<= 32;
+  int64_t newVal64 = atomic_fetch_sub_64(ptr, acutalSubVal);
+  return GET_ACTIVE_N(newVal64);
+}
+
+static int32_t atomicFetchSubRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_sub_64(ptr, val)); }
+
+static int32_t atomicFetchAddActive(int64_t *ptr, int32_t val) {
+  int64_t actualAddVal = val;
+  actualAddVal <<= 32;
+  int64_t newVal64 = atomic_fetch_add_64(ptr, actualAddVal);
+  return GET_ACTIVE_N(newVal64);
+}
+
+static int32_t atomicFetchAddRunning(int64_t *ptr, int32_t val) { return GET_RUNNING_N(atomic_fetch_add_64(ptr, val)); }
+
+static bool atomicCompareExchangeActive(int64_t* ptr, int32_t* expectedVal, int32_t newVal) {
+  int64_t oldVal64 = *expectedVal, newVal64 = newVal;
+  int32_t running = GET_RUNNING_N(*ptr);
+  oldVal64 <<= 32;
+  newVal64 <<= 32;
+  oldVal64 |= running;
+  newVal64 |= running;
+  int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
+  if (actualNewVal64 == oldVal64) {
+    return true;
+  } else {
+    *expectedVal = GET_ACTIVE_N(actualNewVal64);
+    return false;
+  }
+}
+
+static int64_t atomicCompareExchangeRunning(int64_t* ptr, int32_t* expectedVal, int32_t newVal) {
+  int64_t oldVal64 = *expectedVal, newVal64 = newVal;
+  int64_t activeShifted = GET_ACTIVE_N(*ptr);
+  activeShifted <<= 32;
+  oldVal64 |= activeShifted;
+  newVal64 |= activeShifted;
+  int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
+  if (actualNewVal64 == oldVal64) {
+    return true;
+  } else {
+    *expectedVal = GET_RUNNING_N(actualNewVal64);
+    return false;
+  }
+}
+
+static int64_t atomicComapreAndExchangeActiveAndRunning(int64_t *ptr, int32_t *expectedActive, int32_t newActive,
+                                                        int32_t *expectedRunning, int32_t newRunning) {
+  int64_t oldVal64 = *expectedActive, newVal64 = newActive;
+  oldVal64 <<= 32;
+  oldVal64 |= *expectedRunning;
+  newVal64 <<= 32;
+  newVal64 |= newRunning;
+  int64_t actualNewVal64 = atomic_val_compare_exchange_64(ptr, oldVal64, newVal64);
+  if (actualNewVal64 == oldVal64) {
+    return true;
+  } else {
+    *expectedActive = GET_ACTIVE_N(actualNewVal64);
+    *expectedRunning = GET_RUNNING_N(actualNewVal64);
+    return false;
+  }
+}
+
 static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
   SQueryAutoQWorkerPool *pool = worker->pool;
   SQueueInfo qinfo = {0};
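The helpers added above wrap a single 64-bit compare-and-swap so that one half of the packed word can be changed while the other half is preserved, or both halves changed together; on failure they write the freshly observed half back into the caller's expected value, which is what lets the callers below retry without a mutex. A sketch of that caller-side retry pattern, using C11 atomics in place of the taos atomic_val_compare_exchange_64 wrapper (tryDecActive mirrors the shape of tQueryAutoQWorkerTryDecActive but is an illustrative stand-in, not the pool code):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define GET_ACTIVE_N(v)  ((int32_t)((v) >> 32))
#define GET_RUNNING_N(v) ((int32_t)((v) & 0xFFFFFFFF))

static bool tryDecActive(_Atomic int64_t *activeRunningN, int32_t minActive) {
  int64_t cur = atomic_load(activeRunningN);
  while (GET_ACTIVE_N(cur) > minActive) {
    /* build the desired word: activeN - 1 in the high half, runningN - 1 in the low half */
    int64_t next = ((int64_t)(GET_ACTIVE_N(cur) - 1) << 32) | (uint32_t)(GET_RUNNING_N(cur) - 1);
    /* on failure, cur is refreshed with the value another thread installed and we retry */
    if (atomic_compare_exchange_weak(activeRunningN, &cur, next)) return true;
  }
  /* activeN is already at the floor: only release the "running" slot in the low half */
  atomic_fetch_sub(activeRunningN, 1);
  return false;
}

Widening the counters into one word is what makes activeLock unnecessary: both counters move together in a single CAS, so no thread can observe a state where only one of them has been updated.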
@@ -575,71 +644,60 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) {
 static bool tQueryAutoQWorkerTrySignalWaitingAfterBlock(void *p) {
   SQueryAutoQWorkerPool *pPool = p;
   bool ret = false;
-  taosThreadMutexLock(&pPool->waitingAfterBlockLock);
   int32_t waiting = pPool->waitingAfterBlockN;
   while (waiting > 0) {
     int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingAfterBlockN, waiting, waiting - 1);
     if (waitingNew == waiting) {
+      taosThreadMutexLock(&pPool->waitingAfterBlockLock);
       taosThreadCondSignal(&pPool->waitingAfterBlockCond);
+      taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
       ret = true;
       break;
     }
     waiting = waitingNew;
   }
-  taosThreadMutexUnlock(&pPool->waitingAfterBlockLock);
   return ret;
 }

 static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void* p) {
   SQueryAutoQWorkerPool *pPool = p;
   bool ret = false;
-  taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
   int32_t waiting = pPool->waitingBeforeProcessMsgN;
   while (waiting > 0) {
     int32_t waitingNew = atomic_val_compare_exchange_32(&pPool->waitingBeforeProcessMsgN, waiting, waiting - 1);
     if (waitingNew == waiting) {
+      taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
       taosThreadCondSignal(&pPool->waitingBeforeProcessMsgCond);
+      taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
       ret = true;
       break;
     }
     waiting = waitingNew;
   }
-  taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock);
   return ret;
 }

 static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) {
   SQueryAutoQWorkerPool *pPool = p;
-  bool ret = false;
-  taosThreadMutexLock(&pPool->activeLock);
-  int32_t active = pPool->activeN;
+  int64_t val64 = pPool->activeRunningN;
+  int32_t active = GET_ACTIVE_N(val64), running = GET_RUNNING_N(val64);
   while (active > minActive) {
-    int32_t activeNew = atomic_val_compare_exchange_32(&pPool->activeN, active, active - 1);
-    if (activeNew == active) {
-      ret = true;
-      break;
-    }
-    active = activeNew;
+    if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active - 1, &running, running - 1))
+      return true;
   }
-  atomic_fetch_sub_32(&pPool->runningN, 1);
-  taosThreadMutexUnlock(&pPool->activeLock);
-  return ret;
+  atomicFetchSubRunning(&pPool->activeRunningN, 1);
+  return false;
 }

 static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) {
-  taosThreadMutexLock(&pPool->activeLock);
-  int32_t running = pPool->runningN;
+  int32_t running = GET_RUNNING_N(pPool->activeRunningN);
   while (running < pPool->num) {
-    int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1);
-    if (runningNew == running) {
-      // to running
-      taosThreadMutexUnlock(&pPool->activeLock);
+    if (atomicCompareExchangeRunning(&pPool->activeRunningN, &running, running + 1)) {
       return TSDB_CODE_SUCCESS;
     }
-    running = runningNew;
   }
-  atomic_fetch_sub_32(&pPool->activeN, 1);
-  taosThreadMutexUnlock(&pPool->activeLock);
+  atomicFetchSubActive(&pPool->activeRunningN, 1);
   // to wait for process
   taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock);
   atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1);
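In the two TrySignal functions above, the waiting-counter mutexes are not removed but narrowed: the counter is claimed with a plain 32-bit CAS outside the lock, and the mutex is held only around the condition-variable signal so a waiter cannot miss the wakeup between checking its predicate and calling the cond-wait. A generic pthread sketch of that shape (gWaiters, gLock, gCond and trySignalOneWaiter are illustrative names, not the pool's identifiers):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

static _Atomic int     gWaiters = 0;
static pthread_mutex_t gLock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  gCond = PTHREAD_COND_INITIALIZER;

/* Claim one waiter lock-free; take the mutex only for the signal itself. */
static bool trySignalOneWaiter(void) {
  int waiting = atomic_load(&gWaiters);
  while (waiting > 0) {
    /* on failure, waiting is refreshed and the loop re-evaluates the guard */
    if (atomic_compare_exchange_weak(&gWaiters, &waiting, waiting - 1)) {
      pthread_mutex_lock(&gLock);   /* held only around the signal, not the CAS loop */
      pthread_cond_signal(&gCond);
      pthread_mutex_unlock(&gLock);
      return true;
    }
  }
  return false;
}

The waiter side is unchanged in the diff: as the context lines above show, it still takes the same mutex before bumping the waiting counter and sleeping on the condition, which is what keeps the narrowed signal window safe.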
@@ -712,7 +770,6 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) {
   (void)taosThreadMutexInit(&pool->backupLock, NULL);
   (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL);
   (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL);
-  (void)taosThreadMutexInit(&pool->activeLock, NULL);
   (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL);
   (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL);
@@ -800,7 +857,6 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) {
   taosThreadMutexDestroy(&pPool->backupLock);
   taosThreadMutexDestroy(&pPool->waitingAfterBlockLock);
   taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock);
-  taosThreadMutexDestroy(&pPool->activeLock);
   taosThreadCondDestroy(&pPool->backupCond);
   taosThreadCondDestroy(&pPool->waitingAfterBlockCond);
@@ -846,7 +902,7 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pool, void *ahand
       taosThreadAttrDestroy(&thAttr);
       pool->num++;
-      pool->activeN++;
+      atomicFetchAddActive(&pool->activeRunningN, 1);
       uInfo("worker:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num);
     } while (pool->num < pool->min);
   }
@@ -917,14 +973,12 @@ static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) {
 static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) {
   SQueryAutoQWorkerPool* pPool = p;
-  int32_t running = pPool->runningN;
+  int64_t val64 = pPool->activeRunningN;
+  int32_t running = GET_RUNNING_N(val64), active = GET_ACTIVE_N(val64);
   while (running < pPool->num) {
-    int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1);
-    if (runningNew == running) {
-      atomic_fetch_add_32(&pPool->activeN, 1);
+    if (atomicComapreAndExchangeActiveAndRunning(&pPool->activeRunningN, &active, active + 1, &running, running + 1)) {
       return TSDB_CODE_SUCCESS;
     }
-    running = runningNew;
   }
   taosThreadMutexLock(&pPool->waitingAfterBlockLock);
   atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1);