fix:add limit for stream batch
This commit is contained in:
parent
4353e70c89
commit
2270fed9f7
|
@ -15,6 +15,8 @@
|
||||||
|
|
||||||
#include "streamInc.h"
|
#include "streamInc.h"
|
||||||
|
|
||||||
|
#define STREAM_EXEC_MAX_BATCH_NUM 100
|
||||||
|
|
||||||
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
|
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
void* exec = pTask->exec.executor;
|
void* exec = pTask->exec.executor;
|
||||||
|
@ -221,6 +223,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
batchCnt++;
|
batchCnt++;
|
||||||
input = newRet;
|
input = newRet;
|
||||||
streamQueueProcessSuccess(pTask->inputQueue);
|
streamQueueProcessSuccess(pTask->inputQueue);
|
||||||
|
if (batchCnt > STREAM_EXEC_MAX_BATCH_NUM) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue