fix: last/last_row crash with dropped column

This commit is contained in:
wangjiaming0909 2023-12-05 14:16:01 +08:00
parent 48af3a9559
commit a6c876bbc1
3 changed files with 185 additions and 9 deletions

View File

@ -22,19 +22,34 @@
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
static void setFirstLastResColToNull(SColumnInfoData* pCol, int32_t row) {
char *buf = taosMemoryCalloc(1, pCol->info.bytes);
SFirstLastRes* pRes = (SFirstLastRes*)((char*)buf + VARSTR_HEADER_SIZE);
pRes->bytes = 0;
pRes->hasResult = true;
pRes->isNull = true;
varDataSetLen(buf, pCol->info.bytes - VARSTR_HEADER_SIZE);
colDataSetVal(pCol, row, buf, false);
taosMemoryFree(buf);
}
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
const int32_t* dstSlotIds, void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows;
// bool allNullRow = true;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
uint64_t ts = 0;
SFirstLastRes* p;
col_id_t colId;
uint64_t ts = TSKEY_MIN;
SFirstLastRes* p = NULL;
col_id_t colId = -1;
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
if (slotIds[i] == -1) {
setFirstLastResColToNull(pColInfoData, numOfRows);
continue;
}
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
colId = pColVal->colVal.cid;
p = (SFirstLastRes*)varDataVal(pRes[i]);
@ -63,10 +78,14 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
if (ts == TSKEY_MIN) {
colDataSetNULL(pCol, numOfRows);
} else {
colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
}
continue;
} else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && pCol->info.colId == colId) {
if (!p->isNull) {
} else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && (pCol->info.colId == colId || colId == -1)) {
if (p && !p->isNull) {
colDataSetVal(pCol, numOfRows, p->buf, false);
} else {
colDataSetNULL(pCol, numOfRows);
@ -81,6 +100,10 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
int32_t slotId = slotIds[i];
if (slotId == -1) {
colDataSetNULL(pColInfoData, numOfRows);
continue;
}
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
SColVal* pVal = &pColVal->colVal;
@ -300,7 +323,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
for (int32_t j = 0; j < pr->numOfCols; ++j) {
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE);
int32_t bytes;
if (slotIds[j] == -1)
bytes = 1;
else
bytes = pr->pSchema->columns[slotIds[j]].bytes;
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
p->ts = INT64_MIN;
}
@ -324,6 +353,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
for (int32_t i = 0; i < pr->numOfCols; ++i) {
int32_t slotId = slotIds[i];
if (slotId == -1) {
SLastCol p = {.ts = INT64_MIN, .colVal.type = TSDB_DATA_TYPE_BOOL, .colVal.flag = CV_FLAG_NULL};
taosArrayPush(pLastCols, &p);
continue;
}
struct STColumn* pCol = &pr->pSchema->columns[slotId];
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
@ -348,6 +382,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
bool hasNotNullRow = true;
int64_t singleTableLastTs = INT64_MAX;
for (int32_t k = 0; k < pr->numOfCols; ++k) {
if (slotIds[k] == -1) continue;
SLastCol* p = taosArrayGet(pLastCols, k);
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);

View File

@ -352,6 +352,7 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
bool found = false;
for (int32_t j = 0; j < pWrapper->nCols; ++j) {
/* if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
(*pSlotIds)[pColMatch->dstSlotId] = -1;
@ -361,9 +362,14 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
if (pColMatch->colId == pWrapper->pSchema[j].colId) {
(*pSlotIds)[i] = j;
(*pDstSlotIds)[i] = pColMatch->dstSlotId;
found = true;
break;
}
}
if (!found) {
(*pSlotIds)[i] = -1;
(*pDstSlotIds)[i] = pColMatch->dstSlotId;
}
}
return TSDB_CODE_SUCCESS;

View File

@ -1,3 +1,4 @@
from sqlite3 import ProgrammingError
import taos
import sys
import time
@ -319,10 +320,144 @@ class TDTestCase:
tdSql.checkData(0, 0, '2018-11-25 19:30:00.000')
tdSql.checkData(0, 1, '2018-11-25 19:30:01.000')
def test_cache_scan_with_drop_and_add_column(self):
tdSql.query("select last(c10) from meters")
tdSql.checkData(0, 0, '2018-11-25 19:30:01')
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c10; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query("select last(c10) from meters", queryTimes=1)
tdSql.checkData(0, 0, None)
tdSql.query('select last(*) from meters', queryTimes=1)
tdSql.checkData(0, 10, None)
tdSql.query('select last(c10), c10, ts from meters', queryTimes=1)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
def test_cache_scan_with_drop_and_add_column2(self):
tdSql.query("select last(c1) from meters")
tdSql.checkData(0, 0, '999')
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c12 int"])
p.check_returncode()
tdSql.query("select last(c1) from meters", queryTimes=1)
tdSql.checkData(0, 0, None)
tdSql.query('select last(*) from meters', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkData(0, 1, None)
tdSql.query('select last(c1), c1, ts from meters', queryTimes=1)
tdSql.checkRows(1)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
try:
tdSql.query('select ts, last(c1), c1, ts, c1 from meters', queryTimes=1)
except Exception as e:
if str(e).count('Invalid column name') == 1:
print('column has been dropped, the cache has been updated: %s' % (str(e)))
return
else:
raise
tdSql.checkRows(1)
tdSql.checkCols(5)
tdSql.checkData(0, 0, None)
tdSql.checkData(0, 1, None)
tdSql.checkData(0, 2, None)
tdSql.checkData(0, 3, None)
tdSql.checkData(0, 4, None)
try:
tdSql.query('select last(c1), last(c2), last(c3) from meters', queryTimes=1)
except Exception as e:
if str(e).count('Invalid column name') == 1:
print('column has been dropped, the cache has been updated: %s' % (str(e)))
return
else:
raise
tdSql.checkRows(1)
tdSql.checkCols(3)
tdSql.checkData(0, 0, None)
def test_cache_scan_with_drop_column(self):
tdSql.query('select last(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c9"])
p.check_returncode()
tdSql.query('select last(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
tdSql.checkData(0, 9, None)
def test_cache_scan_last_row_with_drop_column(self):
tdSql.query('select last_row(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c10; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(*) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(11)
tdSql.checkData(0, 10, None)
def test_cache_scan_last_row_with_drop_column2(self):
tdSql.query('select last_row(c1) from meters')
print(str(tdSql.queryResult))
tdSql.checkCols(1)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(c1) from meters', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkData(0, 0, None)
def test_cache_scan_last_row_with_partition_by(self):
tdSql.query('select last(c1) from meters partition by t1')
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(5)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(c1) from meters partition by t1', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(5)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.checkData(2, 0, None)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, None)
def test_cache_scan_last_row_with_partition_by_tbname(self):
tdSql.query('select last(c1) from meters partition by tbname', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(10)
p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c11 int"])
p.check_returncode()
tdSql.query('select last_row(c1) from meters partition by tbname', queryTimes=1)
print(str(tdSql.queryResult))
tdSql.checkCols(1)
tdSql.checkRows(10)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.checkData(2, 0, None)
tdSql.checkData(3, 0, None)
tdSql.checkData(4, 0, None)
def run(self):
self.prepareTestEnv()
#time.sleep(99999999)
self.test_last_cache_scan()
#self.test_cache_scan_with_drop_and_add_column()
self.test_cache_scan_with_drop_and_add_column2()
#self.test_cache_scan_with_drop_column()
#self.test_cache_scan_last_row_with_drop_column()
#self.test_cache_scan_last_row_with_drop_column2()
#self.test_cache_scan_last_row_with_partition_by()
#self.test_cache_scan_last_row_with_partition_by_tbname()
def stop(self):
tdSql.close()