client use new thread pool

This commit is contained in:
54liuyao 2024-06-26 15:56:23 +08:00 committed by wangjiaming0909
parent 938dc06a09
commit 03a7b43af6
1 changed files with 26 additions and 16 deletions

View File

@ -19,10 +19,13 @@
#include "tmsg.h" #include "tmsg.h"
#include "trpc.h" #include "trpc.h"
#include "tsched.h" #include "tsched.h"
#include "tworker.h"
// clang-format off // clang-format off
#include "cJSON.h" #include "cJSON.h"
#include "queryInt.h" #include "queryInt.h"
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pMsg);
int32_t getAsofJoinReverseOp(EOperatorType op) { int32_t getAsofJoinReverseOp(EOperatorType op) {
switch (op) { switch (op) {
case OP_TYPE_GREATER_THAN: case OP_TYPE_GREATER_THAN:
@ -118,12 +121,21 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
return true; return true;
} }
static SSchedQueue pTaskQueue = {0}; static STaosQueue* pTaskQueue = NULL;
static SQueryAutoQWorkerPool taskThreadPool = {0};
int32_t initTaskQueue() { int32_t initTaskQueue() {
int32_t queueSize = tsMaxShellConns * 2; taskThreadPool.name = "tsc";
void *p = taosInitScheduler(queueSize, tsNumOfTaskQueueThreads, "tsc", &pTaskQueue); taskThreadPool.min = tsNumOfTaskQueueThreads;
if (NULL == p) { taskThreadPool.max = tsNumOfTaskQueueThreads;
int32_t coce = tQueryAutoQWorkerInit(&taskThreadPool);
if (TSDB_CODE_SUCCESS != coce) {
qError("failed to init task thread pool");
return -1;
}
pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskThreadPool, NULL, (FItem)processTaskQueue);
if (NULL == pTaskQueue) {
qError("failed to init task queue"); qError("failed to init task queue");
return -1; return -1;
} }
@ -133,26 +145,24 @@ int32_t initTaskQueue() {
} }
int32_t cleanupTaskQueue() { int32_t cleanupTaskQueue() {
taosCleanUpScheduler(&pTaskQueue); tQueryAutoQWorkerCleanup(&taskThreadPool);
return 0; return 0;
} }
static void execHelper(struct SSchedMsg* pSchedMsg) { static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
int32_t code = execFn(pSchedMsg->thandle); execFn(pSchedMsg->thandle);
if (code != 0 && pSchedMsg->msg != NULL) { taosFreeQitem(pSchedMsg);
*(int32_t*)pSchedMsg->msg = code;
}
} }
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) { int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
SSchedMsg schedMsg = {0}; SSchedMsg* pSchedMsg = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0);
schedMsg.fp = execHelper; pSchedMsg->fp = NULL;
schedMsg.ahandle = execFn; pSchedMsg->ahandle = execFn;
schedMsg.thandle = execParam; pSchedMsg->thandle = execParam;
schedMsg.msg = code; pSchedMsg->msg = code;
return taosScheduleTask(&pTaskQueue, &schedMsg); return taosWriteQitem(pTaskQueue, pSchedMsg);
} }
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {