diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 2040f8e323..6448e9d2f7 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -17,6 +17,7 @@ #include "vnd.h" #define MAX_REPEAT_SCAN_THRESHOLD 3 +#define SCAN_WAL_IDLE_DURATION 100 static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); @@ -36,12 +37,10 @@ int32_t tqScanWal(STQ* pTq) { bool shouldIdle = true; doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle); - int32_t times = 0; - if (shouldIdle) { taosWLockLatch(&pMeta->lock); - times = (--pMeta->walScanCounter); + int32_t times = (--pMeta->walScanCounter); ASSERT(pMeta->walScanCounter >= 0); if (pMeta->walScanCounter <= 0) { @@ -50,7 +49,8 @@ int32_t tqScanWal(STQ* pTq) { } taosWUnLockLatch(&pMeta->lock); - tqDebug("vgId:%d scan wal for stream tasks for %d times", vgId, times); + tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION); + taosMsleep(SCAN_WAL_IDLE_DURATION); } } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 7f7c039423..887220f840 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -16,7 +16,8 @@ #include "streamInt.h" #define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec +#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec +#define WAIT_FOR_DURATION 40 // todo refactor: // read data from input queue @@ -172,7 +173,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); if (qItem == NULL) { if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) { - taosMsleep(40); + taosMsleep(WAIT_FOR_DURATION); continue; }