enh: udf exception test case

This commit is contained in:
factosea 2024-12-20 16:57:17 +08:00
parent 236121bbf6
commit a360b71f7c
4 changed files with 259 additions and 0 deletions

View File

@ -142,6 +142,50 @@ target_link_libraries(
udf2_dup PUBLIC os ${LINK_JEMALLOC}
)
set(TARGET_NAMES
change_udf_normal
change_udf_no_init
change_udf_no_process
change_udf_no_destroy
change_udf_init_failed
change_udf_process_failed
change_udf_destory_failed
)
set(COMPILE_DEFINITIONS
CHANGE_UDF_NORMAL
CHANGE_UDF_NO_INIT
CHANGE_UDF_NO_PROCESS
CHANGE_UDF_NO_DESTROY
CHANGE_UDF_INIT_FAILED
CHANGE_UDF_PROCESS_FAILED
CHANGE_UDF_DESTORY_FAILED
)
foreach(index RANGE 0 6)
list(GET TARGET_NAMES ${index} target_name)
list(GET COMPILE_DEFINITIONS ${index} compile_def)
add_library(${target_name} STATIC MODULE test/change_udf.c)
target_include_directories(
${target_name}
PUBLIC
"${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/include/util"
"${TD_SOURCE_DIR}/include/common"
"${TD_SOURCE_DIR}/include/client"
"${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_compile_definitions(${target_name} PRIVATE ${compile_def})
IF(TD_LINUX_64 AND JEMALLOC_ENABLED)
ADD_DEPENDENCIES(${target_name} jemalloc)
ENDIF()
target_link_libraries(
${target_name} PUBLIC os ${LINK_JEMALLOC}
)
endforeach()
# SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin)
add_executable(udfd src/udfd.c)

View File

@ -0,0 +1,108 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef LINUX
#include <unistd.h>
#endif
#ifdef WINDOWS
#include <windows.h>
#endif
#include "taosudf.h"
// rename function name
#ifdef CHANGE_UDF_NORMAL
#define UDFNAME change_udf_normal
#define UDFNAMEINIT change_udf_normal_init
#define UDFNAMEDESTROY change_udf_normal_destroy
#elif defined(CHANGE_UDF_NO_INIT)
#define UDFNAME change_udf_no_init
#define UDFNAMEINIT change_udf_no_init_init
#define UDFNAMEDESTROY change_udf_no_init_destroy
#elif defined(CHANGE_UDF_NO_PROCESS)
#define UDFNAME change_udf_no_process
#define UDFNAMEINIT change_udf_no_process_init
#define UDFNAMEDESTROY change_udf_no_process_destroy
#elif defined(CHANGE_UDF_NO_DESTROY)
#define UDFNAME change_udf_no_destroy
#define UDFNAMEINIT change_udf_no_destroy_init
#define UDFNAMEDESTROY change_udf_no_destroy_destroy
#elif defined(CHANGE_UDF_INIT_FAILED)
#define UDFNAME change_udf_init_failed
#define UDFNAMEINIT change_udf_init_failed_init
#define UDFNAMEDESTROY change_udf_init_failed_destroy
#elif defined(CHANGE_UDF_PROCESS_FAILED)
#define UDFNAME change_udf_process_failed
#define UDFNAMEINIT change_udf_process_failed_init
#define UDFNAMEDESTROY change_udf_process_failed_destroy
#elif defined(CHANGE_UDF_DESTORY_FAILED)
#define UDFNAME change_udf_destory_failed
#define UDFNAMEINIT change_udf_destory_failed_init
#define UDFNAMEDESTROY change_udf_destory_failed_destroy
#else
#define UDFNAME change_udf_normal
#define UDFNAMEINIT change_udf_normal_init
#define UDFNAMEDESTROY change_udf_normal_destroy
#endif
#ifdef CHANGE_UDF_NO_INIT
#else
DLL_EXPORT int32_t UDFNAMEINIT() {
#ifdef CHANGE_UDF_INIT_FAILED
return -1;
#else
return 0;
#endif // ifdef CHANGE_UDF_INIT_FAILED
}
#endif // ifdef CHANGE_UDF_NO_INIT
#ifdef CHANGE_UDF_NO_DESTROY
#else
DLL_EXPORT int32_t UDFNAMEDESTROY() {
#ifdef CHANGE_UDF_DESTORY_FAILED
return -1;
#else
return 0;
#endif // ifdef CHANGE_UDF_DESTORY_FAILED
}
#endif // ifdef CHANGE_UDF_NO_DESTROY
#ifdef CHANGE_UDF_NO_PROCESS
#else
DLL_EXPORT int32_t UDFNAME(SUdfDataBlock *block, SUdfColumn *resultCol) {
#ifdef CHANGE_UDF_PROCESS_FAILED
return -1;
#else
int32_t code = 0;
SUdfColumnData *resultData = &resultCol->colData;
for (int32_t i = 0; i < block->numOfRows; ++i) {
int j = 0;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
code = udfColDataSetNull(resultCol, i);
if (code != 0) {
return code;
}
break;
}
}
if (j == block->numOfCols) {
int32_t luckyNum = 1;
code = udfColDataSet(resultCol, i, (char *)&luckyNum, false);
if (code != 0) {
return code;
}
}
}
// to simulate actual processing delay by udf
#ifdef LINUX
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
#endif // ifdef LINUX
#ifdef WINDOWS
Sleep(1);
#endif // ifdef WINDOWS
resultData->numOfRows = block->numOfRows;
return 0;
#endif // ifdef CHANGE_UDF_PROCESS_FAILED
}
#endif // ifdef CHANGE_UDF_NO_PROCESS

View File

@ -0,0 +1,10 @@
import ctypes
TAOS_SYSTEM_ERROR = ctypes.c_int32(0x80ff0000).value
TAOS_DEF_ERROR_CODE = ctypes.c_int32(0x80000000).value
TSDB_CODE_MND_FUNC_NOT_EXIST = (TAOS_DEF_ERROR_CODE | 0x0374)
TSDB_CODE_UDF_FUNC_EXEC_FAILURE = (TAOS_DEF_ERROR_CODE | 0x290A)

View File

@ -11,6 +11,7 @@ from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.tserror import *
import subprocess
class TDTestCase:
@ -56,8 +57,32 @@ class TDTestCase:
else:
self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_no_init = subprocess.Popen('find %s -name "libchange_udf_no_init.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_no_process = subprocess.Popen('find %s -name "libchange_udf_no_process.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_no_destroy = subprocess.Popen('find %s -name "libchange_udf_no_destroy.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_init_failed = subprocess.Popen('find %s -name "libchange_udf_init_failed.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_process_failed = subprocess.Popen('find %s -name "libchange_udf_process_failed.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_destroy_failed = subprocess.Popen('find %s -name "libchange_udf_destory_failed.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libchange_udf_normal = subprocess.Popen('find %s -name "libchange_udf_normal.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf1 = self.libudf1.replace('\r','').replace('\n','')
self.libudf2 = self.libudf2.replace('\r','').replace('\n','')
self.libchange_udf_no_init = self.libchange_udf_no_init.replace('\r','').replace('\n','')
self.libchange_udf_no_process = self.libchange_udf_no_process.replace('\r','').replace('\n','')
self.libchange_udf_normal = self.libchange_udf_normal.replace('\r','').replace('\n','')
self.libchange_udf_no_destroy = self.libchange_udf_no_destroy.replace('\r','').replace('\n','')
self.libchange_udf_init_failed = self.libchange_udf_init_failed.replace('\r','').replace('\n','')
self.libchange_udf_process_failed = self.libchange_udf_process_failed.replace('\r','').replace('\n','')
self.libchange_udf_destroy_failed = self.libchange_udf_destroy_failed.replace('\r','').replace('\n','')
tdLog.info(f"udf1 so path is {self.libudf1}")
tdLog.info(f"udf2 so path is {self.libudf2}")
tdLog.info(f"change_udf_no_init so path is {self.libchange_udf_no_init}")
tdLog.info(f"change_udf_no_process so path is {self.libchange_udf_no_process}")
tdLog.info(f"change_udf_no_destroy so path is {self.libchange_udf_no_destroy}")
tdLog.info(f"change_udf_init_failed so path is {self.libchange_udf_init_failed}")
tdLog.info(f"change_udf_process_failed so path is {self.libchange_udf_process_failed}")
tdLog.info(f"change_udf_destroy_failed so path is {self.libchange_udf_destroy_failed}")
tdLog.info(f"change_udf_normal so path is {self.libchange_udf_normal}")
def prepare_data(self):
@ -664,12 +689,84 @@ class TDTestCase:
path = ''.join(random.choice(letters) for i in range(5000))
os.system(f"udfd -c {path}")
def test_change_udf_normal(self, func_name):
# create function with normal file
tdSql.execute(f"create function {func_name} as '%s' outputtype int"%self.libchange_udf_normal)
functions = tdSql.getResult("show functions")
for function in functions:
if f"{func_name}" in function[0]:
tdLog.info(f"create {func_name} functions success, using {self.libchange_udf_normal}")
break
tdSql.query(f"select num1 , {func_name}(num1) ,num2 ,{func_name}(num2),num3 ,{func_name}(num3),num4 ,{func_name}(num4) from db.tb", TSDB_CODE_UDF_FUNC_EXEC_FAILURE)
tdSql.checkData(0,0,None)
tdSql.checkData(0,1,None)
tdSql.checkData(0,2,1)
tdSql.checkData(0,3,1)
tdSql.checkData(0,4,1.000000000)
tdSql.checkData(0,5,1)
tdSql.checkData(0,6,"binary1")
tdSql.checkData(0,7,1)
tdSql.query(f"select {func_name}(num1) from db.tb", TSDB_CODE_UDF_FUNC_EXEC_FAILURE)
tdSql.execute(f"drop function {func_name}")
tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST)
tdLog.info(f"change udf test finished, using {self.libchange_udf_normal}")
def test_change_udf_failed(self, func_name, lib_name):
tdLog.info(f"test change udf start: using {lib_name}")
tdSql.error(f"select num1 , {func_name}(num1) ,num2 ,{func_name}(num2),num3 ,{func_name}(num3),num4 ,{func_name}(num4) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST)
tdSql.execute(f"create function {func_name} as '{lib_name}' outputtype int")
functions = tdSql.getResult("show functions")
for function in functions:
if f"{func_name}" in function[0]:
tdLog.info(f"create {func_name} functions success, using {lib_name}")
break
tdSql.error(f"select num1 , {func_name}(num1) ,num2 ,{func_name}(num2),num3 ,{func_name}(num3),num4 ,{func_name}(num4) from db.tb", TSDB_CODE_UDF_FUNC_EXEC_FAILURE)
tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_UDF_FUNC_EXEC_FAILURE)
tdSql.execute(f"drop function {func_name}")
tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST)
tdLog.info(f"change udf test finished, using {lib_name}")
def unexpected_using_test(self):
tdSql.execute("use db ")
# create function without wrong file path
tdSql.error("create function udf1 as '%s_wrongpath' outputtype int;"%self.libudf1, TAOS_SYSTEM_ERROR|2)
tdSql.error("create aggregate function udf2 as '%s_wrongpath' outputtype double;"%self.libudf2, TAOS_SYSTEM_ERROR|2)
tdSql.execute("create function udf1 as '%s' outputtype int;"%self.libudf1)
tdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb")
self.test_change_udf_normal("change_udf_normal")
self.test_change_udf_failed("change_udf_no_init", self.libchange_udf_no_init)
self.test_change_udf_normal("change_udf_normal")
self.test_change_udf_failed("change_udf_no_process", self.libchange_udf_no_process)
self.test_change_udf_normal("change_udf_normal")
self.test_change_udf_failed("change_udf_no_destroy", self.libchange_udf_no_destroy)
self.test_change_udf_normal("change_udf_normal")
self.test_change_udf_failed("change_udf_init_failed", self.libchange_udf_init_failed)
self.test_change_udf_normal("change_udf_normal")
self.test_change_udf_failed("change_udf_process_failed", self.libchange_udf_process_failed)
self.test_change_udf_normal("change_udf_normal")
self.test_change_udf_failed("libchange_udf_destroy_failed", self.libchange_udf_destroy_failed)
tdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb")
tdSql.execute(f"drop function udf1")
tdSql.error("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb", TSDB_CODE_MND_FUNC_NOT_EXIST)
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
print(" env is ok for all ")
self.test_udfd_cmd()
self.prepare_udf_so()
self.prepare_data()
self.unexpected_using_test()
self.create_udf_function()
self.basic_udf_query()
self.loop_kill_udfd()