add backpressure
This commit is contained in:
parent
1892b806b5
commit
5aff5aa93b
|
@ -934,8 +934,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->refCnt = 1;
|
pTask->refCnt = 1;
|
||||||
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||||
|
|
||||||
pTask->inputQueue = streamQueueOpen(128 << 10);
|
pTask->inputQueue = streamQueueOpen(512 << 10);
|
||||||
pTask->outputQueue = streamQueueOpen(128 << 10);
|
pTask->outputQueue = streamQueueOpen(512 << 10);
|
||||||
|
|
||||||
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
|
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -640,7 +640,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open
|
||||||
|
|
||||||
// UTIL
|
// UTIL
|
||||||
|
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, "Queue out of memory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, "Queue out of memory")
|
||||||
|
|
||||||
#ifdef TAOS_ERROR_C
|
#ifdef TAOS_ERROR_C
|
||||||
};
|
};
|
||||||
|
|
|
@ -169,7 +169,7 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) {
|
||||||
|
|
||||||
taosThreadMutexUnlock(&queue->mutex);
|
taosThreadMutexUnlock(&queue->mutex);
|
||||||
return code;
|
return code;
|
||||||
} else if (queue->itemLimit > 0 && queue->itemLimit + 1 > queue->itemLimit) {
|
} else if (queue->itemLimit > 0 && queue->numOfItems + 1 > queue->itemLimit) {
|
||||||
code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
|
code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY;
|
||||||
uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
|
uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue,
|
||||||
queue->itemLimit, tstrerror(code));
|
queue->itemLimit, tstrerror(code));
|
||||||
|
|
Loading…
Reference in New Issue