diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 51bfe716c8..f24b581ca2 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -359,8 +359,8 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i } static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex, - SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, - STimeWindow* win) { + int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols, + TSKEY blockEkey, STimeWindow* win) { int32_t order = pInfo->binfo.inputTsOrder; TSKEY actualEndKey = tsCols[endRowIndex]; @@ -378,7 +378,6 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx return true; } - int32_t nextRowIndex = endRowIndex + 1; ASSERT(nextRowIndex >= 0); TSKEY nextKey = tsCols[nextRowIndex]; @@ -517,9 +516,22 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP); if (!done) { int32_t endRowIndex = startPos + forwardRows - 1; + int32_t nextRowIndex = endRowIndex + 1; + + // duplicated ts row does not involve in the interpolation of end value for current time window + int32_t x = endRowIndex; + while(x >= 0) { + if (tsCols[x] == tsCols[x-1]) { + + x -= 1; + } else { + endRowIndex = x; + break; + } + } TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; - bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win); + bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey, win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 70e906c6ec..f779c1fa25 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -533,6 +533,24 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) { } } +static void forwardToNextDiffTsRow(SFuncInputRowIter* pIter, int32_t rowIndex) { + int32_t idx = rowIndex + 1; + while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[rowIndex]) { + ++idx; + } + pIter->rowIndex = idx; +} + +static void setInputRowInfo(SFuncInputRow* pRow, SFuncInputRowIter* pIter, int32_t rowIndex, bool setPk) { + pRow->ts = pIter->tsList[rowIndex]; + pRow->ts = pIter->tsList[rowIndex]; + pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, rowIndex); + pRow->pData = colDataGetData(pIter->pDataCol, rowIndex); + pRow->pPk = setPk? colDataGetData(pIter->pPkCol, rowIndex):NULL; + pRow->block = pIter->pSrcBlock; + pRow->rowIndex = rowIndex; +} + bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { if (pIter->hasPrev) { if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) { @@ -543,33 +561,19 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { while (pIter->tsList[idx] == pIter->prevBlockTsEnd) { ++idx; } - pRow->ts = pIter->tsList[idx]; - pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx); - pRow->pData = colDataGetData(pIter->pDataCol, idx); - pRow->pPk = colDataGetData(pIter->pPkCol, idx); - pRow->block = pIter->pSrcBlock; - pRow->rowIndex = idx; pIter->hasPrev = false; - pIter->rowIndex = idx + 1; + setInputRowInfo(pRow, pIter, idx, true); + forwardToNextDiffTsRow(pIter, idx); return true; } } else { if (pIter->rowIndex <= pIter->inputEndIndex) { - pRow->ts = pIter->tsList[pIter->rowIndex]; - pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex); - pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex); - pRow->pPk = colDataGetData(pIter->pPkCol, pIter->rowIndex); - pRow->block = pIter->pSrcBlock; - pRow->rowIndex = pIter->rowIndex; + setInputRowInfo(pRow, pIter, pIter->rowIndex, true); TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex]; if (pIter->tsList[pIter->rowIndex] != tsEnd) { - int32_t idx = pIter->rowIndex + 1; - while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[pIter->rowIndex]) { - ++idx; - } - pIter->rowIndex = idx; + forwardToNextDiffTsRow(pIter, pIter->rowIndex); } else { pIter->rowIndex = pIter->inputEndIndex + 1; } @@ -585,13 +589,7 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { if (pIter->rowIndex <= pIter->inputEndIndex) { - pRow->ts = pIter->tsList[pIter->rowIndex]; - pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex); - pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex); - pRow->pPk = NULL; - pRow->block = pIter->pSrcBlock; - pRow->rowIndex = pIter->rowIndex; - + setInputRowInfo(pRow, pIter, pIter->rowIndex, false); ++pIter->rowIndex; return true; } else { @@ -5582,8 +5580,6 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);