From 132fb8e82ee1566ebfd3c3384072024a7296dc3f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Oct 2023 15:42:00 +0800 Subject: [PATCH] refactor: wait for more time for future data --- source/dnode/mnode/impl/src/mndStream.c | 2 +- source/libs/stream/src/streamExec.c | 1 - source/libs/stream/src/streamQueue.c | 3 +-- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 866c1dfca4..de4f6e85a3 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1589,7 +1589,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // input queue char vbuf[30] = {0}; char buf[25] = {0}; - const char* queueInfoStr = "%5.2fMiB(%5.2f%)"; + const char* queueInfoStr = "%4.2fMiB(%5.2f%)"; sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate); STR_TO_VARSTR(vbuf, buf); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b5ea82d347..dbceb83803 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -17,7 +17,6 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 4 #define STREAM_RESULT_DUMP_THRESHOLD 100 #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 8a7827b5a7..7f7c039423 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -16,7 +16,6 @@ #include "streamInt.h" #define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 4 #define MAX_SMOOTH_BURST_RATIO 5 // 20 sec // todo refactor: @@ -173,7 +172,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); if (qItem == NULL) { if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) { - taosMsleep(10); + taosMsleep(40); continue; }