refactor timeslice operator

This commit is contained in:
Ganlin Zhao 2023-04-26 11:04:13 +08:00
parent c56bb64f1c
commit fc6ca69afa
1 changed files with 44 additions and 21 deletions

View File

@ -551,6 +551,11 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
}
}
static void revertTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
pSliceInfo->current = pSliceInfo->win.skey;
pSliceInfo->prevTsSet = false;
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
@ -569,34 +574,52 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
blockDataCleanup(pResBlock);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
break;
if (pSliceInfo->pNextGroupRes != NULL) {
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo);
pSliceInfo->pNextGroupRes = NULL;
}
if (pSliceInfo->scalarSup.pExprInfo != NULL) {
SExprSupp* pExprSup = &pSliceInfo->scalarSup;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
break;
}
if (pSliceInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
pSliceInfo->groupId = pBlock->info.id.groupId;
} else {
if (pSliceInfo->groupId != pBlock->info.id.groupId) {
pSliceInfo->groupId = pBlock->info.id.groupId;
pSliceInfo->pNextGroupRes = pBlock;
break;
}
}
if (pSliceInfo->scalarSup.pExprInfo != NULL) {
SExprSupp* pExprSup = &pSliceInfo->scalarSup;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
int32_t code = initKeeperInfo(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
}
int32_t code = initKeeperInfo(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
// check if need to interpolate after last datablock
// except for fill(next), fill(linear)
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
pSliceInfo->fillType != TSDB_FILL_LINEAR) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
}
// check if need to interpolate after last datablock
// except for fill(next), fill(linear)
while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT &&
pSliceInfo->fillType != TSDB_FILL_LINEAR) {
genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false);
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
}
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);