fix fill order by returned wrong values

This commit is contained in:
wangjiaming0909 2024-08-12 15:27:15 +08:00
parent 882cfb7deb
commit 19952995af
4 changed files with 62 additions and 5 deletions

View File

@ -254,6 +254,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
(pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) {
setOperatorCompleted(pOperator);
return NULL;
} else if (pInfo->totalInputRows == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
reviseFillStartAndEndKey(pInfo, order);
}
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);

View File

@ -207,7 +207,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
setNotFillColumn(pFillInfo, pDstCol, index, i);
}
} else {
SRowVal* pRVal = FILL_IS_ASC_FILL(pFillInfo) ? &pFillInfo->prev : &pFillInfo->next;
SRowVal* pRVal = &pFillInfo->prev;
SGroupKeys* pKey = taosArrayGet(pRVal->pRowVal, i);
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
colDataSetNULL(pDstCol, index);
@ -395,8 +395,8 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
ASSERT(pFillInfo->currentKey == ts);
int32_t index = pBlock->info.rows;
int32_t nextRowIndex = pFillInfo->index + 1;
if (pFillInfo->type == TSDB_FILL_NEXT) {
int32_t nextRowIndex = pFillInfo->index + 1;
if ((pFillInfo->index + 1) < pFillInfo->numOfRows) {
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, false);
} else {
@ -404,6 +404,11 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
}
}
if (pFillInfo->type == TSDB_FILL_PREV) {
if (nextRowIndex + 1 >= pFillInfo->numOfRows && !FILL_IS_ASC_FILL(pFillInfo)) {
copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
}
}
// copy rows to dst buffer
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
@ -418,7 +423,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
if (!colDataIsNull_s(pSrc, pFillInfo->index)) {
code = colDataSetVal(pDst, index, src, false);
QUERY_CHECK_CODE(code, lino, _end);
SRowVal* pRVal = FILL_IS_ASC_FILL(pFillInfo) ? &pFillInfo->prev : &pFillInfo->next;
SRowVal* pRVal = &pFillInfo->prev;
saveColData(pRVal->pRowVal, i, src, false);
if (pFillInfo->srcTsSlotId == dstSlotId) {
pRVal->key = *(int64_t*)src;
@ -439,7 +444,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t
code = colDataSetVal(pDst, index, src, isNull);
QUERY_CHECK_CODE(code, lino, _end);
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
SArray* p = pFillInfo->prev.pRowVal;
saveColData(p, i, src, isNull); // todo:
} else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
colDataSetNULL(pDst, index);

View File

@ -126,6 +126,17 @@ class TDTestCase:
def test_fill_range(self):
os.system('taosBenchmark -t 10 -n 10000 -v 8 -S 32000 -y')
self.schedule_fill_test_tasks()
sql = "select first(_wstart), last(_wstart) from (select _wstart, count(*) from test.meters where ts >= '2019-09-19 23:54:00.000' and ts < '2019-09-20 01:00:00.000' interval(10s) sliding(10s) fill(VALUE_F, 122) order by _wstart) t"
tdSql.query(sql, queryTimes=1)
rows = tdSql.getRows()
first_ts = tdSql.queryResult[0][0]
last_ts = tdSql.queryResult[0][1]
sql = "select first(_wstart), last(_wstart) from (select _wstart, count(*) from test.meters where ts >= '2019-09-19 23:54:00.000' and ts < '2019-09-20 01:00:00.000' interval(10s) sliding(10s) fill(VALUE_F, 122) order by _wstart desc) t"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(rows)
tdSql.checkData(0, 0, first_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.execute('drop database test')
def run(self):

View File

@ -165,7 +165,7 @@ class TDTestCase:
sql = "select _wstart, count(*) from meters where ts >= '2018-09-20 00:00:00.000' and ts < '2018-09-20 01:00:00.000' interval(5m) fill(prev) order by _wstart desc;"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(12)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 1, 10)
tdSql.checkData(1, 1, 10)
tdSql.checkData(2, 1, 10)
tdSql.checkData(3, 1, 10)
@ -198,6 +198,45 @@ class TDTestCase:
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(60)
sql = "select _wstart, count(*) from meters where ts >= '2018-09-19 23:54:00.000' and ts < '2018-09-20 01:00:00.000' interval(5m) fill(next) order by _wstart asc;"
tdSql.query(sql, queryTimes=1)
for i in range(0, 13):
tdSql.checkData(i, 1, 10)
tdSql.checkData(13, 1, None)
sql = "select _wstart, count(*) from meters where ts >= '2018-09-19 23:54:00.000' and ts < '2018-09-20 01:00:00.000' interval(5m) fill(next) order by _wstart desc;"
tdSql.query(sql, queryTimes=1)
tdSql.checkData(0, 1, None)
for i in range(1, 14):
tdSql.checkData(i, 1, 10)
sql = "select _wstart, count(*) from meters where ts >= '2018-09-19 23:54:00.000' and ts < '2018-09-20 01:00:00.000' interval(5m) fill(prev) order by _wstart asc;"
tdSql.query(sql, queryTimes=1)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, None)
for i in range(2, 14):
tdSql.checkData(i, 1, 10)
sql = "select _wstart, count(*) from meters where ts >= '2018-09-19 23:54:00.000' and ts < '2018-09-20 01:00:00.000' interval(5m) fill(prev) order by _wstart desc;"
tdSql.query(sql, queryTimes=1)
for i in range(0, 12):
tdSql.checkData(i, 1, 10)
tdSql.checkData(12, 1, None)
tdSql.checkData(13, 1, None)
sql = "select _wstart, count(*) from meters where ts >= '2018-09-19 23:54:00.000' and ts < '2018-09-20 01:00:00.000' interval(5m) fill(linear) order by _wstart asc;"
tdSql.query(sql, queryTimes=1)
tdSql.checkData(0, 1, None)
tdSql.checkData(1, 1, None)
for i in range(2, 13):
tdSql.checkData(i, 1, 10)
tdSql.checkData(13, 1, None)
sql = "select _wstart, count(*) from meters where ts >= '2018-09-19 23:54:00.000' and ts < '2018-09-20 01:00:00.000' interval(5m) fill(linear) order by _wstart desc;"
tdSql.query(sql, queryTimes=1)
tdSql.checkData(0, 1, None)
for i in range(1, 12):
tdSql.checkData(i, 1, 10)
tdSql.checkData(12, 1, None)
tdSql.checkData(13, 1, None)
def run(self):
self.prepareTestEnv()
self.test_partition_by_with_interval_fill_prev_new_group_fill_error()