refactor: limit the number of items in inputq to be 5120
This commit is contained in:
parent
06f91b64e6
commit
cea647daf6
|
@ -37,7 +37,7 @@ extern "C" {
|
||||||
#define META_HB_CHECK_INTERVAL 200
|
#define META_HB_CHECK_INTERVAL 200
|
||||||
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
|
#define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec
|
||||||
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
|
#define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1)
|
||||||
#define STREAM_TASK_QUEUE_CAPACITY 20480
|
#define STREAM_TASK_QUEUE_CAPACITY 5120
|
||||||
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
|
#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
|
|
|
@ -225,7 +225,8 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->status.latestForceWindow = w;
|
pTask->status.latestForceWindow = w;
|
||||||
if (w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) {
|
if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) ||
|
||||||
|
streamQueueIsFull(pTask->inputq.queue)) {
|
||||||
int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
|
int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI);
|
||||||
|
|
||||||
*pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now;
|
*pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now;
|
||||||
|
|
Loading…
Reference in New Issue