This commit is contained in:
54liuyao 2024-07-01 09:49:12 +08:00 committed by wangjiaming0909
parent dc0224cb2c
commit 26adfdd48d
1 changed files with 14 additions and 14 deletions

View File

@ -25,7 +25,7 @@
#include "queryInt.h" #include "queryInt.h"
typedef struct SQueryQueue { typedef struct SQueryQueue {
SQueryAutoQWorkerPool taskThreadPool; SQueryAutoQWorkerPool wrokrerPool;
STaosQueue* pTaskQueue; STaosQueue* pTaskQueue;
} SQueryQueue; } SQueryQueue;
@ -124,7 +124,7 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
return true; return true;
} }
static SQueryQueue clientQueryQueue = {0}; static SQueryQueue clientTaskQueue = {0};
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
@ -133,17 +133,17 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
} }
int32_t initTaskQueue() { int32_t initTaskQueue() {
clientQueryQueue.taskThreadPool.name = "tsc"; clientTaskQueue.wrokrerPool.name = "tsc";
clientQueryQueue.taskThreadPool.min = tsNumOfTaskQueueThreads; clientTaskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
clientQueryQueue.taskThreadPool.max = tsNumOfTaskQueueThreads; clientTaskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
int32_t coce = tQueryAutoQWorkerInit(&clientQueryQueue.taskThreadPool); int32_t coce = tQueryAutoQWorkerInit(&clientTaskQueue.wrokrerPool);
if (TSDB_CODE_SUCCESS != coce) { if (TSDB_CODE_SUCCESS != coce) {
qError("failed to init task thread pool"); qError("failed to init task thread pool");
return -1; return -1;
} }
clientQueryQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&clientQueryQueue.taskThreadPool, NULL, (FItem)processTaskQueue); clientTaskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&clientTaskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
if (NULL == clientQueryQueue.pTaskQueue) { if (NULL == clientTaskQueue.pTaskQueue) {
qError("failed to init task queue"); qError("failed to init task queue");
return -1; return -1;
} }
@ -153,7 +153,7 @@ int32_t initTaskQueue() {
} }
int32_t cleanupTaskQueue() { int32_t cleanupTaskQueue() {
tQueryAutoQWorkerCleanup(&clientQueryQueue.taskThreadPool); tQueryAutoQWorkerCleanup(&clientTaskQueue.wrokrerPool);
return 0; return 0;
} }
@ -164,23 +164,23 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
pSchedMsg->thandle = execParam; pSchedMsg->thandle = execParam;
pSchedMsg->msg = code; pSchedMsg->msg = code;
return taosWriteQitem(clientQueryQueue.pTaskQueue, pSchedMsg); return taosWriteQitem(clientTaskQueue.pTaskQueue, pSchedMsg);
} }
int32_t taosAsyncWait() { int32_t taosAsyncWait() {
if (!clientQueryQueue.taskThreadPool.pCb) { if (!clientTaskQueue.wrokrerPool.pCb) {
qError("query task thread pool callback function is null"); qError("query task thread pool callback function is null");
return -1; return -1;
} }
return clientQueryQueue.taskThreadPool.pCb->beforeBlocking(&clientQueryQueue.taskThreadPool); return clientTaskQueue.wrokrerPool.pCb->beforeBlocking(&clientTaskQueue.wrokrerPool);
} }
int32_t taosAsyncRecover() { int32_t taosAsyncRecover() {
if (!clientQueryQueue.taskThreadPool.pCb) { if (!clientTaskQueue.wrokrerPool.pCb) {
qError("query task thread pool callback function is null"); qError("query task thread pool callback function is null");
return -1; return -1;
} }
return clientQueryQueue.taskThreadPool.pCb->afterRecoverFromBlocking(&clientQueryQueue.taskThreadPool); return clientTaskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&clientTaskQueue.wrokrerPool);
} }
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {