limit the size of query queue

This commit is contained in:
54liuyao 2024-06-26 17:03:40 +08:00 committed by wangjiaming0909
parent 03a7b43af6
commit 0383257900
1 changed files with 34 additions and 17 deletions

View File

@ -24,7 +24,11 @@
#include "cJSON.h" #include "cJSON.h"
#include "queryInt.h" #include "queryInt.h"
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pMsg); typedef struct SQueryQueue {
SQueryAutoQWorkerPool taskThreadPool;
STaosQueue* pTaskQueue;
tsem_t queueSem;
} SQueryQueue;
int32_t getAsofJoinReverseOp(EOperatorType op) { int32_t getAsofJoinReverseOp(EOperatorType op) {
switch (op) { switch (op) {
@ -121,48 +125,61 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTag
return true; return true;
} }
static STaosQueue* pTaskQueue = NULL; static SQueryQueue clientQueryQueue = {0};
static SQueryAutoQWorkerPool taskThreadPool = {0};
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
execFn(pSchedMsg->thandle);
taosFreeQitem(pSchedMsg);
if (tsem_post(&clientQueryQueue.queueSem) != 0) {
uFatal("post %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno));
}
}
int32_t initTaskQueue() { int32_t initTaskQueue() {
taskThreadPool.name = "tsc"; uint32_t queueSize = (uint32_t)(tsMaxShellConns * 2);
taskThreadPool.min = tsNumOfTaskQueueThreads; clientQueryQueue.taskThreadPool.name = "tsc";
taskThreadPool.max = tsNumOfTaskQueueThreads; clientQueryQueue.taskThreadPool.min = tsNumOfTaskQueueThreads;
int32_t coce = tQueryAutoQWorkerInit(&taskThreadPool); clientQueryQueue.taskThreadPool.max = tsNumOfTaskQueueThreads;
int32_t coce = tQueryAutoQWorkerInit(&clientQueryQueue.taskThreadPool);
if (TSDB_CODE_SUCCESS != coce) { if (TSDB_CODE_SUCCESS != coce) {
qError("failed to init task thread pool"); qError("failed to init task thread pool");
return -1; return -1;
} }
pTaskQueue = tQueryAutoQWorkerAllocQueue(&taskThreadPool, NULL, (FItem)processTaskQueue); clientQueryQueue.pTaskQueue = tQueryAutoQWorkerAllocQueue(&clientQueryQueue.taskThreadPool, NULL, (FItem)processTaskQueue);
if (NULL == pTaskQueue) { if (NULL == clientQueryQueue.pTaskQueue) {
qError("failed to init task queue"); qError("failed to init task queue");
return -1; return -1;
} }
if (tsem_init(&clientQueryQueue.queueSem, 0, queueSize) != 0) {
uError("init %s:queue semaphore failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno));
cleanupTaskQueue();
return -1;
}
qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads); qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
return 0; return 0;
} }
int32_t cleanupTaskQueue() { int32_t cleanupTaskQueue() {
tQueryAutoQWorkerCleanup(&taskThreadPool); tsem_destroy(&clientQueryQueue.queueSem);
tQueryAutoQWorkerCleanup(&clientQueryQueue.taskThreadPool);
return 0; return 0;
} }
static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
execFn(pSchedMsg->thandle);
taosFreeQitem(pSchedMsg);
}
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) { int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) {
SSchedMsg* pSchedMsg = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0); SSchedMsg* pSchedMsg = taosAllocateQitem(sizeof(SSchedMsg), DEF_QITEM, 0);
pSchedMsg->fp = NULL; pSchedMsg->fp = NULL;
pSchedMsg->ahandle = execFn; pSchedMsg->ahandle = execFn;
pSchedMsg->thandle = execParam; pSchedMsg->thandle = execParam;
pSchedMsg->msg = code; pSchedMsg->msg = code;
if (tsem_wait(&clientQueryQueue.queueSem) != 0) {
qFatal("wait %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno));
}
return taosWriteQitem(pTaskQueue, pSchedMsg); return taosWriteQitem(clientQueryQueue.pTaskQueue, pSchedMsg);
} }
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {