From 6aca4b8a5cef4c71b4b2f30680f3e2c08fb2a424 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 8 Nov 2022 14:50:03 +0800 Subject: [PATCH] enh: interp fill linear ignore null values during calculation --- source/libs/executor/inc/tfill.h | 1 - source/libs/executor/src/timewindowoperator.c | 43 +++++++++++-------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index ed019be767..7e0866f545 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -36,7 +36,6 @@ typedef struct SFillColInfo { typedef struct SFillLinearInfo { SPoint start; SPoint end; - bool hasNull; int16_t type; int32_t bytes; } SFillLinearInfo; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ef99e31f67..0950fcce2f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1999,7 +1999,7 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i); // null data should not be kept since it can not be used to perform interpolation - if (!colDataIsNull_s(pColInfoData, i)) { + if (!colDataIsNull_s(pColInfoData, rowIndex)) { if (pLinearInfo->start.key == INT64_MIN) { pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); memcpy(pLinearInfo->start.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); @@ -2017,7 +2017,8 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo } -static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock) { +static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, + SSDataBlock* pSrcBlock, int32_t index, bool beforeRange) { int32_t rows = pResBlock->info.rows; blockDataEnsureCapacity(pResBlock, rows + 1); // todo set the correct primary timestamp column @@ -2036,7 +2037,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp } int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; - // SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); switch (pSliceInfo->fillType) { case TSDB_FILL_NULL: { colDataAppendNULL(pDst, rows); @@ -2071,17 +2071,17 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp current.val = taosMemoryCalloc(pLinearInfo->bytes, 1); // before interp range, do not fill - if (start.key == INT64_MIN || end.key == INT64_MAX) { + SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); + if (colDataIsNull_s(pSrc, index) || (!beforeRange && end.key == INT64_MAX)) { hasInterp = false; break; + } else if (start.key == INT64_MIN || (beforeRange && end.key == INT64_MAX)) { + //hasInterp = true; + return true; } - if (pLinearInfo->hasNull) { - colDataAppendNULL(pDst, rows); - } else { - taosGetLinearInterpolationVal(¤t, pLinearInfo->type, &start, &end, pLinearInfo->type); - colDataAppend(pDst, rows, (char*)current.val, false); - } + taosGetLinearInterpolationVal(¤t, pLinearInfo->type, &start, &end, pLinearInfo->type); + colDataAppend(pDst, rows, (char*)current.val, false); taosMemoryFree(current.val); break; @@ -2117,6 +2117,8 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp if (hasInterp) { pResBlock->info.rows += 1; } + + return hasInterp; } static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { @@ -2192,7 +2194,6 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB linearInfo.end.key = INT64_MAX; linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes); linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes); - linearInfo.hasNull = false; linearInfo.type = pColInfo->info.type; linearInfo.bytes = pColInfo->info.bytes; taosArrayPush(pInfo->pLinearInfo, &linearInfo); @@ -2306,9 +2307,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); - pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, - pInterval->precision); + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { + break; + } else { + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } } if (pSliceInfo->current > pSliceInfo->win.ekey) { @@ -2327,9 +2331,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { doKeepLinearInfo(pSliceInfo, pBlock, i); while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { + break; + } else { + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } } // add current row if timestamp match @@ -2376,7 +2383,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // except for fill(next), fill(linear) while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && pSliceInfo->fillType != TSDB_FILL_LINEAR) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock); + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, 0, false); pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); }