fix process next group bug

This commit is contained in:
Ganlin Zhao 2023-04-26 18:07:19 +08:00
parent fc6ca69afa
commit 75f76a62ab
1 changed files with 10 additions and 9 deletions

View File

@ -473,7 +473,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
break;
}
@ -486,7 +485,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
break;
}
} else if (ts < pSliceInfo->current) {
@ -510,7 +508,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
break;
}
} else {
@ -544,14 +541,13 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
doKeepPrevRows(pSliceInfo, pBlock, i);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
setOperatorCompleted(pOperator);
break;
}
}
}
}
static void revertTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
static void restoreTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
pSliceInfo->current = pSliceInfo->win.skey;
pSliceInfo->prevTsSet = false;
}
@ -582,6 +578,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
setOperatorCompleted(pOperator);
break;
}
@ -591,7 +588,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pSliceInfo->groupId != pBlock->info.id.groupId) {
pSliceInfo->groupId = pBlock->info.id.groupId;
pSliceInfo->pNextGroupRes = pBlock;
break;
restoreTimesliceInfo(pSliceInfo);
goto _group_over;
}
}
@ -618,10 +616,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
}
_group_over:
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
if (pOperator->status == OP_EXEC_DONE) {
break;
}
}
// restore the value
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);