drop limit

This commit is contained in:
54liuyao 2024-06-27 10:59:02 +08:00 committed by wangjiaming0909
parent a25571313f
commit 55341586b5
1 changed files with 0 additions and 15 deletions

View File

@ -27,7 +27,6 @@
typedef struct SQueryQueue { typedef struct SQueryQueue {
SQueryAutoQWorkerPool taskThreadPool; SQueryAutoQWorkerPool taskThreadPool;
STaosQueue* pTaskQueue; STaosQueue* pTaskQueue;
tsem_t queueSem;
} SQueryQueue; } SQueryQueue;
int32_t getAsofJoinReverseOp(EOperatorType op) { int32_t getAsofJoinReverseOp(EOperatorType op) {
@ -131,13 +130,9 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
__async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle; __async_exec_fn_t execFn = (__async_exec_fn_t)pSchedMsg->ahandle;
execFn(pSchedMsg->thandle); execFn(pSchedMsg->thandle);
taosFreeQitem(pSchedMsg); taosFreeQitem(pSchedMsg);
if (tsem_post(&clientQueryQueue.queueSem) != 0) {
qError("post %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno));
}
} }
int32_t initTaskQueue() { int32_t initTaskQueue() {
uint32_t queueSize = (uint32_t)(tsMaxShellConns * 2);
clientQueryQueue.taskThreadPool.name = "tsc"; clientQueryQueue.taskThreadPool.name = "tsc";
clientQueryQueue.taskThreadPool.min = tsNumOfTaskQueueThreads; clientQueryQueue.taskThreadPool.min = tsNumOfTaskQueueThreads;
clientQueryQueue.taskThreadPool.max = tsNumOfTaskQueueThreads; clientQueryQueue.taskThreadPool.max = tsNumOfTaskQueueThreads;
@ -153,18 +148,11 @@ int32_t initTaskQueue() {
return -1; return -1;
} }
if (tsem_init(&clientQueryQueue.queueSem, 0, queueSize) != 0) {
qError("init %s:queue semaphore failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno));
cleanupTaskQueue();
return -1;
}
qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads); qDebug("task queue is initialized, numOfThreads: %d", tsNumOfTaskQueueThreads);
return 0; return 0;
} }
int32_t cleanupTaskQueue() { int32_t cleanupTaskQueue() {
tsem_destroy(&clientQueryQueue.queueSem);
tQueryAutoQWorkerCleanup(&clientQueryQueue.taskThreadPool); tQueryAutoQWorkerCleanup(&clientQueryQueue.taskThreadPool);
return 0; return 0;
} }
@ -175,9 +163,6 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
pSchedMsg->ahandle = execFn; pSchedMsg->ahandle = execFn;
pSchedMsg->thandle = execParam; pSchedMsg->thandle = execParam;
pSchedMsg->msg = code; pSchedMsg->msg = code;
if (tsem_wait(&clientQueryQueue.queueSem) != 0) {
qError("wait %s emptySem failed(%s)", clientQueryQueue.taskThreadPool.name, strerror(errno));
}
return taosWriteQitem(clientQueryQueue.pTaskQueue, pSchedMsg); return taosWriteQitem(clientQueryQueue.pTaskQueue, pSchedMsg);
} }