From 5aff5aa93b586838f7b497405b82c307ee5b673a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 4 Apr 2023 12:38:56 +0000 Subject: [PATCH] add backpressure --- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/util/src/terror.c | 2 +- source/util/src/tqueue.c | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b7dceffb15..a4d55010fa 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -934,8 +934,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->refCnt = 1; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; - pTask->inputQueue = streamQueueOpen(128 << 10); - pTask->outputQueue = streamQueueOpen(128 << 10); + pTask->inputQueue = streamQueueOpen(512 << 10); + pTask->outputQueue = streamQueueOpen(512 << 10); if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { return -1; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6ea3349fe4..fb8493dfa4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -640,7 +640,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open // UTIL -TAOS_DEFINE_ERROR(TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, "Queue out of memory") +TAOS_DEFINE_ERROR(TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, "Queue out of memory") #ifdef TAOS_ERROR_C }; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 1b0b2f63f1..e2f1d7c033 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -169,7 +169,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { taosThreadMutexUnlock(&queue->mutex); return code; - } else if (queue->itemLimit > 0 && queue->itemLimit + 1 > queue->itemLimit) { + } else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) { code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY; uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue, queue->itemLimit, tstrerror(code));