From 8fdd64e92ebc9227976f672be70d3fe78fee1bd7 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 26 Jun 2024 17:27:35 +0800 Subject: [PATCH] limit the size of query queue --- include/libs/qcom/query.h | 2 ++ source/libs/qcom/src/queryUtil.c | 20 ++++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index ef702f24d7..2078455f1d 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -297,6 +297,8 @@ int32_t cleanupTaskQueue(); * @return */ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); +int32_t taosAsyncWait(); +int32_t taosAsyncRecover(); void destroySendMsgInfo(SMsgSendInfo* pMsgBody); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index e454ea3c01..98e1034ad7 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -132,7 +132,7 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) { execFn(pSchedMsg->thandle); taosFreeQitem(pSchedMsg); if (tsem_post(&clientQueryQueue.queueSem) != 0) { - uFatal("post %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); + qError("post %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); } } @@ -176,12 +176,28 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code) pSchedMsg->thandle = execParam; pSchedMsg->msg = code; if (tsem_wait(&clientQueryQueue.queueSem) != 0) { - qFatal("wait %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); + qError("wait %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno)); } return taosWriteQitem(clientQueryQueue.pTaskQueue, pSchedMsg); } +int32_t taosAsyncWait() { + if (!clientQueryQueue.taskThreadPool.pCb) { + qError("query task thread pool callback function is null"); + return -1; + } + return clientQueryQueue.taskThreadPool.pCb->beforeBlocking(&clientQueryQueue.taskThreadPool); +} + +int32_t taosAsyncRecover() { + if (!clientQueryQueue.taskThreadPool.pCb) { + qError("query task thread pool callback function is null"); + return -1; + } + return clientQueryQueue.taskThreadPool.pCb->afterRecoverFromBlocking(&clientQueryQueue.taskThreadPool); +} + void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { if (NULL == pMsgBody) { return;