fix(stream): limit the batch size.
This commit is contained in:
parent
a76e47ef54
commit
b2d141fe40
|
@ -15,7 +15,8 @@
|
||||||
|
|
||||||
#include "streamInc.h"
|
#include "streamInc.h"
|
||||||
|
|
||||||
#define MAX_STREAM_EXEC_BATCH_NUM 10240
|
// maximum allowed processed block batches. One block may include several submit blocks
|
||||||
|
#define MAX_STREAM_EXEC_BATCH_NUM 128
|
||||||
#define MIN_STREAM_EXEC_BATCH_NUM 16
|
#define MIN_STREAM_EXEC_BATCH_NUM 16
|
||||||
|
|
||||||
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
|
bool streamTaskShouldStop(const SStreamStatus* pStatus) {
|
||||||
|
@ -297,6 +298,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
pInput = newRet;
|
pInput = newRet;
|
||||||
streamQueueProcessSuccess(pTask->inputQueue);
|
streamQueueProcessSuccess(pTask->inputQueue);
|
||||||
if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
|
if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
|
||||||
|
qDebug("maximum batch limit:%d reached, processing, %s", MAX_STREAM_EXEC_BATCH_NUM, pTask->id.idStr);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue