This commit is contained in:
54liuyao 2024-07-01 10:12:13 +08:00 committed by wangjiaming0909
parent 6c6f322a83
commit c3deebf5b6
1 changed files with 15 additions and 15 deletions

View File

@ -24,10 +24,10 @@
#include "cJSON.h"
#include "queryInt.h"
typedef struct SQueryQueue {
typedef struct STaskQueue {
SQueryAutoQWorkerPool wrokrerPool;
STaosQueue* pTaskQueue;
} SQueryQueue;
} STaskQueue;
int32_t getAsofJoinReverseOp(EOperatorType op) {
switch (op) {
@ -124,7 +124,7 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
return true;
}
static SQueryQueue clientTaskQueue = {0};
static STaskQueue taskQueue = {0};
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
@ -133,17 +133,17 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
}
int32_t initTaskQueue() {
clientTaskQueue.wrokrerPool.name = "tsc";
clientTaskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
clientTaskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
int32_t coce = tQueryAutoQWorkerInit(&clientTaskQueue.wrokrerPool);
taskQueue.wrokrerPool.name = "tsc";
taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
int32_t coce = tQueryAutoQWorkerInit(&taskQueue.wrokrerPool);
if (TSDB_CODE_SUCCESS != coce) {
qError("failed to init task thread pool");
return -1;
}
clientTaskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&clientTaskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
if (NULL == clientTaskQueue.pTaskQueue) {
taskQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskQueue.wrokrerPool, NULL, (FItem)processTaskQueue);
if (NULL == taskQueue.pTaskQueue) {
qError("failed to init task queue");
return -1;
}
@ -153,7 +153,7 @@ int32_t initTaskQueue() {
}
int32_t cleanupTaskQueue() {
tQueryAutoQWorkerCleanup(&clientTaskQueue.wrokrerPool);
tQueryAutoQWorkerCleanup(&taskQueue.wrokrerPool);
return 0;
}
@ -164,23 +164,23 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
pSchedMsg->thandle = execParam;
pSchedMsg->msg = code;
return taosWriteQitem(clientTaskQueue.pTaskQueue, pSchedMsg);
return taosWriteQitem(taskQueue.pTaskQueue, pSchedMsg);
}
int32_t taosAsyncWait() {
if (!clientTaskQueue.wrokrerPool.pCb) {
if (!taskQueue.wrokrerPool.pCb) {
qError("query task thread pool callback function is null");
return -1;
}
return clientTaskQueue.wrokrerPool.pCb->beforeBlocking(&clientTaskQueue.wrokrerPool);
return taskQueue.wrokrerPool.pCb->beforeBlocking(&taskQueue.wrokrerPool);
}
int32_t taosAsyncRecover() {
if (!clientTaskQueue.wrokrerPool.pCb) {
if (!taskQueue.wrokrerPool.pCb) {
qError("query task thread pool callback function is null");
return -1;
}
return clientTaskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&clientTaskQueue.wrokrerPool);
return taskQueue.wrokrerPool.pCb->afterRecoverFromBlocking(&taskQueue.wrokrerPool);
}
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {