mid pull over
This commit is contained in:
parent
5473c22585
commit
b66260d4e9
|
@ -561,6 +561,7 @@ typedef struct SStreamIntervalOperatorInfo {
|
||||||
SSDataBlock* pMidRetriveRes;
|
SSDataBlock* pMidRetriveRes;
|
||||||
bool recvPullover;
|
bool recvPullover;
|
||||||
SSDataBlock* pMidPulloverRes;
|
SSDataBlock* pMidPulloverRes;
|
||||||
|
bool clearState;
|
||||||
} SStreamIntervalOperatorInfo;
|
} SStreamIntervalOperatorInfo;
|
||||||
|
|
||||||
typedef struct SDataGroupInfo {
|
typedef struct SDataGroupInfo {
|
||||||
|
|
|
@ -1566,6 +1566,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
pInfo->recvRetrive = false;
|
pInfo->recvRetrive = false;
|
||||||
pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
|
pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
|
||||||
pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
|
pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
|
||||||
|
pInfo->clearState = false;
|
||||||
|
|
||||||
pOperator->operatorType = pPhyNode->type;
|
pOperator->operatorType = pPhyNode->type;
|
||||||
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
|
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
|
||||||
|
@ -4287,6 +4288,11 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
|
if (pInfo->clearState) {
|
||||||
|
pInfo->clearState = false;
|
||||||
|
clearFunctionContext(&pOperator->exprSupp);
|
||||||
|
clearStreamIntervalOperator(pInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pInfo->pUpdated) {
|
if (!pInfo->pUpdated) {
|
||||||
|
@ -4341,6 +4347,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
} else {
|
} else {
|
||||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||||
}
|
}
|
||||||
|
pInfo->clearState = true;
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
@ -4351,6 +4358,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
pInfo->pPullWins, pInfo->numOfChild, pOperator);
|
pInfo->pPullWins, pInfo->numOfChild, pOperator);
|
||||||
if (pInfo->recvPullover) {
|
if (pInfo->recvPullover) {
|
||||||
copyDataBlock(pInfo->pMidPulloverRes, pBlock);
|
copyDataBlock(pInfo->pMidPulloverRes, pBlock);
|
||||||
|
pInfo->clearState = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
|
Loading…
Reference in New Issue