fix(stream): add the back pressure for sink node.

This commit is contained in:
Haojun Liao 2023-08-16 19:11:43 +08:00
parent ec84d7fd81
commit 9a76ae72d6
2 changed files with 13 additions and 0 deletions

View File

@ -287,6 +287,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue; continue;
} }
// downstream task has blocked the output, stopped for a while
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
*pScanIdle = false; *pScanIdle = false;
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL

View File

@ -191,6 +191,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
return 0; return 0;
} }
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
qDebug("s-task:%s inputQ is blocked, wait for 5sec and retry", pTask->id.idStr);
taosMsleep(10000);
continue;
}
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
code = qExecTask(exec, &output, &ts); code = qExecTask(exec, &output, &ts);