Merge branch 'feat/TS-4243-3.0' of https://github.com/taosdata/TDengine into feat/TS-4243-3.0

This commit is contained in:
kailixu 2024-04-02 17:33:19 +08:00
commit f6dc3205ee
2 changed files with 39 additions and 31 deletions

View File

@ -359,8 +359,8 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i
} }
static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex, static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
STimeWindow* win) { TSKEY blockEkey, STimeWindow* win) {
int32_t order = pInfo->binfo.inputTsOrder; int32_t order = pInfo->binfo.inputTsOrder;
TSKEY actualEndKey = tsCols[endRowIndex]; TSKEY actualEndKey = tsCols[endRowIndex];
@ -378,7 +378,6 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
return true; return true;
} }
int32_t nextRowIndex = endRowIndex + 1;
ASSERT(nextRowIndex >= 0); ASSERT(nextRowIndex >= 0);
TSKEY nextKey = tsCols[nextRowIndex]; TSKEY nextKey = tsCols[nextRowIndex];
@ -517,9 +516,22 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB
done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP); done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) { if (!done) {
int32_t endRowIndex = startPos + forwardRows - 1; int32_t endRowIndex = startPos + forwardRows - 1;
int32_t nextRowIndex = endRowIndex + 1;
// duplicated ts row does not involve in the interpolation of end value for current time window
int32_t x = endRowIndex;
while(x >= 0) {
if (tsCols[x] == tsCols[x-1]) {
x -= 1;
} else {
endRowIndex = x;
break;
}
}
TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win); bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey, win);
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
} }

View File

@ -533,6 +533,24 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) {
} }
} }
static void forwardToNextDiffTsRow(SFuncInputRowIter* pIter, int32_t rowIndex) {
int32_t idx = rowIndex + 1;
while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[rowIndex]) {
++idx;
}
pIter->rowIndex = idx;
}
static void setInputRowInfo(SFuncInputRow* pRow, SFuncInputRowIter* pIter, int32_t rowIndex, bool setPk) {
pRow->ts = pIter->tsList[rowIndex];
pRow->ts = pIter->tsList[rowIndex];
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, rowIndex);
pRow->pData = colDataGetData(pIter->pDataCol, rowIndex);
pRow->pPk = setPk? colDataGetData(pIter->pPkCol, rowIndex):NULL;
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = rowIndex;
}
bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
if (pIter->hasPrev) { if (pIter->hasPrev) {
if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) { if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) {
@ -543,33 +561,19 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
while (pIter->tsList[idx] == pIter->prevBlockTsEnd) { while (pIter->tsList[idx] == pIter->prevBlockTsEnd) {
++idx; ++idx;
} }
pRow->ts = pIter->tsList[idx];
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx);
pRow->pData = colDataGetData(pIter->pDataCol, idx);
pRow->pPk = colDataGetData(pIter->pPkCol, idx);
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = idx;
pIter->hasPrev = false; pIter->hasPrev = false;
pIter->rowIndex = idx + 1; setInputRowInfo(pRow, pIter, idx, true);
forwardToNextDiffTsRow(pIter, idx);
return true; return true;
} }
} else { } else {
if (pIter->rowIndex <= pIter->inputEndIndex) { if (pIter->rowIndex <= pIter->inputEndIndex) {
pRow->ts = pIter->tsList[pIter->rowIndex]; setInputRowInfo(pRow, pIter, pIter->rowIndex, true);
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex);
pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex);
pRow->pPk = colDataGetData(pIter->pPkCol, pIter->rowIndex);
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = pIter->rowIndex;
TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex]; TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex];
if (pIter->tsList[pIter->rowIndex] != tsEnd) { if (pIter->tsList[pIter->rowIndex] != tsEnd) {
int32_t idx = pIter->rowIndex + 1; forwardToNextDiffTsRow(pIter, pIter->rowIndex);
while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[pIter->rowIndex]) {
++idx;
}
pIter->rowIndex = idx;
} else { } else {
pIter->rowIndex = pIter->inputEndIndex + 1; pIter->rowIndex = pIter->inputEndIndex + 1;
} }
@ -585,13 +589,7 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
if (pIter->rowIndex <= pIter->inputEndIndex) { if (pIter->rowIndex <= pIter->inputEndIndex) {
pRow->ts = pIter->tsList[pIter->rowIndex]; setInputRowInfo(pRow, pIter, pIter->rowIndex, false);
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex);
pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex);
pRow->pPk = NULL;
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = pIter->rowIndex;
++pIter->rowIndex; ++pIter->rowIndex;
return true; return true;
} else { } else {
@ -5582,8 +5580,6 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);