From 0383257900746673494bcfe931fdf45441df15e7 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 26 Jun 2024 17:03:40 +0800 Subject: [PATCH] limit the size of query queue --- source/libs/qcom/src/queryUtil.c | 51 +++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index ab536e9e62..e454ea3c01 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -24,7 +24,11 @@ #include "cJSON.h" #include "queryInt.h" -static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pMsg); +typedef struct SQueryQueue { + SQueryAutoQWorkerPool taskThreadPool; + STaosQueue* pTaskQueue; + tsem_t queueSem; +} SQueryQueue; int32_t getAsofJoinReverseOp(EOperatorType op) { switch (op) { @@ -121,48 +125,61 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag return true; } -static STaosQueue* pTaskQueue = NULL; -static SQueryAutoQWorkerPool taskThreadPool = {0}; +static SQueryQueue clientQueryQueue = {0}; + +static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { + __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; + execFn(pSchedMsg->thandle); + taosFreeQitem(pSchedMsg); + if (tsem_post(&clientQueryQueue.queueSem) != 0) { + uFatal("post %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); + } +} int32_t initTaskQueue() { - taskThreadPool.name = "tsc"; - taskThreadPool.min = tsNumOfTaskQueueThreads; - taskThreadPool.max = tsNumOfTaskQueueThreads; - int32_t coce = tQueryAutoQWorkerInit(&taskThreadPool); + uint32_t queueSize = (uint32_t)(tsMaxShellConns * 2); + clientQueryQueue.taskThreadPool.name = "tsc"; + clientQueryQueue.taskThreadPool.min = tsNumOfTaskQueueThreads; + clientQueryQueue.taskThreadPool.max = tsNumOfTaskQueueThreads; + int32_t coce = tQueryAutoQWorkerInit(&clientQueryQueue.taskThreadPool); if (TSDB_CODE_SUCCESS != coce) { qError("failed to init task thread pool"); return -1; } - pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskThreadPool, NULL, (FItem)processTaskQueue); - if (NULL == pTaskQueue) { + clientQueryQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&clientQueryQueue.taskThreadPool, NULL, (FItem)processTaskQueue); + if (NULL == clientQueryQueue.pTaskQueue) { qError("failed to init task queue"); return -1; } + + if (tsem_init(&clientQueryQueue.queueSem, 0, queueSize) != 0) { + uError("init %s:queue semaphore failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); + cleanupTaskQueue(); + return -1; + } qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads); return 0; } int32_t cleanupTaskQueue() { - tQueryAutoQWorkerCleanup(&taskThreadPool); + tsem_destroy(&clientQueryQueue.queueSem); + tQueryAutoQWorkerCleanup(&clientQueryQueue.taskThreadPool); return 0; } -static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { - __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; - execFn(pSchedMsg->thandle); - taosFreeQitem(pSchedMsg); -} - int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) { SSchedMsg* pSchedMsg = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0); pSchedMsg->fp = NULL; pSchedMsg->ahandle = execFn; pSchedMsg->thandle = execParam; pSchedMsg->msg = code; + if (tsem_wait(&clientQueryQueue.queueSem) != 0) { + qFatal("wait %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); + } - return taosWriteQitem(pTaskQueue, pSchedMsg); + return taosWriteQitem(clientQueryQueue.pTaskQueue, pSchedMsg); } void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {