refactor timeslice operator

This commit is contained in:
Ganlin Zhao 2023-04-26 11:04:13 +08:00
parent d4c783b590
commit c56bb64f1c
1 changed files with 98 additions and 87 deletions

View File

@ -38,6 +38,8 @@ typedef struct STimeSliceOperatorInfo {
struct SFillColInfo* pFillColInfo; // fill column info struct SFillColInfo* pFillColInfo; // fill column info
int64_t prevTs; int64_t prevTs;
bool prevTsSet; bool prevTsSet;
uint64_t groupId;
SSDataBlock* pNextGroupRes;
} STimeSliceOperatorInfo; } STimeSliceOperatorInfo;
static void destroyTimeSliceOperatorInfo(void* param); static void destroyTimeSliceOperatorInfo(void* param);
@ -456,41 +458,10 @@ static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
if (pOperator->status == OP_EXEC_DONE) { SExecTaskInfo* pTaskInfo) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
SSDataBlock* pResBlock = pSliceInfo->pRes; SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t order = TSDB_ORDER_ASC;
SInterval* pInterval = &pSliceInfo->interval; SInterval* pInterval = &pSliceInfo->interval;
SOperatorInfo* downstream = pOperator->pDownstream[0];
blockDataCleanup(pResBlock);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
break;
}
if (pSliceInfo->scalarSup.pExprInfo != NULL) {
SExprSupp* pExprSup = &pSliceInfo->scalarSup;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
int32_t code = initKeeperInfo(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
for (int32_t i = 0; i < pBlock->info.rows; ++i) { for (int32_t i = 0; i < pBlock->info.rows; ++i) {
@ -578,6 +549,44 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
} }
} }
} }
}
static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t order = TSDB_ORDER_ASC;
SInterval* pInterval = &pSliceInfo->interval;
SOperatorInfo* downstream = pOperator->pDownstream[0];
blockDataCleanup(pResBlock);
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
break;
}
if (pSliceInfo->scalarSup.pExprInfo != NULL) {
SExprSupp* pExprSup = &pSliceInfo->scalarSup;
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
int32_t code = initKeeperInfo(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
} }
// check if need to interpolate after last datablock // check if need to interpolate after last datablock
@ -643,6 +652,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo->current = pInfo->win.skey; pInfo->current = pInfo->win.skey;
pInfo->prevTsSet = false; pInfo->prevTsSet = false;
pInfo->prevTs = 0; pInfo->prevTs = 0;
pInfo->groupId = 0;
pInfo->pNextGroupRes = NULL;
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;