fix(stream): do filter before the update check.

This commit is contained in:
Haojun Liao 2023-07-26 16:34:39 +08:00
parent df57768a3e
commit 432c943c25
2 changed files with 9 additions and 8 deletions

View File

@ -1322,13 +1322,15 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
return -1;
}
tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x", pTask->id.idStr, req.downstreamId);
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
if (remain > 0) {
tqDebug("s-task:%s remain:%d not send finish rsp", pTask->id.idStr, remain);
tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, remain:%d not send finish rsp",
pTask->id.idStr, req.downstreamId, remain);
} else {
tqDebug("s-task:%s all downstream tasks rsp scan-history completed msg", pTask->id.idStr);
tqDebug(
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
"completed msg",
pTask->id.idStr);
streamProcessScanHistoryFinishRsp(pTask);
}

View File

@ -2155,15 +2155,14 @@ FETCH_NEXT_BLOCK:
return pInfo->pCreateTbRes;
}
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
// apply additional time window filter
doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) {