fix(query): set correct forward step for twa query.

This commit is contained in:
Haojun Liao 2024-04-02 16:10:42 +08:00
parent 94e6f96a75
commit 5a546e37d7
2 changed files with 39 additions and 31 deletions

View File

@ -359,8 +359,8 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i
}
static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey,
STimeWindow* win) {
int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
TSKEY blockEkey, STimeWindow* win) {
int32_t order = pInfo->binfo.inputTsOrder;
TSKEY actualEndKey = tsCols[endRowIndex];
@ -378,7 +378,6 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
return true;
}
int32_t nextRowIndex = endRowIndex + 1;
ASSERT(nextRowIndex >= 0);
TSKEY nextKey = tsCols[nextRowIndex];
@ -517,9 +516,22 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB
done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) {
int32_t endRowIndex = startPos + forwardRows - 1;
int32_t nextRowIndex = endRowIndex + 1;
// duplicated ts row does not involve in the interpolation of end value for current time window
int32_t x = endRowIndex;
while(x >= 0) {
if (tsCols[x] == tsCols[x-1]) {
x -= 1;
} else {
endRowIndex = x;
break;
}
}
TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win);
bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey, win);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}

View File

@ -533,6 +533,24 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) {
}
}
static void forwardToNextDiffTsRow(SFuncInputRowIter* pIter, int32_t rowIndex) {
int32_t idx = rowIndex + 1;
while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[rowIndex]) {
++idx;
}
pIter->rowIndex = idx;
}
static void setInputRowInfo(SFuncInputRow* pRow, SFuncInputRowIter* pIter, int32_t rowIndex, bool setPk) {
pRow->ts = pIter->tsList[rowIndex];
pRow->ts = pIter->tsList[rowIndex];
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, rowIndex);
pRow->pData = colDataGetData(pIter->pDataCol, rowIndex);
pRow->pPk = setPk? colDataGetData(pIter->pPkCol, rowIndex):NULL;
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = rowIndex;
}
bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
if (pIter->hasPrev) {
if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) {
@ -543,33 +561,19 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
while (pIter->tsList[idx] == pIter->prevBlockTsEnd) {
++idx;
}
pRow->ts = pIter->tsList[idx];
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx);
pRow->pData = colDataGetData(pIter->pDataCol, idx);
pRow->pPk = colDataGetData(pIter->pPkCol, idx);
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = idx;
pIter->hasPrev = false;
pIter->rowIndex = idx + 1;
setInputRowInfo(pRow, pIter, idx, true);
forwardToNextDiffTsRow(pIter, idx);
return true;
}
} else {
if (pIter->rowIndex <= pIter->inputEndIndex) {
pRow->ts = pIter->tsList[pIter->rowIndex];
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex);
pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex);
pRow->pPk = colDataGetData(pIter->pPkCol, pIter->rowIndex);
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = pIter->rowIndex;
setInputRowInfo(pRow, pIter, pIter->rowIndex, true);
TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex];
if (pIter->tsList[pIter->rowIndex] != tsEnd) {
int32_t idx = pIter->rowIndex + 1;
while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[pIter->rowIndex]) {
++idx;
}
pIter->rowIndex = idx;
forwardToNextDiffTsRow(pIter, pIter->rowIndex);
} else {
pIter->rowIndex = pIter->inputEndIndex + 1;
}
@ -585,13 +589,7 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
if (pIter->rowIndex <= pIter->inputEndIndex) {
pRow->ts = pIter->tsList[pIter->rowIndex];
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex);
pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex);
pRow->pPk = NULL;
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = pIter->rowIndex;
setInputRowInfo(pRow, pIter, pIter->rowIndex, false);
++pIter->rowIndex;
return true;
} else {
@ -5580,8 +5578,6 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);