adjust parseOneStbRow; add test case for stable (super table) CSV insert

Bob Liu 2023-11-29 22:47:31 +08:00
parent 084a0b1a65
commit 3069069e0f
2 changed files with 92 additions and 41 deletions
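
Summary (as read from the diff below): SRowsDataContext changes from a union to a struct so a caller can hold the per-child-table STableDataCxt and the super-table row context at the same time; parseOneStbRow now takes the whole SRowsDataContext, resolves the child-table data context into pRowsDataCxt->pTableDataCxt, and the CSV path in parseCsvFile registers that context in pStmt->pTableCxtHashObj itself. The new test_stable_csv case drives this through INSERT ... FILE on a super table, with the child-table name as the leading column of each CSV row.

Below is a minimal standalone sketch of the CSV layout and statement that test builds; the database, table, and column names (db1, stb_1, tbname/ts/q_int/q_binary) are taken from the test, and the script only prepares the file and prints the statement, it does not contact a server.

import csv

def make_stable_csv(filepath, once, qtime, startts, table_name):
    # Each row carries the child-table name first, so one CSV can feed
    # insert into <stable>(tbname, ts, q_int, q_binary) file '<csv>'.
    with open(filepath, 'w', newline='') as f:
        writer = csv.writer(f)
        for j in range(qtime):
            offset = j * once
            ts = startts + offset
            writer.writerows([[table_name, ts + i, offset + i, 'NULL'] for i in range(once)])

if __name__ == '__main__':
    make_stable_csv('b.csv', once=10, qtime=2, startts=1700638570000, table_name='stb_1_1')
    print("insert into db1.stb_1(tbname,ts,q_int,q_binary) file 'b.csv';")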

Changed file 1 of 2:

@@ -1584,7 +1584,7 @@ typedef struct SStbRowsDataContext {
bool isJsonTag;
} SStbRowsDataContext;
typedef union SRowsDataContext{
typedef struct SRowsDataContext{
STableDataCxt* pTableDataCxt;
SStbRowsDataContext* pStbRowsCxt;
} SRowsDataContext;
@@ -1803,8 +1803,9 @@ static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
}
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
SRowsDataContext* pRowsDataCxt, bool* pGotRow, SToken* pToken) {
bool bFirstTable = false;
SStbRowsDataContext* pStbRowsCxt = pRowsDataCxt->pStbRowsCxt;
int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable);
if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
return code;
@@ -1814,20 +1815,16 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
}
STableDataCxt* pTableDataCxt = NULL;
code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, &pTableDataCxt, false, true);
initTableColSubmitData(pTableDataCxt);
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, &pRowsDataCxt->pTableDataCxt, false, true);
initTableColSubmitData(pRowsDataCxt->pTableDataCxt);
if (code == TSDB_CODE_SUCCESS) {
STableDataCxt* pTableDataCxt = pRowsDataCxt->pTableDataCxt;
SRow** pRow = taosArrayReserve(pTableDataCxt->pData->aRowP, 1);
code = tRowBuild(pStbRowsCxt->aColVals, pTableDataCxt->pSchema, pRow);
if (TSDB_CODE_SUCCESS == code) {
insCheckTableDataOrder(pTableDataCxt, TD_ROW_KEY(*pRow));
}
void* pData = pTableDataCxt;
taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), &pData,
POINTER_BYTES);
}
if (code == TSDB_CODE_SUCCESS) {
@@ -1924,7 +1921,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
if (!pStmt->stbSyntax) {
code = parseOneRow(pCxt, &pStmt->pSql, rowsDataCxt.pTableDataCxt, &gotRow, pToken);
} else {
code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken);
code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, &rowsDataCxt, &gotRow, pToken);
}
}
@@ -1988,7 +1985,14 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
if (!pStmt->stbSyntax) {
code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
} else {
code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token);
rowsDataCxt.pTableDataCxt = NULL;
code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, &rowsDataCxt, &gotRow, &token);
if (code == TSDB_CODE_SUCCESS) {
SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt;
void* pData = rowsDataCxt.pTableDataCxt;
taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), &pData,
POINTER_BYTES);
}
}
if (code && firstLine) {
firstLine = false;

Changed file 2 of 2:

@@ -13,22 +13,26 @@ class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
self.testcasePath = os.path.split(__file__)[0]
self.testcaseFilename = os.path.split(__file__)[-1]
self.testcasefilename = os.path.split(__file__)[-1]
self.ts = 1700638570000 # 2023-11-22T07:36:10.000Z
self.db = 'db1'
self.tb1 = 'd001'
self.tb2 = 'd002'
self.tag1 = 'using meters(groupId) tags(1)'
self.tag2 = 'using meters(groupId) tags(2)'
self.stable0 = "meters"
self.stable1 = "stb_1"
self.stable2 = "stb_null"
self.tag1 = f'using {self.stable0}(groupId) tags(1)'
self.tag2 = f'using {self.stable0}(groupId) tags(2)'
self.file1 = f"{self.testcasePath}/b.csv"
self.file2 = f"{self.testcasePath}/c.csv"
os.system("rm -rf %s/b.csv" %self.testcasePath)
#os.system("rm -rf %s/b.csv" %self.testcasePath)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), logSql)
def check_count(self, rows, records):
tdSql.execute(f"use d1;")
tdSql.query(f"select tbname,count(*) from meters group by tbname order by tbname;")
tdSql.execute(f"use {self.db};")
tdSql.query(f"select tbname,count(*) from {self.stable0} group by tbname order by tbname;")
tdSql.checkRows(rows)
for i in range(rows):
tdSql.checkData(i, 1, records[i])
@@ -36,21 +40,23 @@ class TDTestCase:
def reset_tb(self):
# create database and tables
# os.system("taos -s 'drop database if exists d1;'")
# os.system("taos -s 'create database d1;use d1;CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);'")
# os.system(f"taos -s 'use d1;Create table d2001 using meters(groupId) tags(5);'")
# res = os.system(f"taos -s 'use d1;Create table d2002 using meters(groupId) tags(6);'")
# os.system("taos -s 'create database d1;use d1;create stable meters (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int);'")
# os.system(f"taos -s 'use d1;create table d2001 using meters(groupId) tags(5);'")
# res = os.system(f"taos -s 'use d1;create table d2002 using meters(groupId) tags(6);'")
# if (0 != res):
# tdLog.exit(f"create tb error")
tdSql.execute("drop database if exists d1;")
tdSql.execute("create database d1;")
tdSql.execute("use d1;")
tdSql.execute("CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);")
tdSql.execute("Create table d2001 using meters(groupId) tags(5);")
tdSql.execute("Create table d2002 using meters(groupId) tags(6);")
tdSql.execute(f"drop database if exists {self.db};")
tdSql.execute(f"create database {self.db};")
tdSql.execute(f"use {self.db};")
tdSql.execute(f"create stable {self.stable0} (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int);")
tdSql.execute(f"create table {self.tb1} {self.tag1};")
tdSql.execute(f"create table {self.tb2} {self.tag2};")
tdSql.execute(f"create stable {self.stable1} (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);")
tdSql.execute(f"create stable {self.stable2} (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);")
def test(self, sql):
sql = "use d1;" + sql
sql = f"use {self.db};" + sql
res = os.system(f'taos -s "{sql}"')
# if (0 != res):
# tdLog.exit(f"taos sql error")
@@ -58,41 +64,41 @@ class TDTestCase:
def check(self):
# same table, auto create + create
sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
self.test(sql)
# same table, create + insert
sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} file '{self.file2}';"
sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} file '{self.file2}';"
self.test(sql)
# same table, insert + create
sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
sql = f"insert into {self.tb1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
self.test(sql)
# same table, insert + insert
sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb1} file '{self.file2}';"
sql = f"insert into {self.tb1} file '{self.file1}' {self.tb1} file '{self.file2}';"
self.test(sql)
# diff table auto create + create
sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
self.test(sql)
# diff table, create + insert
sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} file '{self.file2}';"
sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} file '{self.file2}';"
self.test(sql)
# diff table, insert + create
sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
sql = f"insert into {self.tb1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
self.test(sql)
# diff table, insert + insert
sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb2} file '{self.file2}';"
sql = f"insert into {self.tb1} file '{self.file1}' {self.tb2} file '{self.file2}';"
self.test(sql)
# bigNum = 1010000
# self.check_count(5, [2100, 2100, bigNum, bigNum, bigNum])
result = os.popen("taos -s 'select count(*) from d1.%s'" %self.tb1)
result = os.popen("taos -s 'select count(*) from %s.%s'" %(self.db, self.tb1))
res = result.read()
if (f"OK" in res):
tdLog.info(f"check count success")
@@ -112,7 +118,7 @@ class TDTestCase:
def test_mix(self):
#forbid use both value and file in one insert
result = os.popen(f"insert into {self.tb1} file '{self.testcasePath}/csv/2k.csv' {self.tb2} values('2021-07-13 14:06:34.630', 10.2, 219, 0.32);")
result = os.popen(f"insert into {self.tb1} file '{self.file2}' {self.tb2} values('2021-07-13 14:06:34.630', 10.2, 219, 0.32);")
res = result.read()
if (f"error" in res):
tdLog.info(f"forbid success")
@@ -130,28 +136,69 @@ class TDTestCase:
print("end csv data prepare")
# auto create + insert
sql = f"INSERT INTO d2001 using meters(groupId) tags(5) FILE '{self.file1}';"
sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}';"
self.test(sql)
# only insert
sql = f"INSERT INTO d2002 FILE '{self.file2}';"
sql = f"insert into {self.tb2} file '{self.file2}';"
self.test(sql)
print("end insert to table")
#tdSql.execute(f"use d1;")
tdSql.query(f"select tbname,count(*) from meters group by tbname order by tbname;")
tdSql.query(f"select tbname,count(*) from {self.stable0} group by tbname order by tbname;")
tdSql.checkRows(2)
tdSql.checkData(0, 1, rowNum1)
tdSql.checkData(1, 1, rowNum2)
print("check insert file to table success")
def make_stable_csv(self, filepath, once, qtime, startts, table_name):
f = open(filepath, 'w')
with f:
writer = csv.writer(f)
for j in range(qtime):
offset = j*once
ts = startts + offset
rows = []
for i in range(once):
rows.append([table_name, ts + i, offset + i, 'NULL'])
writer.writerows(rows)
f.close()
print(datetime.now(), filepath, " ready!")
def test_stable_csv(self):
# prepare csv
print("start stable_csv data prepare")
once = 10000
qtime1 = 101
qtime2 = 100
# rowNum1 = qtime1 * once
# rowNum2 = qtime2 * once
child_1 = f"{self.stable1}_1"
child_2 = f"{self.stable2}_1"
self.make_stable_csv(self.file1, once, qtime1, self.ts - 86400000, child_1)
self.make_stable_csv(self.file2, once, qtime2, self.ts, child_2)
print("end stable_csv data prepare")
# insert create child table of stable
sql = f"insert into {self.db}.{self.stable1}(tbname,ts,q_int,q_binary) file '{self.file1}' {self.db}.{self.stable2}(tbname,ts,q_int,q_binary) file '{self.file2}';"
self.test(sql)
print("end insert to stable")
#tdSql.execute(f"insert into {self.db}.{child_1}(ts, q_int) values(now, 1);")
tdSql.query(f"select tbname,count(*) from {self.stable1} group by tbname order by tbname;")
tdSql.checkRows(0)
print("check stable success")
def run(self):
tdSql.prepare()
self.reset_tb()
self.test_stable_csv()
self.test_bigcsv()
self.test_mix()
self.check()
tdSql.close()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
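
For a quick manual check outside the test harness, the combined statement that test_stable_csv builds can be pushed through the taos shell the same way TDTestCase.test does. A minimal sketch, assuming a local server that already holds db1 with stb_1 and stb_null created as in reset_tb, and that b.csv and c.csv exist in the working directory:

import os

db = 'db1'
sql = (f"use {db};"
       f"insert into {db}.stb_1(tbname,ts,q_int,q_binary) file 'b.csv' "
       f"{db}.stb_null(tbname,ts,q_int,q_binary) file 'c.csv';")
# Shell out to the taos CLI, as TDTestCase.test() does; exit code 0 means the statement was accepted.
res = os.system(f'taos -s "{sql}"')
print("insert exit code:", res)

# Per-child-table row counts, the same query check_count() runs.
os.system(f'taos -s "select tbname,count(*) from {db}.stb_1 group by tbname order by tbname;"')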