Merge pull request #23808 from taosdata/fix/PI-23-3.0

fix: add check in mnode/vnode when alter table col/tag
This commit is contained in:
dapan1121 2023-11-26 17:31:38 +08:00 committed by GitHub
commit d3ac79aeb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 181 additions and 11 deletions

View File

@ -1014,6 +1014,20 @@ static int32_t mndFindSuperTableColumnIndex(const SStbObj *pStb, const char *col
return -1;
}
static bool mndValidateSchema(SSchema *pSchemas, int32_t nSchema, SArray *pFields, int32_t maxLen) {
int32_t rowLen = 0;
for (int32_t i = 0; i < nSchema; ++i) {
rowLen += (pSchemas + i)->bytes;
}
int32_t nField = taosArrayGetSize(pFields);
for (int32_t i = 0; i < nField; ++i) {
rowLen += ((SField *)TARRAY_GET_ELEM(pFields, i))->bytes;
}
return rowLen <= maxLen;
}
static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq *createReq) {
taosRLockLatch(&pStb->lock);
memcpy(pDst, pStb, sizeof(SStbObj));
@ -1269,6 +1283,11 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
return -1;
}
if (!mndValidateSchema(pOld->pTags, pOld->numOfTags, pFields, TSDB_MAX_TAGS_LEN)) {
terrno = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
return -1;
}
pNew->numOfTags = pNew->numOfTags + ntags;
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
@ -1558,6 +1577,16 @@ static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj
return -1;
}
uint32_t nLen = 0;
for (int32_t i = 0; i < pOld->numOfTags; ++i) {
nLen += (pOld->pTags[i].colId == colId) ? pField->bytes : pOld->pTags[i].bytes;
}
if (nLen > TSDB_MAX_TAGS_LEN) {
terrno = TSDB_CODE_PAR_INVALID_TAGS_LENGTH;
return -1;
}
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
@ -1592,6 +1621,11 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
return -1;
}
if (!mndValidateSchema(pOld->pColumns, pOld->numOfColumns, pFields, TSDB_MAX_BYTES_PER_ROW)) {
terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
return -1;
}
pNew->numOfColumns = pNew->numOfColumns + ncols;
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;

View File

@ -1242,6 +1242,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
if (pAlterTbReq->colName == NULL) {
terrno = TSDB_CODE_INVALID_MSG;
metaError("meta/table: null pAlterTbReq->colName");
return -1;
}
@ -1309,20 +1310,27 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid};
oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols;
int32_t iCol = 0;
int32_t rowLen = -1;
if (pAlterTbReq->action == TSDB_ALTER_TABLE_ADD_COLUMN ||
pAlterTbReq->action == TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES) {
rowLen = 0;
}
int32_t iCol = 0, jCol = 0;
SSchema *qColumn = NULL;
for (;;) {
pColumn = NULL;
qColumn = NULL;
if (iCol >= pSchema->nCols) break;
pColumn = &pSchema->pSchema[iCol];
if (jCol >= pSchema->nCols) break;
qColumn = &pSchema->pSchema[jCol];
if (NULL == pAlterTbReq->colName) {
metaError("meta/table: null pAlterTbReq->colName");
return -1;
if (!pColumn && (strcmp(qColumn->name, pAlterTbReq->colName) == 0)) {
pColumn = qColumn;
iCol = jCol;
if (rowLen < 0) break;
}
if (strcmp(pColumn->name, pAlterTbReq->colName) == 0) break;
iCol++;
rowLen += qColumn->bytes;
++jCol;
}
entry.version = version;
@ -1337,6 +1345,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
if ((terrno = grantCheck(TSDB_GRANT_TIMESERIES)) < 0) {
goto _err;
}
if (rowLen + pAlterTbReq->bytes > TSDB_MAX_BYTES_PER_ROW) {
terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
goto _err;
}
pSchema->version++;
pSchema->nCols++;
pNewSchema = taosMemoryMalloc(sizeof(SSchema) * pSchema->nCols);
@ -1378,10 +1390,14 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno = TSDB_CODE_VND_COL_NOT_EXISTS;
goto _err;
}
if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes > pAlterTbReq->colModBytes) {
if (!IS_VAR_DATA_TYPE(pColumn->type) || pColumn->bytes >= pAlterTbReq->colModBytes) {
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err;
}
if (rowLen + pAlterTbReq->colModBytes - pColumn->bytes > TSDB_MAX_BYTES_PER_ROW) {
terrno = TSDB_CODE_PAR_INVALID_ROW_LENGTH;
goto _err;
}
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
goto _err;

View File

@ -13,6 +13,7 @@
import random
import string
import threading
from util.log import *
from util.cases import *
from util.sql import *
@ -25,10 +26,24 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.setsql = TDSetSql()
self.fname = __file__ + '.tmp.sql'
self.dbname = 'db1'
self.ntbname = 'ntb'
self.stbname = 'stb'
self.stbnum = 10
self.ntbnum = 10
self.colnum = 52
self.tagnum = 15
self.collen = 320
self.colnum_modify = 40
self.tagnum_modify = 40
self.collen_old_modify = 160
self.collen_new_modify = 455
self.taglen_old_modify = 80
self.taglen_new_modify = 155
self.binary_length = 20 # the length of binary for column_dict
self.nchar_length = 20 # the length of nchar for column_dict
self.threadnum = 2
self.column_dict = {
'ts' : 'timestamp',
'col1': 'tinyint',
@ -183,9 +198,114 @@ class TDTestCase:
tdLog.info(res)
assert(res[1][2] == 39001)
def prepareAlterEnv(self):
tdSql.execute(f'drop database if exists {self.dbname}')
tdSql.execute(f'create database if not exists {self.dbname} vgroups 2')
tdSql.execute(f'use {self.dbname}')
def destroyAlterEnv(self):
tdSql.execute(f'drop database if exists {self.dbname}')
def alterTableTask(self, i):
os.system(f'taos -f {self.fname}.{i};')
def executeAlterTable(self, opt):
threads = []
for i in range(self.threadnum):
thread = threading.Thread(target=self.alterTableTask, args=(i,))
threads.append(thread)
thread.start()
for i in range(self.threadnum):
threads[i].join()
def checkAlterTable(self, opt):
if opt in ["stb_add_col", "stb_add_tag"]:
for i in range(self.stbnum):
tdSql.execute(f'desc {self.stbname}_{i}')
elif opt in ["stb_modify_col", "stb_modify_tag"]:
for i in range(self.stbnum):
tdSql.execute(f'desc {self.stbname}_{i}')
elif opt in ["ntb_add_col", "ntb_modify_col"]:
for i in range(self.ntbnum):
tdSql.execute(f'desc {self.ntbname}_{i}')
def destroyAlterTable(self):
for i in range(self.threadnum):
if os.path.isfile(f'{self.fname}.{i}'):
os.remove(f'{self.fname}.{i}')
def prepareAlterTable(self, opt):
self.destroyAlterTable()
lines = [f'use {self.dbname};\n']
if opt in ["stb_add_col", "stb_add_tag"]:
for i in range(self.stbnum):
tdSql.execute(f'create table if not exists {self.stbname}_{i} (ts timestamp, c_0 NCHAR({self.collen})) tags(t0 nchar({self.collen}));')
for i in range(self.stbnum):
if opt == 'stb_add_col':
for c in range(1, self.colnum):
lines.append(f'alter table {self.stbname}_{i} add column c_{c} NCHAR({self.collen});\n')
else:
for c in range(1, self.tagnum):
lines.append(f'alter table {self.stbname}_{i} add tag t_{c} NCHAR({self.collen});\n')
elif opt in ["stb_modify_col", "stb_modify_tag"]:
for i in range(self.stbnum):
createTbSql = f'CREATE table if not exists {self.stbname}_{i} (ts timestamp'
for j in range(self.colnum_modify):
createTbSql += f',c_{j} NCHAR({self.collen_old_modify})'
createTbSql += f') tags(t_0 NCHAR({self.taglen_old_modify})'
for k in range(1,self.tagnum_modify):
createTbSql += f',t_{k} NCHAR({self.taglen_old_modify})'
createTbSql += f');'
tdLog.info(createTbSql)
tdSql.execute(createTbSql)
for i in range(self.stbnum):
if opt == 'stb_modify_col':
for c in range(self.colnum_modify):
lines.append(f'alter table {self.stbname}_{i} modify column c_{c} NCHAR({self.collen_new_modify});\n')
else:
for c in range(self.tagnum_modify):
lines.append(f'alter table {self.stbname}_{i} modify tag t_{c} NCHAR({self.taglen_new_modify});\n')
elif opt in ['ntb_add_col']:
for i in range(self.ntbnum):
tdSql.execute(f'create table if not exists {self.ntbname}_{i} (ts timestamp, c_0 NCHAR({self.collen}));')
for i in range(self.ntbnum):
for c in range(1, self.colnum):
lines.append(f'alter table {self.ntbname}_{i} add column c_{c} NCHAR({self.collen});\n')
elif opt in ['ntb_modify_col']:
for i in range(self.ntbnum):
createTbSql = f'CREATE table if not exists {self.ntbname}_{i} (ts timestamp'
for j in range(self.colnum_modify):
createTbSql += f',c_{j} NCHAR({self.collen_old_modify})'
createTbSql += f');'
tdLog.info(createTbSql)
tdSql.execute(createTbSql)
for i in range(self.ntbnum):
for c in range(self.colnum_modify):
lines.append(f'alter table {self.ntbname}_{i} modify column c_{c} NCHAR({self.collen_new_modify});\n')
# generate sql file
with open(f'{self.fname}.0', "a") as f:
f.writelines(lines)
# clone sql file in case of race condition
for i in range(1, self.threadnum):
shutil.copy(f'{self.fname}.0', f'{self.fname}.{i}')
def alter_stable_multi_client_check(self):
"""Check alter stable/ntable var type column/tag(PI-23)
"""
alter_table_check_type = ["stb_add_col", "stb_add_tag", "stb_modify_col", "stb_modify_tag", "ntb_add_col", "ntb_modify_col"]
for opt in alter_table_check_type:
self.prepareAlterEnv()
self.prepareAlterTable(opt)
self.executeAlterTable(opt)
self.checkAlterTable(opt)
self.destroyAlterTable()
self.destroyAlterEnv()
def run(self):
self.alter_stable_check()
self.alter_stable_column_varchar_39001()
self.alter_stable_multi_client_check()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)