From 5040ff5bd9594dbce910b846bac536817d650df5 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 28 Jun 2024 15:11:44 +0800 Subject: [PATCH] mnode/qnode support QueryAutoQWorkerPool --- include/dnode/mnode/mnode.h | 2 +- include/dnode/qnode/qnode.h | 2 +- include/util/tqueue.h | 2 +- include/util/tworker.h | 25 ++++--- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 3 +- source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 3 +- source/dnode/mnode/impl/inc/mndInt.h | 3 + source/dnode/mnode/impl/src/mndMain.c | 27 ++++++-- source/dnode/mnode/impl/src/mndQuery.c | 20 +++--- source/dnode/qnode/inc/qndInt.h | 2 +- source/dnode/qnode/src/qnode.c | 7 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/audit/src/auditMain.c | 1 - source/libs/executor/src/exchangeoperator.c | 15 ++-- source/libs/executor/src/executor.c | 1 + source/util/src/tworker.c | 77 ++++++++++++++++----- 16 files changed, 137 insertions(+), 55 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index fe96fe1117..b821231539 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -109,7 +109,7 @@ int64_t mndGetRoleTimeMs(SMnode *pMnode); * @param pMsg The request msg. * @return int32_t 0 for success, -1 for failure. */ -int32_t mndProcessRpcMsg(SRpcMsg *pMsg); +int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg); void mndPostProcessQueryMsg(SRpcMsg *pMsg); diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 7d342c4ba1..e7f9d00ff3 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -60,7 +60,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad); * @param pQnode The qnode object. * @param pMsg The request message */ -int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg); +int32_t qndProcessQueryMsg(SQnode *pQnode, SQueueInfo* pInfo, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 51a3b2dc7d..bed218ac1b 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -49,7 +49,7 @@ typedef struct { int32_t workerId; int32_t threadNum; int64_t timestamp; - void *poolCb; + void *workerCb; } SQueueInfo; typedef enum { diff --git a/include/util/tworker.h b/include/util/tworker.h index 6df335fc74..5e5271d324 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -84,18 +84,25 @@ void tWWorkerCleanup(SWWorkerPool *pool); STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp); void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); +typedef enum SQWorkerPoolType { + QWORKER_POOL = 0, + QUERY_AUTO_QWORKER_POOL, +} SQWorkerPoolType; + typedef struct { - const char *name; - int32_t min; - int32_t max; - FItem fp; - void *param; + const char *name; + int32_t min; + int32_t max; + FItem fp; + void *param; + SQWorkerPoolType poolType; } SSingleWorkerCfg; typedef struct { - const char *name; - STaosQueue *queue; - SQWorkerPool pool; + const char *name; + STaosQueue *queue; + SQWorkerPoolType poolType; // default to QWORKER_POOL + void *pool; } SSingleWorker; typedef struct { @@ -134,6 +141,8 @@ typedef struct SQueryAutoQWorkerPool { int32_t activeN; // running workers and workers waiting at reading new queue msg int32_t runningN; // workers processing queue msgs, not include blocking/waitingA/waitingB workers. + TdThreadMutex activeLock; + int32_t waitingAfterBlockN; // workers that recovered from blocking but waiting for too many running workers TdThreadMutex waitingAfterBlockLock; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 885086e37a..e5c32f9a43 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -53,7 +53,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); - int32_t code = mndProcessRpcMsg(pMsg); + int32_t code = mndProcessRpcMsg(pMsg, pInfo); if (pInfo->timestamp != 0) { int64_t cost = taosGetTimestampUs() - pInfo->timestamp; @@ -203,6 +203,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { .name = "mnode-query", .fp = (FItem)mmProcessRpcMsg, .param = pMgmt, + .poolType = QUERY_AUTO_QWORKER_POOL, }; if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) { dError("failed to start mnode-query worker since %s", terrstr()); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 28da0f9c5f..5c635ff5ea 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -30,7 +30,7 @@ static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SQnodeMgmt *pMgmt = pInfo->ahandle; dTrace("msg:%p, get from qnode queue", pMsg); - int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pInfo->timestamp, pMsg); + int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pInfo, pMsg); if (IsReq(pMsg) && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && terrno != 0) code = terrno; qmSendRsp(pMsg, code); @@ -105,6 +105,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { .name = "qnode-query", .fp = (FItem)qmProcessQueue, .param = pMgmt, + .poolType = QUERY_AUTO_QWORKER_POOL, }; if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) { diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 2da14c65d2..072568eb0f 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -54,6 +54,7 @@ extern "C" { #define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE) typedef int32_t (*MndMsgFp)(SRpcMsg *pMsg); +typedef int32_t (*MndMsgFpExt)(SRpcMsg *pMsg, SQueueInfo* pInfo); typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); typedef int32_t (*ShowRetrieveFp)(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -137,11 +138,13 @@ typedef struct SMnode { SEncryptMgmt encryptMgmt; SGrantInfo grant; MndMsgFp msgFp[TDMT_MAX]; + MndMsgFpExt msgFpExt[TDMT_MAX]; SMsgCb msgCb; int64_t ipWhiteVer; } SMnode; void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); +void mndSetMsgHandleExt(SMnode *pMnode, tmsg_t msgType, MndMsgFpExt fp); int64_t mndGenerateUid(const char *name, int32_t len); void mndSetRestored(SMnode *pMnode, bool restored); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index cad8c6d745..398ea5d589 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -844,21 +844,29 @@ _OVER: return -1; } -int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { +int32_t mndProcessRpcMsg(SRpcMsg *pMsg, SQueueInfo* pQueueInfo) { SMnode *pMnode = pMsg->info.node; const STraceId *trace = &pMsg->info.traceId; + int32_t code = TSDB_CODE_SUCCESS; - MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; + MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; + MndMsgFpExt fpExt = NULL; if (fp == NULL) { - mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - return -1; + fpExt = pMnode->msgFpExt[TMSG_INDEX(pMsg->msgType)]; + if (fpExt == NULL) { + mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + return -1; + } } if (mndCheckMnodeState(pMsg) != 0) return -1; mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); - int32_t code = (*fp)(pMsg); + if (fp) + code = (*fp)(pMsg); + else + code = (*fpExt)(pMsg, pQueueInfo); mndReleaseRpc(pMnode); if (code == TSDB_CODE_ACTION_IN_PROGRESS) { @@ -883,6 +891,13 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { } } +void mndSetMsgHandleExt(SMnode *pMnode, tmsg_t msgType, MndMsgFpExt fp) { + tmsg_t type = TMSG_INDEX(msgType); + if (type < TDMT_MAX) { + pMnode->msgFpExt[type] = fp; + } +} + // Note: uid 0 is reserved int64_t mndGenerateUid(const char *name, int32_t len) { int32_t hashval = MurmurHash3_32(name, len); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index c03b02c17f..ae930f0d96 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -30,11 +30,11 @@ void mndPostProcessQueryMsg(SRpcMsg *pMsg) { qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); } -int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { +int32_t mndProcessQueryMsg(SRpcMsg *pMsg, SQueueInfo* pInfo) { int32_t code = -1; SMnode *pMnode = pMsg->info.node; - SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb}; + SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb, .pWorkerCb = pInfo->workerCb}; mTrace("msg:%p, in query queue is processing", pMsg); switch (pMsg->msgType) { @@ -173,14 +173,14 @@ int32_t mndInitQuery(SMnode *pMnode) { return -1; } - mndSetMsgHandle(pMnode, TDMT_SCH_QUERY, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_TASK_NOTIFY, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_QUERY, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_TASK_NOTIFY, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg); + mndSetMsgHandleExt(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_MND_BATCH_META, mndProcessBatchMetaMsg); return 0; diff --git a/source/dnode/qnode/inc/qndInt.h b/source/dnode/qnode/inc/qndInt.h index 86deda52ad..e8ccb75040 100644 --- a/source/dnode/qnode/inc/qndInt.h +++ b/source/dnode/qnode/inc/qndInt.h @@ -29,7 +29,7 @@ extern "C" { #endif -typedef struct SQueueWorker SQHandle; +typedef struct SQWorker SQHandle; typedef struct SQnode { int32_t qndId; diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 9937debb13..8cd967a8a8 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tqueue.h" #include "executor.h" #include "qndInt.h" #include "query.h" @@ -24,6 +25,7 @@ SQnode *qndOpen(const SQnodeOpt *pOption) { qError("calloc SQnode failed"); return NULL; } + pQnode->qndId = QNODE_HANDLE; if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, (void **)&pQnode->pQuery, &pOption->msgCb)) { taosMemoryFreeClear(pQnode); @@ -72,9 +74,10 @@ int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg, false); } -int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { +int32_t qndProcessQueryMsg(SQnode *pQnode, SQueueInfo* pInfo, SRpcMsg *pMsg) { int32_t code = -1; - SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; + int64_t ts = pInfo->timestamp; + SReadHandle handle = {.pMsgCb = &pQnode->msgCb, .pWorkerCb = pInfo->workerCb}; qTrace("message in qnode queue is processing"); switch (pMsg->msgType) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ba84951b7b..2d05cc2e00 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -747,7 +747,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo* pInfo) { return 0; } - SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->poolCb}; + SReadHandle handle = {.vnode = pVnode, .pMsgCb = &pVnode->msgCb, .pWorkerCb = pInfo->workerCb}; initStorageAPI(&handle.api); switch (pMsg->msgType) { diff --git a/source/libs/audit/src/auditMain.c b/source/libs/audit/src/auditMain.c index 96934888eb..aa3b669c1b 100644 --- a/source/libs/audit/src/auditMain.c +++ b/source/libs/audit/src/auditMain.c @@ -22,7 +22,6 @@ #include "ttime.h" #include "tjson.h" #include "tglobal.h" -#include "mnode.h" #include "audit.h" #include "osMemory.h" diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 1888d3dabe..059e1f2663 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -976,14 +976,21 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa static int32_t exchangeWait(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo) { SExecTaskInfo* pTask = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; if (pTask->pWorkerCb) { - pTask->code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool); - if (pTask->code != TSDB_CODE_SUCCESS) return pTask->code; + code = pTask->pWorkerCb->beforeBlocking(pTask->pWorkerCb->pPool); + if (code != TSDB_CODE_SUCCESS) { + pTask->code = code; + return pTask->code; + } } tsem_wait(&pExchangeInfo->ready); if (pTask->pWorkerCb) { - pTask->code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool); - if (pTask->code != TSDB_CODE_SUCCESS) return pTask->code; + code = pTask->pWorkerCb->afterRecoverFromBlocking(pTask->pWorkerCb->pPool); + if (code != TSDB_CODE_SUCCESS) { + pTask->code = code; + return pTask->code; + } } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 77a80d229e..ee4b0bbae9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -794,6 +794,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i); SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId); if (pExchangeInfo) { + qDebug("%s stop exchange operator", GET_TASKID(pTaskInfo)); tsem_post(&pExchangeInfo->ready); taosReleaseRef(exchangeObjRefPool, pStop->refId); } diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 2ca0450bed..3bd4039a9c 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -433,28 +433,66 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) { } int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) { - SQWorkerPool *pPool = &pWorker->pool; - pPool->name = pCfg->name; - pPool->min = pCfg->min; - pPool->max = pCfg->max; - if (tQWorkerInit(pPool) != 0) return -1; - - pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); - if (pWorker->queue == NULL) return -1; - + pWorker->poolType = pCfg->poolType; pWorker->name = pCfg->name; + + switch (pCfg->poolType) { + case QWORKER_POOL: { + SQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQWorkerPool)); + if (!pPool) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pPool->name = pCfg->name; + pPool->min = pCfg->min; + pPool->max = pCfg->max; + pWorker->pool = pPool; + if (tQWorkerInit(pPool) != 0) return -1; + + pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); + if (pWorker->queue == NULL) return -1; + } break; + case QUERY_AUTO_QWORKER_POOL: { + SQueryAutoQWorkerPool *pPool = taosMemoryCalloc(1, sizeof(SQueryAutoQWorkerPool)); + if (!pPool) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pPool->name = pCfg->name; + pPool->min = pCfg->min; + pPool->max = pCfg->max; + pWorker->pool = pPool; + if (tQueryAutoQWorkerInit(pPool) != 0) return -1; + + pWorker->queue = tQueryAutoQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp); + if (!pWorker->queue) return -1; + } break; + default: + assert(0); + } return 0; } void tSingleWorkerCleanup(SSingleWorker *pWorker) { if (pWorker->queue == NULL) return; - while (!taosQueueEmpty(pWorker->queue)) { taosMsleep(10); } - tQWorkerCleanup(&pWorker->pool); - tQWorkerFreeQueue(&pWorker->pool, pWorker->queue); + switch (pWorker->poolType) { + case QWORKER_POOL: + tQWorkerCleanup(pWorker->pool); + tQWorkerFreeQueue(pWorker->pool, pWorker->queue); + taosMemoryFree(pWorker->pool); + break; + case QUERY_AUTO_QWORKER_POOL: + tQueryAutoQWorkerCleanup(pWorker->pool); + tQueryAutoQWorkerFreeQueue(pWorker->pool, pWorker->queue); + taosMemoryFree(pWorker->pool); + break; + default: + assert(0); + } } int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) { @@ -517,7 +555,7 @@ static void *tQueryAutoQWorkerThreadFp(SQueryAutoQWorker *worker) { if (qinfo.fp != NULL) { qinfo.workerId = worker->id; qinfo.threadNum = pool->num; - qinfo.poolCb = pool->pCb; + qinfo.workerCb = pool->pCb; (*((FItem)qinfo.fp))(&qinfo, msg); } @@ -573,33 +611,38 @@ static bool tQueryAutoQWorkerTrySignalWaitingBeforeProcess(void* p) { static bool tQueryAutoQWorkerTryDecActive(void* p, int32_t minActive) { SQueryAutoQWorkerPool *pPool = p; bool ret = false; + taosThreadMutexLock(&pPool->activeLock); int32_t active = pPool->activeN; while (active > minActive) { int32_t activeNew = atomic_val_compare_exchange_32(&pPool->activeN, active, active - 1); if (activeNew == active) { - atomic_fetch_sub_32(&pPool->runningN, 1); ret = true; break; } active = activeNew; } + atomic_fetch_sub_32(&pPool->runningN, 1); + taosThreadMutexUnlock(&pPool->activeLock); return ret; } static int32_t tQueryAutoQWorkerWaitingCheck(SQueryAutoQWorkerPool* pPool) { + taosThreadMutexLock(&pPool->activeLock); int32_t running = pPool->runningN; while (running < pPool->num) { int32_t runningNew = atomic_val_compare_exchange_32(&pPool->runningN, running, running + 1); if (runningNew == running) { // to running + taosThreadMutexUnlock(&pPool->activeLock); return TSDB_CODE_SUCCESS; } running = runningNew; } + atomic_fetch_sub_32(&pPool->activeN, 1); + taosThreadMutexUnlock(&pPool->activeLock); // to wait for process taosThreadMutexLock(&pPool->waitingBeforeProcessMsgLock); atomic_fetch_add_32(&pPool->waitingBeforeProcessMsgN, 1); - atomic_fetch_sub_32(&pPool->activeN, 1); if (!pPool->exit) taosThreadCondWait(&pPool->waitingBeforeProcessMsgCond, &pPool->waitingBeforeProcessMsgLock); // recovered from waiting taosThreadMutexUnlock(&pPool->waitingBeforeProcessMsgLock); @@ -650,7 +693,6 @@ bool tQueryAutoQWorkerTryRecycleWorker(SQueryAutoQWorkerPool* pPool, SQueryAutoQ return true; } else { - atomic_fetch_sub_32(&pPool->runningN, 1); return true; } } @@ -670,6 +712,7 @@ int32_t tQueryAutoQWorkerInit(SQueryAutoQWorkerPool *pool) { (void)taosThreadMutexInit(&pool->backupLock, NULL); (void)taosThreadMutexInit(&pool->waitingAfterBlockLock, NULL); (void)taosThreadMutexInit(&pool->waitingBeforeProcessMsgLock, NULL); + (void)taosThreadMutexInit(&pool->activeLock, NULL); (void)taosThreadCondInit(&pool->waitingBeforeProcessMsgCond, NULL); (void)taosThreadCondInit(&pool->waitingAfterBlockCond, NULL); @@ -757,6 +800,7 @@ void tQueryAutoQWorkerCleanup(SQueryAutoQWorkerPool *pPool) { taosThreadMutexDestroy(&pPool->backupLock); taosThreadMutexDestroy(&pPool->waitingAfterBlockLock); taosThreadMutexDestroy(&pPool->waitingBeforeProcessMsgLock); + taosThreadMutexDestroy(&pPool->activeLock); taosThreadCondDestroy(&pPool->backupCond); taosThreadCondDestroy(&pPool->waitingAfterBlockCond); @@ -866,7 +910,6 @@ static int32_t tQueryAutoQWorkerBeforeBlocking(void *p) { if (code != TSDB_CODE_SUCCESS) { return code; } - atomic_fetch_sub_32(&pPool->runningN, 1); } return TSDB_CODE_SUCCESS;