refactor: wait for 100ms before scan wal again.
This commit is contained in:
parent
932d93ab35
commit
d2160efa05
|
@ -17,6 +17,7 @@
|
|||
#include "vnd.h"
|
||||
|
||||
#define MAX_REPEAT_SCAN_THRESHOLD 3
|
||||
#define SCAN_WAL_IDLE_DURATION 100
|
||||
|
||||
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
||||
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
|
||||
|
@ -36,12 +37,10 @@ int32_t tqScanWal(STQ* pTq) {
|
|||
bool shouldIdle = true;
|
||||
doScanWalForAllTasks(pTq->pStreamMeta, &shouldIdle);
|
||||
|
||||
int32_t times = 0;
|
||||
|
||||
if (shouldIdle) {
|
||||
taosWLockLatch(&pMeta->lock);
|
||||
|
||||
times = (--pMeta->walScanCounter);
|
||||
int32_t times = (--pMeta->walScanCounter);
|
||||
ASSERT(pMeta->walScanCounter >= 0);
|
||||
|
||||
if (pMeta->walScanCounter <= 0) {
|
||||
|
@ -50,7 +49,8 @@ int32_t tqScanWal(STQ* pTq) {
|
|||
}
|
||||
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
tqDebug("vgId:%d scan wal for stream tasks for %d times", vgId, times);
|
||||
tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
|
||||
taosMsleep(SCAN_WAL_IDLE_DURATION);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,7 +16,8 @@
|
|||
#include "streamInt.h"
|
||||
|
||||
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
||||
#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec
|
||||
#define MAX_SMOOTH_BURST_RATIO 5 // 20 sec
|
||||
#define WAIT_FOR_DURATION 40
|
||||
|
||||
// todo refactor:
|
||||
// read data from input queue
|
||||
|
@ -172,7 +173,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
|
|||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
|
||||
if (qItem == NULL) {
|
||||
if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
|
||||
taosMsleep(40);
|
||||
taosMsleep(WAIT_FOR_DURATION);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue