mid pull over

This commit is contained in:
54liuyao 2024-02-07 13:55:01 +08:00
parent b66260d4e9
commit 6e19f452ce
1 changed files with 14 additions and 6 deletions

View File

@ -4283,10 +4283,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
qDebug("stask:%s ===stream===%s clear", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType)); qDebug("stask:%s ===stream===%s clear", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType));
return NULL; return NULL;
} else { } else {
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (pInfo->pDelRes->info.rows != 0) { if (resBlock != NULL) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return resBlock;
return pInfo->pDelRes;
} }
if (pInfo->clearState) { if (pInfo->clearState) {
pInfo->clearState = false; pInfo->clearState = false;
@ -4347,7 +4346,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
} else { } else {
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
} }
pInfo->clearState = true; ASSERT(taosArrayGetSize(pInfo->pUpdated) == 0);
return pInfo->pDelRes; return pInfo->pDelRes;
} }
continue; continue;
@ -4405,7 +4404,16 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
return buildIntervalResult(pOperator); SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
if (pInfo->clearState) {
pInfo->clearState = false;
clearFunctionContext(&pOperator->exprSupp);
clearStreamIntervalOperator(pInfo);
}
return NULL;
} }
void setStreamOperatorCompleted(SOperatorInfo* pOperator) { void setStreamOperatorCompleted(SOperatorInfo* pOperator) {