decimal test where filtering

This commit is contained in:
wangjiaming0909 2025-03-04 15:14:52 +08:00
parent c79332be5b
commit c2b20e6722
2 changed files with 174 additions and 78 deletions

View File

@ -2294,6 +2294,7 @@ static int32_t vectorMathOpOneRowForDecimal(SScalarParam *pLeft, SScalarParam *p
} }
Decimal oneRowData = {0}; Decimal oneRowData = {0};
SDataType oneRowType = outType; SDataType oneRowType = outType;
oneRowType.precision = TSDB_DECIMAL_MAX_PRECISION;
if (pLeft == pOneRowParam) { if (pLeft == pOneRowParam) {
oneRowType.scale = leftType.scale; oneRowType.scale = leftType.scale;
code = convertToDecimal(colDataGetData(pLeft->columnData, 0), &leftType, &oneRowData, &oneRowType); code = convertToDecimal(colDataGetData(pLeft->columnData, 0), &leftType, &oneRowData, &oneRowType);

View File

@ -1,3 +1,4 @@
from concurrent.futures import thread
import math import math
from random import randrange from random import randrange
import random import random
@ -14,9 +15,25 @@ from util.cases import *
from util.dnodes import * from util.dnodes import *
from util.common import * from util.common import *
from decimal import * from decimal import *
from multiprocessing import Value, Lock
class AtomicCounter:
def __init__(self, initial_value=0):
self._value = Value('i', initial_value)
self._lock = Lock()
def fetch_add(self, delta = 1):
with self._lock:
old_value = self._value.value
self._value.value += delta
return old_value
getcontext().prec = 40 getcontext().prec = 40
def get_decimal(val, scale: int) -> Decimal:
getcontext().prec = 40
return Decimal(val).quantize(Decimal("1." + "0" * scale), ROUND_HALF_UP)
syntax_error = -2147473920 syntax_error = -2147473920
invalid_column = -2147473918 invalid_column = -2147473918
invalid_compress_level = -2147483084 invalid_compress_level = -2147483084
@ -26,11 +43,11 @@ scalar_convert_err = -2147470768
decimal_insert_validator_test = False decimal_insert_validator_test = False
operator_test_round = 10 operator_test_round = 2
tb_insert_rows = 1000 tb_insert_rows = 1000
binary_op_with_const_test = True binary_op_with_const_test = True
binary_op_with_col_test = True binary_op_with_col_test = False
unary_op_test = True unary_op_test = False
binary_op_in_where_test = True binary_op_in_where_test = True
class DecimalTypeGeneratorConfig: class DecimalTypeGeneratorConfig:
@ -153,14 +170,19 @@ class DecimalColumnAggregator:
if v < self.min: if v < self.min:
self.min = v self.min = v
atomic_counter = AtomicCounter(0)
class TaosShell: class TaosShell:
def __init__(self): def __init__(self):
self.counter_ = atomic_counter.fetch_add()
self.queryResult = [] self.queryResult = []
self.tmp_file_path = "/tmp/taos_shell_result" self.tmp_file_path = "/tmp/taos_shell_result"
def get_file_path(self):
return f"{self.tmp_file_path}_{self.counter_}"
def read_result(self): def read_result(self):
with open(self.tmp_file_path, "r") as f: with open(self.get_file_path(), "r") as f:
lines = f.readlines() lines = f.readlines()
lines = lines[1:] lines = lines[1:]
for line in lines: for line in lines:
@ -173,12 +195,13 @@ class TaosShell:
col += 1 col += 1
def query(self, sql: str): def query(self, sql: str):
with open(self.tmp_file_path, "a+") as f: with open(self.get_file_path(), "a+") as f:
f.truncate(0) f.truncate(0)
self.queryResult = []
try: try:
command = f'taos -s "{sql} >> {self.tmp_file_path}"' command = f'taos -s "{sql} >> {self.get_file_path()}"'
result = subprocess.run( result = subprocess.run(
command, shell=True, check=True, stderr=subprocess.PIPE command, shell=True, check=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE
) )
self.read_result() self.read_result()
except Exception as e: except Exception as e:
@ -212,7 +235,7 @@ class DecimalColumnExpr:
def convert_to_res_type(self, val: Decimal) -> Decimal: def convert_to_res_type(self, val: Decimal) -> Decimal:
if self.res_type_.is_decimal_type(): if self.res_type_.is_decimal_type():
return val.quantize(Decimal("0." + "0" * self.res_type_.scale()), ROUND_HALF_UP) return get_decimal(val, self.res_type_.scale())
elif self.res_type_.type == TypeEnum.DOUBLE: elif self.res_type_.type == TypeEnum.DOUBLE:
return float(val) return float(val)
@ -245,7 +268,8 @@ class DecimalColumnExpr:
if dec_from_query != dec_from_calc: if dec_from_query != dec_from_calc:
tdLog.exit(f"filter with {self} failed, query got: {dec_from_query}, expect {dec_from_calc}, param: {params}") tdLog.exit(f"filter with {self} failed, query got: {dec_from_query}, expect {dec_from_calc}, param: {params}")
else: else:
tdLog.info(f"filter with {self} succ, query got: {dec_from_query}, expect {dec_from_calc}, param: {params}") pass
#tdLog.info(f"filter with {self} succ, query got: {dec_from_query}, expect {dec_from_calc}, param: {params}")
def check(self, query_col_res: List, tbname: str): def check(self, query_col_res: List, tbname: str):
for i in range(len(query_col_res)): for i in range(len(query_col_res)):
@ -260,7 +284,7 @@ class DecimalColumnExpr:
if v_from_calc_in_py == 'NULL' or v_from_query == 'NULL': if v_from_calc_in_py == 'NULL' or v_from_query == 'NULL':
if v_from_calc_in_py != v_from_query: if v_from_calc_in_py != v_from_query:
tdLog.exit(f"query with expr: {self} calc in py got: {v_from_calc_in_py}, query got: {v_from_query}") tdLog.exit(f"query with expr: {self} calc in py got: {v_from_calc_in_py}, query got: {v_from_query}")
tdLog.debug(f"query with expr: {self} calc got same result: NULL") #tdLog.debug(f"query with expr: {self} calc got same result: NULL")
continue continue
failed = False failed = False
if self.res_type_.type == TypeEnum.BOOL: if self.res_type_.type == TypeEnum.BOOL:
@ -280,9 +304,8 @@ class DecimalColumnExpr:
f"check decimal column failed for expr: {self}, input: {[t.__str__() for t in self.get_input_types()]}, res_type: {self.res_type_}, params: {params}, query: {v_from_query}, expect {calc_res}, but get {query_res}" f"check decimal column failed for expr: {self}, input: {[t.__str__() for t in self.get_input_types()]}, res_type: {self.res_type_}, params: {params}, query: {v_from_query}, expect {calc_res}, but get {query_res}"
) )
else: else:
tdLog.info( pass
f"op succ: {self}, in: {[t.__str__() for t in self.get_input_types()]}, res: {self.res_type_}, params: {params}, insert:{v_from_calc_in_py} query:{v_from_query}, py calc: {calc_res}" #tdLog.info( f"op succ: {self}, in: {[t.__str__() for t in self.get_input_types()]}, res: {self.res_type_}, params: {params}, insert:{v_from_calc_in_py} query:{v_from_query}, py calc: {calc_res}")
)
## format_params are already been set ## format_params are already been set
def generate_res_type(self): def generate_res_type(self):
@ -429,7 +452,7 @@ class DataType:
def check(self, values, offset: int): def check(self, values, offset: int):
return True return True
def get_typed_val_for_execute(self, val): def get_typed_val_for_execute(self, val, const_col = False):
if self.type == TypeEnum.DOUBLE: if self.type == TypeEnum.DOUBLE:
return float(val) return float(val)
elif self.type == TypeEnum.BOOL: elif self.type == TypeEnum.BOOL:
@ -438,7 +461,10 @@ class DataType:
else: else:
return 0 return 0
elif self.type == TypeEnum.FLOAT: elif self.type == TypeEnum.FLOAT:
val = float(str(numpy.float32(val))) if const_col:
val = float(str(numpy.float32(val)))
else:
val = float(numpy.float32(val))
elif isinstance(val, str): elif isinstance(val, str):
val = val.strip("'") val = val.strip("'")
return val return val
@ -499,9 +525,9 @@ class DecimalType(DataType):
def get_typed_val(self, val): def get_typed_val(self, val):
if val == "NULL": if val == "NULL":
return None return None
return Decimal(val).quantize(Decimal("1." + "0" * self.scale()), ROUND_HALF_UP) return get_decimal(val, self.scale())
def get_typed_val_for_execute(self, val): def get_typed_val_for_execute(self, val, const_col = False):
return self.get_typed_val(val) return self.get_typed_val(val)
@staticmethod @staticmethod
@ -527,7 +553,7 @@ class DecimalType(DataType):
try: try:
dec_query: Decimal = Decimal(v_from_query) dec_query: Decimal = Decimal(v_from_query)
dec_insert: Decimal = Decimal(v_from_insert) dec_insert: Decimal = Decimal(v_from_insert)
dec_insert = dec_insert.quantize(Decimal("1." + "0" * self.scale()), ROUND_HALF_UP) dec_insert = get_decimal(dec_insert, self.scale())
except Exception as e: except Exception as e:
tdLog.exit(f"failed to convert {v_from_query} or {v_from_insert} to decimal, {e}") tdLog.exit(f"failed to convert {v_from_query} or {v_from_insert} to decimal, {e}")
return False return False
@ -559,14 +585,14 @@ class Column:
def get_typed_val(self, val): def get_typed_val(self, val):
return self.type_.get_typed_val(val) return self.type_.get_typed_val(val)
def get_typed_val_for_execute(self, val): def get_typed_val_for_execute(self, val, const_col = False):
return self.type_.get_typed_val_for_execute(val) return self.type_.get_typed_val_for_execute(val, const_col)
def get_constant_val(self): def get_constant_val(self):
return self.get_typed_val(self.saved_vals[''][0]) return self.get_typed_val(self.saved_vals[''][0])
def get_constant_val_for_execute(self): def get_constant_val_for_execute(self):
return self.get_typed_val_for_execute(self.saved_vals[''][0]) return self.get_typed_val_for_execute(self.saved_vals[''][0], const_col=True)
def __str__(self): def __str__(self):
if self.is_constant_col(): if self.is_constant_col():
@ -1448,73 +1474,59 @@ class TDTestCase:
else: else:
tdLog.info(f"sql: {sql} got no output") tdLog.info(f"sql: {sql} got no output")
def check_decimal_where_with_binary_expr_with_const_col_results( def check_decimal_binary_expr_with_const_col_results_for_one_expr(
self, self,
dbname, dbname,
tbname, tbname,
tb_cols: List[Column], tb_cols: List[Column],
constant_cols: List[Column], expr: DecimalColumnExpr,
exprs: List[DecimalColumnExpr], get_constant_cols_func,
): ):
if not binary_op_in_where_test: constant_cols = get_constant_cols_func()
return for col in tb_cols:
for expr in exprs: if col.name_ == '':
for col in tb_cols: continue
if col.name_ == '': left_is_decimal = col.type_.is_decimal_type()
for const_col in constant_cols:
right_is_decimal = const_col.type_.is_decimal_type()
if expr.should_skip_for_decimal([col, const_col]):
continue continue
left_is_decimal = col.type_.is_decimal_type() const_col.generate_value()
for const_col in constant_cols: select_expr = expr.generate((const_col, col))
right_is_decimal = const_col.type_.is_decimal_type() sql = f"select {select_expr} from {dbname}.{tbname}"
if expr.should_skip_for_decimal([col, const_col]): shell = TaosShell()
continue res = shell.query(sql)
const_col.generate_value() if len(res) > 0:
select_expr = expr.generate((const_col, col)) expr.check(res[0], tbname)
expr.query_col = col select_expr = expr.generate((col, const_col))
sql = f"select {col} from {dbname}.{tbname} where {select_expr}" sql = f"select {select_expr} from {dbname}.{tbname}"
res = TaosShell().query(sql) res = shell.query(sql)
##TODO wjm no need to check len(res) for filtering test, cause we need to check for every row in the table to check if the filtering is working if len(res) > 0:
if len(res) > 0: expr.check(res[0], tbname)
expr.check_for_filtering(res[0], tbname) else:
select_expr = expr.generate((col, const_col)) tdLog.info(f"sql: {sql} got no output")
sql = f"select {col} from {dbname}.{tbname} where {select_expr}"
res = TaosShell().query(sql)
if len(res) > 0:
expr.check_for_filtering(res[0], tbname)
else:
tdLog.info(f"sql: {sql} got no output")
def check_decimal_binary_expr_with_const_col_results( def check_decimal_binary_expr_with_const_col_results(
self, self,
dbname, dbname,
tbname, tbname,
tb_cols: List[Column], tb_cols: List[Column],
constant_cols: List[Column], get_constant_cols_func,
exprs: List[DecimalColumnExpr], get_exprs_func,
): ):
constant_cols = get_constant_cols_func()
exprs: List[DecimalColumnExpr] = get_exprs_func()
if not binary_op_with_const_test: if not binary_op_with_const_test:
return return
ts: list[threading.Thread] = []
for expr in exprs: for expr in exprs:
for col in tb_cols: t = self.run_in_thread2(
if col.name_ == '': self.check_decimal_binary_expr_with_const_col_results_for_one_expr,
continue (dbname, tbname, tb_cols, expr, get_constant_cols_func),
left_is_decimal = col.type_.is_decimal_type() )
for const_col in constant_cols: ts.append(t)
right_is_decimal = const_col.type_.is_decimal_type() for t in ts:
if expr.should_skip_for_decimal([col, const_col]): t.join()
continue
const_col.generate_value()
select_expr = expr.generate((const_col, col))
sql = f"select {select_expr} from {dbname}.{tbname}"
res = TaosShell().query(sql)
if len(res) > 0:
expr.check(res[0], tbname)
select_expr = expr.generate((col, const_col))
sql = f"select {select_expr} from {dbname}.{tbname}"
res = TaosShell().query(sql)
if len(res) > 0:
expr.check(res[0], tbname)
else:
tdLog.info(f"sql: {sql} got no output")
def check_decimal_unary_expr_results(self, dbname, tbname, tb_cols: List[Column], exprs: List[DecimalColumnExpr]): def check_decimal_unary_expr_results(self, dbname, tbname, tb_cols: List[Column], exprs: List[DecimalColumnExpr]):
if not unary_op_test: if not unary_op_test:
@ -1533,6 +1545,20 @@ class TDTestCase:
else: else:
tdLog.info(f"sql: {sql} got no output") tdLog.info(f"sql: {sql} got no output")
def run_in_thread(self, times, func, params) -> threading.Thread:
threads: List[threading.Thread] = []
for i in range(times):
t = threading.Thread(target=func, args=params)
t.start()
threads.append(t)
for t in threads:
t.join()
def run_in_thread2(self, func, params) -> threading.Thread:
t = threading.Thread(target=func, args=params)
t.start()
return t
## test others unsupported types operator with decimal ## test others unsupported types operator with decimal
def test_decimal_unsupported_types(self): def test_decimal_unsupported_types(self):
unsupported_type = [ unsupported_type = [
@ -1583,17 +1609,19 @@ class TDTestCase:
## tables: meters, nt ## tables: meters, nt
## columns: c1, c2, c3, c4, c5, c7, c8, c9, c10, c99, c100 ## columns: c1, c2, c3, c4, c5, c7, c8, c9, c10, c99, c100
binary_operators = DecimalBinaryOperator.get_all_binary_ops() binary_operators = DecimalBinaryOperator.get_all_binary_ops()
all_type_columns = Column.get_decimal_oper_const_cols()
## decimal operator with constants of all other types ## decimal operator with constants of all other types
for i in range(operator_test_round): self.run_in_thread(
self.check_decimal_binary_expr_with_const_col_results( operator_test_round,
self.check_decimal_binary_expr_with_const_col_results,
(
self.db_name, self.db_name,
self.norm_table_name, self.norm_table_name,
self.norm_tb_columns, self.norm_tb_columns,
all_type_columns, Column.get_decimal_oper_const_cols,
binary_operators, DecimalBinaryOperator.get_all_binary_ops,
) ),
)
## test decimal column op decimal column ## test decimal column op decimal column
for i in range(operator_test_round): for i in range(operator_test_round):
@ -1619,6 +1647,66 @@ class TDTestCase:
def test_query_decimal_with_sma(self): def test_query_decimal_with_sma(self):
pass pass
def check_decimal_where_with_binary_expr_with_const_col_results(
self,
dbname,
tbname,
tb_cols: List[Column],
constant_cols: List[Column],
exprs: List[DecimalColumnExpr],
):
if not binary_op_in_where_test:
return
for expr in exprs:
for col in tb_cols:
if col.name_ == '':
continue
left_is_decimal = col.type_.is_decimal_type()
for const_col in constant_cols:
right_is_decimal = const_col.type_.is_decimal_type()
if expr.should_skip_for_decimal([col, const_col]):
continue
const_col.generate_value()
select_expr = expr.generate((const_col, col))
expr.query_col = col
sql = f"select {col} from {dbname}.{tbname} where {select_expr}"
res = TaosShell().query(sql)
##TODO wjm no need to check len(res) for filtering test, cause we need to check for every row in the table to check if the filtering is working
if len(res) > 0:
expr.check_for_filtering(res[0], tbname)
select_expr = expr.generate((col, const_col))
sql = f"select {col} from {dbname}.{tbname} where {select_expr}"
res = TaosShell().query(sql)
if len(res) > 0:
expr.check_for_filtering(res[0], tbname)
else:
tdLog.info(f"sql: {sql} got no output")
def check_decimal_where_with_binary_expr_with_col_results(
self, dbname, tbname, tb_cols: List[Column], exprs: List[DecimalColumnExpr]
):
if not binary_op_in_where_test:
return
for expr in exprs:
for col in tb_cols:
if col.name_ == '':
continue
for col2 in tb_cols:
if expr.should_skip_for_decimal([col, col2]):
continue
select_expr = expr.generate((col, col2))
sql = f"select {col} from {dbname}.{tbname} where {select_expr}"
res = TaosShell().query(sql)
if len(res) > 0:
expr.check_for_filtering(res[0], tbname)
select_expr = expr.generate((col2, col))
sql = f"select {col} from {dbname}.{tbname} where {select_expr}"
res = TaosShell().query(sql)
if len(res) > 0:
expr.check_for_filtering(res[0], tbname)
else:
tdLog.info(f"sql: {sql} got no output")
def test_query_decimal_where_clause(self): def test_query_decimal_where_clause(self):
binary_compare_ops = DecimalBinaryOperator.get_all_filtering_binary_compare_ops() binary_compare_ops = DecimalBinaryOperator.get_all_filtering_binary_compare_ops()
const_cols = Column.get_decimal_oper_const_cols() const_cols = Column.get_decimal_oper_const_cols()
@ -1630,6 +1718,13 @@ class TDTestCase:
const_cols, const_cols,
binary_compare_ops, binary_compare_ops,
) )
for i in range(operator_test_round):
self.check_decimal_where_with_binary_expr_with_col_results(
self.db_name,
self.norm_table_name,
self.norm_tb_columns,
binary_compare_ops)
## test filtering with decimal exprs ## test filtering with decimal exprs
## 1. dec op const col ## 1. dec op const col
## 2. dec op dec ## 2. dec op dec