Merge pull request #22465 from taosdata/fix/3_liaohj
fix(stream): add the back pressure for sink node.
This commit is contained in:
commit
f8f8e7a8f0
|
@ -287,6 +287,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
continue;
|
||||
}
|
||||
|
||||
// downstream task has blocked the output, stopped for a while
|
||||
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
continue;
|
||||
}
|
||||
|
||||
*pScanIdle = false;
|
||||
|
||||
// seek the stored version and extract data from WAL
|
||||
|
|
|
@ -362,7 +362,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|||
msgLen, ver, total, size + msgLen/1048576.0);
|
||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
|
||||
if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) {
|
||||
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total,
|
||||
size);
|
||||
|
|
|
@ -191,6 +191,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
qDebug("s-task:%s inputQ is blocked, wait for 10sec and retry", pTask->id.idStr);
|
||||
taosMsleep(10000);
|
||||
continue;
|
||||
}
|
||||
|
||||
SSDataBlock* output = NULL;
|
||||
uint64_t ts = 0;
|
||||
code = qExecTask(exec, &output, &ts);
|
||||
|
|
Loading…
Reference in New Issue