From 03a7b43af653e371b33f1171d5e10701975dc0f5 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 26 Jun 2024 15:56:23 +0800 Subject: [PATCH] client use new thread pool --- source/libs/qcom/src/queryUtil.c | 42 ++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 4206c9292f..ab536e9e62 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -19,10 +19,13 @@ #include "tmsg.h" #include "trpc.h" #include "tsched.h" +#include "tworker.h" // clang-format off #include "cJSON.h" #include "queryInt.h" +static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pMsg); + int32_t getAsofJoinReverseOp(EOperatorType op) { switch (op) { case OP_TYPE_GREATER_THAN: @@ -118,12 +121,21 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag return true; } -static SSchedQueue pTaskQueue = {0}; +static STaosQueue* pTaskQueue = NULL; +static SQueryAutoQWorkerPool taskThreadPool = {0}; int32_t initTaskQueue() { - int32_t queueSize = tsMaxShellConns * 2; - void *p = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc", &pTaskQueue); - if (NULL == p) { + taskThreadPool.name = "tsc"; + taskThreadPool.min = tsNumOfTaskQueueThreads; + taskThreadPool.max = tsNumOfTaskQueueThreads; + int32_t coce = tQueryAutoQWorkerInit(&taskThreadPool); + if (TSDB_CODE_SUCCESS != coce) { + qError("failed to init task thread pool"); + return -1; + } + + pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskThreadPool, NULL, (FItem)processTaskQueue); + if (NULL == pTaskQueue) { qError("failed to init task queue"); return -1; } @@ -133,26 +145,24 @@ int32_t initTaskQueue() { } int32_t cleanupTaskQueue() { - taosCleanUpScheduler(&pTaskQueue); + tQueryAutoQWorkerCleanup(&taskThreadPool); return 0; } -static void execHelper(struct SSchedMsg* pSchedMsg) { +static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; - int32_t code = execFn(pSchedMsg->thandle); - if (code != 0 && pSchedMsg->msg != NULL) { - *(int32_t*)pSchedMsg->msg = code; - } + execFn(pSchedMsg->thandle); + taosFreeQitem(pSchedMsg); } int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) { - SSchedMsg schedMsg = {0}; - schedMsg.fp = execHelper; - schedMsg.ahandle = execFn; - schedMsg.thandle = execParam; - schedMsg.msg = code; + SSchedMsg* pSchedMsg = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0); + pSchedMsg->fp = NULL; + pSchedMsg->ahandle = execFn; + pSchedMsg->thandle = execParam; + pSchedMsg->msg = code; - return taosScheduleTask(&pTaskQueue, &schedMsg); + return taosWriteQitem(pTaskQueue, pSchedMsg); } void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {