refactor: wait for more time for future data
This commit is contained in:
parent
38aff641ee
commit
132fb8e82e
|
@ -1589,7 +1589,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
// input queue
|
// input queue
|
||||||
char vbuf[30] = {0};
|
char vbuf[30] = {0};
|
||||||
char buf[25] = {0};
|
char buf[25] = {0};
|
||||||
const char* queueInfoStr = "%5.2fMiB(%5.2f%)";
|
const char* queueInfoStr = "%4.2fMiB(%5.2f%)";
|
||||||
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate);
|
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate);
|
||||||
STR_TO_VARSTR(vbuf, buf);
|
STR_TO_VARSTR(vbuf, buf);
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
|
|
||||||
// maximum allowed processed block batches. One block may include several submit blocks
|
// maximum allowed processed block batches. One block may include several submit blocks
|
||||||
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
||||||
#define MIN_STREAM_EXEC_BATCH_NUM 4
|
|
||||||
#define STREAM_RESULT_DUMP_THRESHOLD 100
|
#define STREAM_RESULT_DUMP_THRESHOLD 100
|
||||||
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)
|
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1)
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
|
|
||||||
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
||||||
#define MIN_STREAM_EXEC_BATCH_NUM 4
|
|
||||||
#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec
|
#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec
|
||||||
|
|
||||||
// todo refactor:
|
// todo refactor:
|
||||||
|
@ -173,7 +172,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
||||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
|
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
|
||||||
if (qItem == NULL) {
|
if (qItem == NULL) {
|
||||||
if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
|
if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
|
||||||
taosMsleep(10);
|
taosMsleep(40);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue