diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 4d519f82f1..e1782107f4 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -38,6 +38,8 @@ typedef struct STimeSliceOperatorInfo { struct SFillColInfo* pFillColInfo; // fill column info int64_t prevTs; bool prevTsSet; + uint64_t groupId; + SSDataBlock* pNextGroupRes; } STimeSliceOperatorInfo; static void destroyTimeSliceOperatorInfo(void* param); @@ -456,6 +458,99 @@ static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock return TSDB_CODE_SUCCESS; } +static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, + SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResBlock = pSliceInfo->pRes; + SInterval* pInterval = &pSliceInfo->interval; + + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); + + // check for duplicate timestamps + if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) { + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP); + } + + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + + if (ts == pSliceInfo->current) { + addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); + + doKeepPrevRows(pSliceInfo, pBlock, i); + doKeepLinearInfo(pSliceInfo, pBlock, i); + + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + } else if (ts < pSliceInfo->current) { + // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate + doKeepPrevRows(pSliceInfo, pBlock, i); + doKeepLinearInfo(pSliceInfo, pBlock, i); + + if (i < pBlock->info.rows - 1) { + // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate + doKeepNextRows(pSliceInfo, pBlock, i + 1); + int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); + if (nextTs > pSliceInfo->current) { + while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) && + pSliceInfo->fillType == TSDB_FILL_LINEAR) { + break; + } else { + pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, + pInterval->precision); + } + } + + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + } else { + // ignore current row, and do nothing + } + } else { // it is the last row of current block + doKeepPrevRows(pSliceInfo, pBlock, i); + } + } else { // ts > pSliceInfo->current + // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate + doKeepNextRows(pSliceInfo, pBlock, i); + doKeepLinearInfo(pSliceInfo, pBlock, i); + + while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) && + pSliceInfo->fillType == TSDB_FILL_LINEAR) { + break; + } else { + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } + } + + // add current row if timestamp match + if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) { + addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); + + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } + doKeepPrevRows(pSliceInfo, pBlock, i); + + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + } + } +} + static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -491,93 +586,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); - - SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); - - // check for duplicate timestamps - if (checkDuplicateTimestamps(pSliceInfo, pTsCol, i, pBlock->info.rows)) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP); - } - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - - if (ts == pSliceInfo->current) { - addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); - - doKeepPrevRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } else if (ts < pSliceInfo->current) { - // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate - doKeepPrevRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - if (i < pBlock->info.rows - 1) { - // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate - doKeepNextRows(pSliceInfo, pBlock, i + 1); - int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); - if (nextTs > pSliceInfo->current) { - while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) && - pSliceInfo->fillType == TSDB_FILL_LINEAR) { - break; - } else { - pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, - pInterval->precision); - } - } - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } else { - // ignore current row, and do nothing - } - } else { // it is the last row of current block - doKeepPrevRows(pSliceInfo, pBlock, i); - } - } else { // ts > pSliceInfo->current - // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate - doKeepNextRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) && - pSliceInfo->fillType == TSDB_FILL_LINEAR) { - break; - } else { - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - } - - // add current row if timestamp match - if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) { - addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); - - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - doKeepPrevRows(pSliceInfo, pBlock, i); - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } - } + doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo); } // check if need to interpolate after last datablock @@ -643,6 +652,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode pInfo->current = pInfo->win.skey; pInfo->prevTsSet = false; pInfo->prevTs = 0; + pInfo->groupId = 0; + pInfo->pNextGroupRes = NULL; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info;