Merge pull request #21670 from taosdata/feat/TD-22193
feat: add pipeline processing for timeslice operator
This commit is contained in:
commit
7b060d693e
|
@ -44,6 +44,8 @@ typedef struct STimeSliceOperatorInfo {
|
|||
uint64_t groupId;
|
||||
SGroupKeys* pPrevGroupKey;
|
||||
SSDataBlock* pNextGroupRes;
|
||||
SSDataBlock* pRemainRes; // save block unfinished processing
|
||||
int32_t remainIndex; // the remaining index in the block to be processed
|
||||
} STimeSliceOperatorInfo;
|
||||
|
||||
static void destroyTimeSliceOperatorInfo(void* param);
|
||||
|
@ -644,13 +646,47 @@ static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, int32_t threshold) {
|
||||
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||
if (pResBlock->info.rows > threshold) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool checkWindowBoundReached(STimeSliceOperatorInfo* pSliceInfo) {
|
||||
if (pSliceInfo->current > pSliceInfo->win.ekey) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static void saveBlockStatus(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t curIndex) {
|
||||
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
|
||||
if (curIndex < pBlock->info.rows - 1) {
|
||||
pSliceInfo->pRemainRes = pBlock;
|
||||
pSliceInfo->remainIndex = curIndex + 1;
|
||||
return;
|
||||
}
|
||||
|
||||
// all data in remaining block processed
|
||||
pSliceInfo->pRemainRes = NULL;
|
||||
|
||||
}
|
||||
|
||||
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
|
||||
SExecTaskInfo* pTaskInfo, bool ignoreNull) {
|
||||
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||
SInterval* pInterval = &pSliceInfo->interval;
|
||||
|
||||
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
|
||||
int32_t i = (pSliceInfo->pRemainRes == NULL) ? 0 : pSliceInfo->remainIndex;
|
||||
for (; i < pBlock->info.rows; ++i) {
|
||||
int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
|
||||
|
||||
// check for duplicate timestamps
|
||||
|
@ -662,10 +698,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
|||
continue;
|
||||
}
|
||||
|
||||
if (pSliceInfo->current > pSliceInfo->win.ekey) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (ts == pSliceInfo->current) {
|
||||
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
|
||||
|
||||
|
@ -674,9 +706,14 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
|||
|
||||
pSliceInfo->current =
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||
if (pSliceInfo->current > pSliceInfo->win.ekey) {
|
||||
|
||||
if (checkWindowBoundReached(pSliceInfo)) {
|
||||
break;
|
||||
}
|
||||
if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
||||
saveBlockStatus(pSliceInfo, pBlock, i);
|
||||
return;
|
||||
}
|
||||
} else if (ts < pSliceInfo->current) {
|
||||
// in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
|
||||
doKeepPrevRows(pSliceInfo, pBlock, i);
|
||||
|
@ -697,9 +734,13 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
|||
}
|
||||
}
|
||||
|
||||
if (pSliceInfo->current > pSliceInfo->win.ekey) {
|
||||
if (checkWindowBoundReached(pSliceInfo)) {
|
||||
break;
|
||||
}
|
||||
if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
||||
saveBlockStatus(pSliceInfo, pBlock, i);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// ignore current row, and do nothing
|
||||
}
|
||||
|
@ -730,11 +771,20 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
|
|||
}
|
||||
doKeepPrevRows(pSliceInfo, pBlock, i);
|
||||
|
||||
if (pSliceInfo->current > pSliceInfo->win.ekey) {
|
||||
if (checkWindowBoundReached(pSliceInfo)) {
|
||||
break;
|
||||
}
|
||||
if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
||||
saveBlockStatus(pSliceInfo, pBlock, i);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if reached here, meaning block processing finished naturally,
|
||||
// or interpolation reach window upper bound
|
||||
pSliceInfo->pRemainRes = NULL;
|
||||
|
||||
}
|
||||
|
||||
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) {
|
||||
|
@ -781,34 +831,63 @@ static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
|
|||
resetKeeperInfo(pSliceInfo);
|
||||
}
|
||||
|
||||
static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
bool ignoreNull = getIgoreNullRes(pSup);
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
|
||||
int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
if (pSliceInfo->scalarSup.pExprInfo != NULL) {
|
||||
SExprSupp* pExprSup = &pSliceInfo->scalarSup;
|
||||
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||
}
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
|
||||
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull);
|
||||
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock);
|
||||
}
|
||||
|
||||
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
|
||||
SSDataBlock* pResBlock = pSliceInfo->pRes;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
bool ignoreNull = getIgoreNullRes(pSup);
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
SInterval* pInterval = &pSliceInfo->interval;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
|
||||
while (1) {
|
||||
if (pSliceInfo->pNextGroupRes != NULL) {
|
||||
setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true);
|
||||
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo, ignoreNull);
|
||||
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pSliceInfo->pNextGroupRes);
|
||||
doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes);
|
||||
if (checkWindowBoundReached(pSliceInfo) || checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
||||
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
if (pSliceInfo->pRemainRes == NULL) {
|
||||
pSliceInfo->pNextGroupRes = NULL;
|
||||
}
|
||||
if (pResBlock->info.rows != 0) {
|
||||
goto _finished;
|
||||
} else {
|
||||
// after fillter if result block has 0 rows, go back to
|
||||
// process pNextGroupRes again for unfinished data
|
||||
continue;
|
||||
}
|
||||
}
|
||||
pSliceInfo->pNextGroupRes = NULL;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
SSDataBlock* pBlock = pSliceInfo->pRemainRes ? pSliceInfo->pRemainRes : downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
setOperatorCompleted(pOperator);
|
||||
break;
|
||||
|
@ -824,21 +903,15 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pSliceInfo->scalarSup.pExprInfo != NULL) {
|
||||
SExprSupp* pExprSup = &pSliceInfo->scalarSup;
|
||||
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
|
||||
doHandleTimeslice(pOperator, pBlock);
|
||||
if (checkWindowBoundReached(pSliceInfo) || checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
|
||||
doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
if (pResBlock->info.rows != 0) {
|
||||
goto _finished;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
|
||||
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull);
|
||||
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock);
|
||||
}
|
||||
// post work for a specific group
|
||||
|
||||
// check if need to interpolate after last datablock
|
||||
// except for fill(next), fill(linear)
|
||||
|
@ -851,11 +924,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
|||
|
||||
// restore initial value for next group
|
||||
resetTimesliceInfo(pSliceInfo);
|
||||
if (pResBlock->info.rows >= 4096) {
|
||||
if (pResBlock->info.rows != 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
_finished:
|
||||
// restore the value
|
||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
if (pResBlock->info.rows == 0) {
|
||||
|
@ -911,6 +985,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
|||
pInfo->groupId = 0;
|
||||
pInfo->pPrevGroupKey = NULL;
|
||||
pInfo->pNextGroupRes = NULL;
|
||||
pInfo->pRemainRes = NULL;
|
||||
pInfo->remainIndex = 0;
|
||||
|
||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;
|
||||
|
|
Loading…
Reference in New Issue