fix(stream): fix invalid check.

This commit is contained in:
Haojun Liao 2023-10-03 00:38:24 +08:00
parent 5718158fc0
commit 5564eb215c
2 changed files with 1 additions and 2 deletions

View File

@ -370,7 +370,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
if (streamQueueGetNumOfItems(pTask->inputInfo.queue)) {
if (streamQueueIsFull(pTask->inputInfo.queue, true)) {
tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;

View File

@ -1128,7 +1128,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// all msg rsp already, continue
if (leftRsp == 0) {
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
stDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status);
// we need to re-try send dispatch msg to downstream tasks
int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList);