From 26648a909f8c9f2a28f18a6baf997588424fccd1 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 8 Aug 2022 15:20:12 +0800 Subject: [PATCH] fix(query): interp + fill(linear) not working TD-18220 --- source/libs/executor/inc/tfill.h | 9 +- source/libs/executor/src/timewindowoperator.c | 112 +++++++++--------- 2 files changed, 62 insertions(+), 59 deletions(-) diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index a2beedba36..c2de48d0eb 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -34,9 +34,12 @@ typedef struct SFillColInfo { } SFillColInfo; typedef struct SFillLinearInfo { - SPoint start; - SPoint end; - bool fillLastPoint; + SPoint start; + SPoint end; + bool hasNull; + bool fillLastPoint; + int16_t type; + int32_t bytes; } SFillLinearInfo; typedef struct { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index afc5849963..f532ae54d0 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2091,18 +2091,28 @@ static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlo int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); + SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i); // null data should not be kept since it can not be used to perform interpolation if (!colDataIsNull_s(pColInfoData, i)) { - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, i); + int64_t startKey = *(int64_t*)colDataGetData(pTsCol, rowIndex); + int64_t endKey = *(int64_t*)colDataGetData(pTsCol, rowIndex + 1); + pLinearInfo->start.key = startKey; + pLinearInfo->end.key = endKey; - pkey->isNull = false; - char* val = colDataGetData(pColInfoData, rowIndex); - memcpy(pkey->pData, val, pkey->bytes); + char* val; + val = colDataGetData(pColInfoData, rowIndex); + memcpy(pLinearInfo->start.val, val, pLinearInfo->bytes); + val = colDataGetData(pColInfoData, rowIndex + 1); + memcpy(pLinearInfo->end.val, val, pLinearInfo->bytes); + + pLinearInfo->hasNull = false; + } else { + pLinearInfo->hasNull = true; } } - pSliceInfo->isNextRowSet = true; } static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pBlock, @@ -2133,52 +2143,36 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { float v = 0; GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char*)&v, false); + colDataAppend(pDst, rows, (char *)&v, false); } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { double v = 0; GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char*)&v, false); + colDataAppend(pDst, rows, (char *)&v, false); } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { int64_t v = 0; GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char*)&v, false); + colDataAppend(pDst, rows, (char *)&v, false); } pResBlock->info.rows += 1; break; } case TSDB_FILL_LINEAR: { -#if 0 - if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs - || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { -// goto interp_exit; - } + SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot); - double v1 = -1, v2 = -1; - GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val); - GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val); + SPoint start = pLinearInfo->start; + SPoint end = pLinearInfo->end; + SPoint current = {.key = pSliceInfo->current}; + current.val = taosMemoryCalloc(pLinearInfo->bytes, 1); - SPoint point1 = {.key = ts, .val = &v1}; - SPoint point2 = {.key = nextTs, .val = &v2}; - SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; + if (pLinearInfo->hasNull) { + colDataAppendNULL(pDst, rows); + } else { + taosGetLinearInterpolationVal(¤t, pLinearInfo->type, &start, &end, pLinearInfo->type); + colDataAppend(pDst, rows, (char *)current.val, false); + } - int32_t srcType = pCtx->inputType; - if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { - setNull(pCtx->pOutput, srcType, pCtx->inputBytes); - } else { - bool exceedMax = false, exceedMin = false; - taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin); - if (exceedMax || exceedMin) { - __compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0); - if (func(&pCtx->start.val, &pCtx->end.val) <= 0) { - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val); - } else { - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val); - } - } - } -#endif - // TODO: pResBlock->info.rows += 1; + pResBlock->info.rows += 1; break; } case TSDB_FILL_PREV: { @@ -2278,13 +2272,16 @@ static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pB for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - SFillLinearInfo pLinearInfo = {0}; - pLinearInfo.start.key = INT64_MIN; - pLinearInfo.end.key = INT64_MAX; - pLinearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes); - pLinearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes); - pLinearInfo.fillLastPoint = false; - taosArrayPush(pInfo->pLinearInfo, &pLinearInfo); + SFillLinearInfo linearInfo = {0}; + linearInfo.start.key = INT64_MIN; + linearInfo.end.key = INT64_MAX; + linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes); + linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes); + linearInfo.hasNull = false; + linearInfo.fillLastPoint = false; + linearInfo.type = pColInfo->info.type; + linearInfo.bytes = pColInfo->info.bytes; + taosArrayPush(pInfo->pLinearInfo, &linearInfo); } return TSDB_CODE_SUCCESS; @@ -2374,6 +2371,9 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // if its the first point in data block, also fill values between previous(if there's any) and this point; // if its the last point in data block, no need to fill, but reserve this point as the start value for next data block. if (pSliceInfo->fillType == TSDB_FILL_LINEAR) { + doKeepLinearInfo(pSliceInfo, pBlock, i); + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (i < pBlock->info.rows - 1) { int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { @@ -2395,24 +2395,24 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } } else { // it is the last row of current block } - } + } else { // non-linear interpolation + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (pSliceInfo->current > pSliceInfo->win.ekey) { + doSetOperatorCompleted(pOperator); + break; + } - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - if (pSliceInfo->current > pSliceInfo->win.ekey) { - doSetOperatorCompleted(pOperator); - break; - } - - if (pResBlock->info.rows >= pResBlock->info.capacity) { - break; + if (pResBlock->info.rows >= pResBlock->info.capacity) { + break; + } } } else if (ts < pSliceInfo->current) { - // in case interpolation window starts and ends between two datapoints, fill(prev) need to interpolate + // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate doKeepPrevRows(pSliceInfo, pBlock, i); if (i < pBlock->info.rows - 1) { - // in case interpolation window starts and ends between two datapoints, fill(next) need to interpolate + // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate doKeepNextRows(pSliceInfo, pBlock, i + 1); int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { @@ -2436,7 +2436,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { doKeepPrevRows(pSliceInfo, pBlock, i); } } else { // ts > pSliceInfo->current - // in case interpolation window starts and ends between two datapoints, fill(next) need to interpolate + // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate doKeepNextRows(pSliceInfo, pBlock, i); while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {