diff --git a/include/util/tworker.h b/include/util/tworker.h index 3af6d3d8ba..6df335fc74 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -130,13 +130,11 @@ typedef struct SQueryAutoQWorkerPool { int32_t num; int32_t max; int32_t min; - int32_t maxRunning; + int32_t maxInUse; int32_t activeN; // running workers and workers waiting at reading new queue msg int32_t runningN; // workers processing queue msgs, not include blocking/waitingA/waitingB workers. - int32_t blockingN; // blocked worker num, like exchangeoperator sem_wait - int32_t waitingAfterBlockN; // workers that recovered from blocking but waiting for too many running workers TdThreadMutex waitingAfterBlockLock; TdThreadCond waitingAfterBlockCond; @@ -165,9 +163,9 @@ STaosQueue *tQueryAutoQWorkerAllocQueue(SQueryAutoQWorkerPool *pPool, void *ahan void tQueryAutoQWorkerFreeQueue(SQueryAutoQWorkerPool* pPool, STaosQueue* pQ); typedef struct SQueryAutoQWorkerPoolCB { - SQueryAutoQWorkerPool* pPool; - int32_t (*beforeBlocking)(void* pPool); - int32_t (*afterRecoverFromBlocking)(void* pPool); + void *pPool; + int32_t (*beforeBlocking)(void *pPool); + int32_t (*afterRecoverFromBlocking)(void *pPool); } SQueryAutoQWorkerPoolCB; #ifdef __cplusplus diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index a9b568dd91..d30b100147 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -254,7 +254,6 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { req->queryId = qId; req->taskId = tId; req->execId = eId; - //taosMsleep(500); SRpcMsg pNewMsg = { .msgType = TDMT_SCH_QUERY_CONTINUE, diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 482db7b5b8..2ca0450bed 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -482,12 +482,10 @@ void tMultiWorkerCleanup(SMultiWorker *pWorker) { } static int32_t tQueryAutoQWorkerAddWorker(SQueryAutoQWorkerPool* pool); -static void tQueryAutoQWorkerStartRunning(void *p); static int32_t tQueryAutoQWorkerBeforeBlocking(void *p); static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p); -static int32_t tQueryAutoQWorkerWaitingCheck(void *p); +static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool); static bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQWorker* pWorker); -static void tQueryAutoQWorkerFinish(void *p); static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { SQueryAutoQWorkerPool *pool = worker->pool; @@ -498,7 +496,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { taosBlockSIGPIPE(); setThreadName(pool->name); worker->pid = taosGetSelfPthreadId(); - uInfo("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); + uDebug("worker:%s:%d is running, thread:%08" PRId64, pool->name, worker->id, worker->pid); while (1) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { @@ -514,8 +512,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { } } - code = tQueryAutoQWorkerWaitingCheck(pool); - // TODO wjm what if taosd is exiting + tQueryAutoQWorkerWaitingCheck(pool); if (qinfo.fp != NULL) { qinfo.workerId = worker->id; @@ -526,6 +523,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { taosUpdateItemSize(qinfo.queue, 1); if (!tQueryAutoQWorkerTryRecycleWorker(pool, worker)) { + uDebug("worker:%s:%d exited", pool->name, worker->id); break; } } @@ -588,8 +586,7 @@ static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) { return ret; } -static int32_t tQueryAutoQWorkerWaitingCheck(void *p) { - SQueryAutoQWorkerPool* pPool = p; +static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) { int32_t running = pPool->runningN; while (running < pPool->num) { int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1); @@ -606,7 +603,6 @@ static int32_t tQueryAutoQWorkerWaitingCheck(void *p) { if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock); // recovered from waiting taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); - if (pPool->exit) return -1; // TODO wjm error code return TSDB_CODE_SUCCESS; } @@ -617,8 +613,8 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQ SListNode* pNode = listNode(pWorker); tdListPopNode(pPool->workers, pNode); // reclaim some workers - if (pWorker->id >= pPool->num * 2 ) { - while (listNEles(pPool->exitedWorkers) > pPool->num) { + if (pWorker->id >= pPool->maxInUse) { + while (listNEles(pPool->exitedWorkers) > pPool->maxInUse - pPool->num) { SListNode* head = tdListPopHead(pPool->exitedWorkers); SQueryAutoQWorker* pWorker = (SQueryAutoQWorker*)head->data; if (pWorker && taosCheckPthreadValid(pWorker->thread)) { @@ -668,6 +664,7 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) { if (!pool->backupWorkers) return TSDB_CODE_OUT_OF_MEMORY; pool->exitedWorkers = tdListNew(sizeof(SQueryAutoQWorker)); if (!pool->exitedWorkers) return TSDB_CODE_OUT_OF_MEMORY; + pool->maxInUse = pool->max * 2 + 2; (void)taosThreadMutexInit(&pool->poolLock, NULL); (void)taosThreadMutexInit(&pool->backupLock, NULL); @@ -732,7 +729,6 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { } while (listNEles(pPool->backupWorkers) > 0) { - // TODO wjm need lock? SListNode* pNode = tdListPopHead(pPool->backupWorkers); worker = (SQueryAutoQWorker*)pNode->data; if (worker && taosCheckPthreadValid(worker->thread)) { @@ -891,6 +887,6 @@ static int32_t tQueryAutoQWorkerRecoverFromBlocking(void *p) { atomic_fetch_add_32(&pPool->waitingAfterBlockN, 1); if (!pPool->exit) taosThreadCondWait(&pPool->waitingAfterBlockCond, &pPool->waitingAfterBlockLock); taosThreadMutexUnlock(&pPool->waitingAfterBlockLock); - if (pPool->exit) return -1; // TODO wjm error code + if (pPool->exit) return TSDB_CODE_QRY_QWORKER_QUIT; return TSDB_CODE_SUCCESS; }