From a6c876bbc15f082bc144306c8eaee03256fdb03e Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 5 Dec 2023 14:16:01 +0800 Subject: [PATCH] fix: last/last_row crash with dropped column --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 53 ++++++-- source/libs/executor/src/cachescanoperator.c | 6 + tests/system-test/2-query/last_cache_scan.py | 135 +++++++++++++++++++ 3 files changed, 185 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index b6aa791cf0..f668ea5f72 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -22,19 +22,34 @@ #define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) +static void setFirstLastResColToNull(SColumnInfoData* pCol, int32_t row) { + char *buf = taosMemoryCalloc(1, pCol->info.bytes); + SFirstLastRes* pRes = (SFirstLastRes*)((char*)buf + VARSTR_HEADER_SIZE); + pRes->bytes = 0; + pRes->hasResult = true; + pRes->isNull = true; + varDataSetLen(buf, pCol->info.bytes - VARSTR_HEADER_SIZE); + colDataSetVal(pCol, row, buf, false); + taosMemoryFree(buf); +} + static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, const int32_t* dstSlotIds, void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; // bool allNullRow = true; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { - uint64_t ts = 0; - SFirstLastRes* p; - col_id_t colId; + uint64_t ts = TSKEY_MIN; + SFirstLastRes* p = NULL; + col_id_t colId = -1; for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); - int32_t slotId = slotIds[i]; - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); + if (slotIds[i] == -1) { + setFirstLastResColToNull(pColInfoData, numOfRows); + continue; + } + int32_t slotId = slotIds[i]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); colId = pColVal->colVal.cid; p = (SFirstLastRes*)varDataVal(pRes[i]); @@ -63,10 +78,14 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx); if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) { - colDataSetVal(pCol, numOfRows, (const char*)&ts, false); + if (ts == TSKEY_MIN) { + colDataSetNULL(pCol, numOfRows); + } else { + colDataSetVal(pCol, numOfRows, (const char*)&ts, false); + } continue; - } else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && pCol->info.colId == colId) { - if (!p->isNull) { + } else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && (pCol->info.colId == colId || colId == -1)) { + if (p && !p->isNull) { colDataSetVal(pCol, numOfRows, p->buf, false); } else { colDataSetNULL(pCol, numOfRows); @@ -81,6 +100,10 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); int32_t slotId = slotIds[i]; + if (slotId == -1) { + colDataSetNULL(pColInfoData, numOfRows); + continue; + } SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); SColVal* pVal = &pColVal->colVal; @@ -300,7 +323,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } for (int32_t j = 0; j < pr->numOfCols; ++j) { - pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE); + int32_t bytes; + if (slotIds[j] == -1) + bytes = 1; + else + bytes = pr->pSchema->columns[slotIds[j]].bytes; + + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); p->ts = INT64_MIN; } @@ -324,6 +353,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 for (int32_t i = 0; i < pr->numOfCols; ++i) { int32_t slotId = slotIds[i]; + if (slotId == -1) { + SLastCol p = {.ts = INT64_MIN, .colVal.type = TSDB_DATA_TYPE_BOOL, .colVal.flag = CV_FLAG_NULL}; + taosArrayPush(pLastCols, &p); + continue; + } struct STColumn* pCol = &pr->pSchema->columns[slotId]; SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL}; @@ -348,6 +382,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 bool hasNotNullRow = true; int64_t singleTableLastTs = INT64_MAX; for (int32_t k = 0; k < pr->numOfCols; ++k) { + if (slotIds[k] == -1) continue; SLastCol* p = taosArrayGet(pLastCols, k); SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 6d59698855..75fe3c51dd 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -352,6 +352,7 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i); + bool found = false; for (int32_t j = 0; j < pWrapper->nCols; ++j) { /* if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { (*pSlotIds)[pColMatch->dstSlotId] = -1; @@ -361,9 +362,14 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask if (pColMatch->colId == pWrapper->pSchema[j].colId) { (*pSlotIds)[i] = j; (*pDstSlotIds)[i] = pColMatch->dstSlotId; + found = true; break; } } + if (!found) { + (*pSlotIds)[i] = -1; + (*pDstSlotIds)[i] = pColMatch->dstSlotId; + } } return TSDB_CODE_SUCCESS; diff --git a/tests/system-test/2-query/last_cache_scan.py b/tests/system-test/2-query/last_cache_scan.py index 0f0936ebab..01795f6eef 100644 --- a/tests/system-test/2-query/last_cache_scan.py +++ b/tests/system-test/2-query/last_cache_scan.py @@ -1,3 +1,4 @@ +from sqlite3 import ProgrammingError import taos import sys import time @@ -319,10 +320,144 @@ class TDTestCase: tdSql.checkData(0, 0, '2018-11-25 19:30:00.000') tdSql.checkData(0, 1, '2018-11-25 19:30:01.000') + def test_cache_scan_with_drop_and_add_column(self): + tdSql.query("select last(c10) from meters") + tdSql.checkData(0, 0, '2018-11-25 19:30:01') + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c10; alter table test.meters add column c11 int"]) + p.check_returncode() + tdSql.query("select last(c10) from meters", queryTimes=1) + tdSql.checkData(0, 0, None) + tdSql.query('select last(*) from meters', queryTimes=1) + tdSql.checkData(0, 10, None) + tdSql.query('select last(c10), c10, ts from meters', queryTimes=1) + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, None) + + def test_cache_scan_with_drop_and_add_column2(self): + tdSql.query("select last(c1) from meters") + tdSql.checkData(0, 0, '999') + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c12 int"]) + p.check_returncode() + tdSql.query("select last(c1) from meters", queryTimes=1) + tdSql.checkData(0, 0, None) + tdSql.query('select last(*) from meters', queryTimes=1) + print(str(tdSql.queryResult)) + tdSql.checkData(0, 1, None) + tdSql.query('select last(c1), c1, ts from meters', queryTimes=1) + tdSql.checkRows(1) + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, None) + + try: + tdSql.query('select ts, last(c1), c1, ts, c1 from meters', queryTimes=1) + except Exception as e: + if str(e).count('Invalid column name') == 1: + print('column has been dropped, the cache has been updated: %s' % (str(e))) + return + else: + raise + tdSql.checkRows(1) + tdSql.checkCols(5) + tdSql.checkData(0, 0, None) + tdSql.checkData(0, 1, None) + tdSql.checkData(0, 2, None) + tdSql.checkData(0, 3, None) + tdSql.checkData(0, 4, None) + + try: + tdSql.query('select last(c1), last(c2), last(c3) from meters', queryTimes=1) + except Exception as e: + if str(e).count('Invalid column name') == 1: + print('column has been dropped, the cache has been updated: %s' % (str(e))) + return + else: + raise + tdSql.checkRows(1) + tdSql.checkCols(3) + tdSql.checkData(0, 0, None) + + def test_cache_scan_with_drop_column(self): + tdSql.query('select last(*) from meters') + print(str(tdSql.queryResult)) + tdSql.checkCols(11) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c9"]) + p.check_returncode() + tdSql.query('select last(*) from meters') + print(str(tdSql.queryResult)) + tdSql.checkCols(11) + tdSql.checkData(0, 9, None) + + def test_cache_scan_last_row_with_drop_column(self): + tdSql.query('select last_row(*) from meters') + print(str(tdSql.queryResult)) + tdSql.checkCols(11) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c10; alter table test.meters add column c11 int"]) + p.check_returncode() + tdSql.query('select last_row(*) from meters') + print(str(tdSql.queryResult)) + tdSql.checkCols(11) + tdSql.checkData(0, 10, None) + + def test_cache_scan_last_row_with_drop_column2(self): + tdSql.query('select last_row(c1) from meters') + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"]) + p.check_returncode() + tdSql.query('select last_row(c1) from meters', queryTimes=1) + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + tdSql.checkData(0, 0, None) + + def test_cache_scan_last_row_with_partition_by(self): + tdSql.query('select last(c1) from meters partition by t1') + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + tdSql.checkRows(5) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"]) + p.check_returncode() + tdSql.query('select last_row(c1) from meters partition by t1', queryTimes=1) + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + tdSql.checkRows(5) + tdSql.checkData(0, 0, None) + tdSql.checkData(1, 0, None) + tdSql.checkData(2, 0, None) + tdSql.checkData(3, 0, None) + tdSql.checkData(4, 0, None) + + def test_cache_scan_last_row_with_partition_by_tbname(self): + tdSql.query('select last(c1) from meters partition by tbname', queryTimes=1) + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + tdSql.checkRows(10) + p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"]) + p.check_returncode() + tdSql.query('select last_row(c1) from meters partition by tbname', queryTimes=1) + print(str(tdSql.queryResult)) + tdSql.checkCols(1) + tdSql.checkRows(10) + tdSql.checkData(0, 0, None) + tdSql.checkData(1, 0, None) + tdSql.checkData(2, 0, None) + tdSql.checkData(3, 0, None) + tdSql.checkData(4, 0, None) + + + def run(self): self.prepareTestEnv() #time.sleep(99999999) self.test_last_cache_scan() + #self.test_cache_scan_with_drop_and_add_column() + self.test_cache_scan_with_drop_and_add_column2() + #self.test_cache_scan_with_drop_column() + #self.test_cache_scan_last_row_with_drop_column() + #self.test_cache_scan_last_row_with_drop_column2() + #self.test_cache_scan_last_row_with_partition_by() + #self.test_cache_scan_last_row_with_partition_by_tbname() def stop(self): tdSql.close()