feat: add pipeline processing for timeslice operator

This commit is contained in:
Ganlin Zhao 2023-06-08 16:32:35 +08:00
parent 39b589a5a8
commit b4c2085ddc
1 changed files with 80 additions and 32 deletions

View File

@ -44,6 +44,7 @@ typedef struct STimeSliceOperatorInfo {
uint64_t groupId;
SGroupKeys* pPrevGroupKey;
SSDataBlock* pNextGroupRes;
SSDataBlock* pRemainRes; // save block unfinished processing
} STimeSliceOperatorInfo;
static void destroyTimeSliceOperatorInfo(void* param);
@ -641,6 +642,25 @@ static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
return TSDB_CODE_SUCCESS;
}
static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t threshold) {
SSDataBlock* pResBlock = pSliceInfo->pRes;
if (pResBlock->info.rows > threshold) {
pSliceInfo->pRemainRes = pBlock;
return true;
}
return false;
}
static bool checkWindowBoundReached(STimeSliceOperatorInfo* pSliceInfo) {
if (pSliceInfo->current > pSliceInfo->win.ekey) {
return true;
}
return false;
}
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo, bool ignoreNull) {
SSDataBlock* pResBlock = pSliceInfo->pRes;
@ -658,10 +678,12 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) {
continue;
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
if (checkWindowBoundReached(pSliceInfo)) {
break;
}
if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) {
return;
}
if (ts == pSliceInfo->current) {
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
@ -671,9 +693,13 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
pSliceInfo->current =
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
if (checkWindowBoundReached(pSliceInfo)) {
break;
}
if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) {
return;
}
} else if (ts < pSliceInfo->current) {
// in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
doKeepPrevRows(pSliceInfo, pBlock, i);
@ -694,9 +720,12 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
}
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
if (checkWindowBoundReached(pSliceInfo)) {
break;
}
if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) {
return;
}
} else {
// ignore current row, and do nothing
}
@ -727,11 +756,19 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
}
doKeepPrevRows(pSliceInfo, pBlock, i);
if (pSliceInfo->current > pSliceInfo->win.ekey) {
if (checkWindowBoundReached(pSliceInfo)) {
break;
}
if (checkThresholdReached(pSliceInfo, pBlock, pOperator->resultInfo.threshold)) {
return;
}
}
}
// if reached here, meaning block processing finished naturally,
// or interpolation reach window upper bound
pSliceInfo->pRemainRes = NULL;
}
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) {
@ -778,34 +815,54 @@ static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
resetKeeperInfo(pSliceInfo);
}
static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp;
bool ignoreNull = getIgoreNullRes(pSup);
int32_t order = TSDB_ORDER_ASC;
int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
if (pSliceInfo->scalarSup.pExprInfo != NULL) {
SExprSupp* pExprSup = &pSliceInfo->scalarSup;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull);
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock);
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp;
bool ignoreNull = getIgoreNullRes(pSup);
int32_t order = TSDB_ORDER_ASC;
SInterval* pInterval = &pSliceInfo->interval;
SOperatorInfo* downstream = pOperator->pDownstream[0];
blockDataCleanup(pResBlock);
while (1) {
if (pSliceInfo->pNextGroupRes != NULL) {
setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo, ignoreNull);
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pSliceInfo->pNextGroupRes);
doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes);
if (checkThresholdReached(pSliceInfo, pSliceInfo->pRemainRes, pOperator->resultInfo.threshold)) {
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
goto _finished;
}
pSliceInfo->pNextGroupRes = NULL;
}
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
SSDataBlock* pBlock = pSliceInfo->pRemainRes ? pSliceInfo->pRemainRes : downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
setOperatorCompleted(pOperator);
break;
@ -821,21 +878,13 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
}
if (pSliceInfo->scalarSup.pExprInfo != NULL) {
SExprSupp* pExprSup = &pSliceInfo->scalarSup;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
doHandleTimeslice(pOperator, pBlock);
if (checkThresholdReached(pSliceInfo, pSliceInfo->pRemainRes, pOperator->resultInfo.threshold)) {
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
goto _finished;
}
int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull);
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock);
}
// handling post work for a specific group
// check if need to interpolate after last datablock
// except for fill(next), fill(linear)
@ -848,11 +897,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
// restore initial value for next group
resetTimesliceInfo(pSliceInfo);
if (pResBlock->info.rows >= 4096) {
break;
}
}
_finished:
// restore the value
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
if (pResBlock->info.rows == 0) {
@ -908,6 +955,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->groupId = 0;
pInfo->pPrevGroupKey = NULL;
pInfo->pNextGroupRes = NULL;
pInfo->pRemainRes = NULL;
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;