limit the size of query queue

This commit is contained in:
54liuyao 2024-06-26 17:27:35 +08:00 committed by wangjiaming0909
parent 0383257900
commit 8fdd64e92e
2 changed files with 20 additions and 2 deletions

View File

@ -297,6 +297,8 @@ int32_t cleanupTaskQueue();
* @return
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
int32_t taosAsyncWait();
int32_t taosAsyncRecover();
void destroySendMsgInfo(SMsgSendInfo* pMsgBody);

View File

@ -132,7 +132,7 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
execFn(pSchedMsg->thandle);
taosFreeQitem(pSchedMsg);
if (tsem_post(&clientQueryQueue.queueSem) != 0) {
uFatal("post %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno));
qError("post %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno));
}
}
@ -176,12 +176,28 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
pSchedMsg->thandle = execParam;
pSchedMsg->msg = code;
if (tsem_wait(&clientQueryQueue.queueSem) != 0) {
qFatal("wait %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno));
qError("wait %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno));
}
return taosWriteQitem(clientQueryQueue.pTaskQueue, pSchedMsg);
}
int32_t taosAsyncWait() {
if (!clientQueryQueue.taskThreadPool.pCb) {
qError("query task thread pool callback function is null");
return -1;
}
return clientQueryQueue.taskThreadPool.pCb->beforeBlocking(&clientQueryQueue.taskThreadPool);
}
int32_t taosAsyncRecover() {
if (!clientQueryQueue.taskThreadPool.pCb) {
qError("query task thread pool callback function is null");
return -1;
}
return clientQueryQueue.taskThreadPool.pCb->afterRecoverFromBlocking(&clientQueryQueue.taskThreadPool);
}
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
if (NULL == pMsgBody) {
return;