udf case: varchar
This commit is contained in:
parent
a360b71f7c
commit
f6bd86272b
|
@ -237,18 +237,20 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentR
|
|||
(void)memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes);
|
||||
} else {
|
||||
int32_t dataLen = varDataTLen(pData);
|
||||
if (meta->type == TSDB_DATA_TYPE_JSON) {
|
||||
if (*pData == TSDB_DATA_TYPE_NULL) {
|
||||
dataLen = 0;
|
||||
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
|
||||
dataLen = varDataTLen(pData + sizeof(char));
|
||||
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
|
||||
dataLen = sizeof(int64_t);
|
||||
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
|
||||
dataLen = sizeof(char);
|
||||
}
|
||||
dataLen += sizeof(char);
|
||||
}
|
||||
// This is a piece of code to help users implement udf. It is only called during testing.
|
||||
// Currently, the json type is not supported and will not be called.
|
||||
// if (meta->type == TSDB_DATA_TYPE_JSON) {
|
||||
// if (*pData == TSDB_DATA_TYPE_NULL) {
|
||||
// dataLen = 0;
|
||||
// } else if (*pData == TSDB_DATA_TYPE_NCHAR) {
|
||||
// dataLen = varDataTLen(pData + sizeof(char));
|
||||
// } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
|
||||
// dataLen = sizeof(int64_t);
|
||||
// } else if (*pData == TSDB_DATA_TYPE_BOOL) {
|
||||
// dataLen = sizeof(char);
|
||||
// }
|
||||
// dataLen += sizeof(char);
|
||||
// }
|
||||
|
||||
if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) {
|
||||
uint32_t newSize = data->varLenCol.payloadAllocLen;
|
||||
|
|
|
@ -106,3 +106,67 @@ DLL_EXPORT int32_t UDFNAME(SUdfDataBlock *block, SUdfColumn *resultCol) {
|
|||
#endif // ifdef CHANGE_UDF_PROCESS_FAILED
|
||||
}
|
||||
#endif // ifdef CHANGE_UDF_NO_PROCESS
|
||||
|
||||
|
||||
|
||||
|
||||
/********************************************************************************************************************/
|
||||
// udf revert functions
|
||||
/********************************************************************************************************************/
|
||||
DLL_EXPORT int32_t udf_reverse_init() { return 0; }
|
||||
|
||||
DLL_EXPORT int32_t udf_reverse_destroy() { return 0; }
|
||||
|
||||
static void reverse_data(char* data, size_t len) {
|
||||
size_t i, j;
|
||||
char temp;
|
||||
for (i = 0, j = len - 1; i < j; i++, j--) {
|
||||
temp = data[i];
|
||||
data[i] = data[j];
|
||||
data[j] = temp;
|
||||
}
|
||||
}
|
||||
|
||||
DLL_EXPORT int32_t udf_reverse(SUdfDataBlock *block, SUdfColumn *resultCol) {
|
||||
int32_t code = 0;
|
||||
SUdfColumnData *resultData = &resultCol->colData;
|
||||
for (int32_t i = 0; i < block->numOfRows; ++i) {
|
||||
int j = 0;
|
||||
for (; j < block->numOfCols; ++j) {
|
||||
if (udfColDataIsNull(block->udfCols[j], i)) {
|
||||
code = udfColDataSetNull(resultCol, i);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
int32_t oldLen = udfColDataGetDataLen(block->udfCols[j], i);
|
||||
char *pOldData = udfColDataGetData(block->udfCols[j], i);
|
||||
|
||||
|
||||
char *buff = malloc(sizeof(VarDataLenT) + oldLen);
|
||||
if (buff == NULL) {
|
||||
return -1;
|
||||
}
|
||||
((VarDataLenT *)buff)[0] = (VarDataLenT)oldLen;
|
||||
memcpy(buff, pOldData, oldLen + sizeof(VarDataLenT));
|
||||
reverse_data(buff + sizeof(VarDataLenT), oldLen);
|
||||
code = udfColDataSet(resultCol, i, buff, false);
|
||||
if (code != 0) {
|
||||
free(buff);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// to simulate actual processing delay by udf
|
||||
#ifdef LINUX
|
||||
usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second)
|
||||
#endif
|
||||
#ifdef WINDOWS
|
||||
Sleep(1);
|
||||
#endif
|
||||
resultData->numOfRows = block->numOfRows;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -8,3 +8,6 @@ TSDB_CODE_MND_FUNC_NOT_EXIST = (TAOS_DEF_ERROR_CODE | 0x0374)
|
|||
|
||||
|
||||
TSDB_CODE_UDF_FUNC_EXEC_FAILURE = (TAOS_DEF_ERROR_CODE | 0x290A)
|
||||
|
||||
|
||||
TSDB_CODE_TSC_INTERNAL_ERROR = (TAOS_DEF_ERROR_CODE | 0x02FF)
|
||||
|
|
|
@ -728,6 +728,38 @@ class TDTestCase:
|
|||
tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST)
|
||||
tdLog.info(f"change udf test finished, using {lib_name}")
|
||||
|
||||
def test_change_udf_reverse(self):
|
||||
tdSql.execute("create database if not exists db duration 100")
|
||||
tdSql.execute("use db")
|
||||
|
||||
func_name = "udf_reverse"
|
||||
tdSql.execute(f"create function {func_name} as '%s' outputtype nchar(256)"%self.libchange_udf_normal)
|
||||
functions = tdSql.getResult("show functions")
|
||||
for function in functions:
|
||||
if f"{func_name}" in function[0]:
|
||||
tdLog.info(f"create {func_name} functions success, using {self.libchange_udf_normal}")
|
||||
break
|
||||
|
||||
tdSql.query(f"select {func_name}(c8) from db.t1")
|
||||
tdSql.execute(f"drop function {func_name}")
|
||||
tdSql.error(f"select {func_name}(num1) from db.tb", TSDB_CODE_MND_FUNC_NOT_EXIST)
|
||||
|
||||
self.test_change_udf_normal("change_udf_normal")
|
||||
tdSql.execute(f"create function {func_name} as '%s' outputtype varchar(256)"%self.libchange_udf_normal)
|
||||
functions = tdSql.getResult("show functions")
|
||||
for function in functions:
|
||||
if f"{func_name}" in function[0]:
|
||||
tdLog.info(f"create {func_name} functions success, using {self.libchange_udf_normal}")
|
||||
break
|
||||
|
||||
tdSql.query(f"select {func_name}(c8) from db.t1 order by ts")
|
||||
tdSql.checkData(0,0, None)
|
||||
tdSql.checkData(1,0, "1yranib")
|
||||
tdSql.checkData(2,0, "2yranib")
|
||||
tdSql.checkData(3,0, "3yranib")
|
||||
|
||||
|
||||
|
||||
def unexpected_using_test(self):
|
||||
tdSql.execute("use db ")
|
||||
|
||||
|
@ -768,6 +800,7 @@ class TDTestCase:
|
|||
|
||||
self.unexpected_using_test()
|
||||
self.create_udf_function()
|
||||
self.test_change_udf_reverse()
|
||||
self.basic_udf_query()
|
||||
self.loop_kill_udfd()
|
||||
tdSql.execute(" drop function udf1 ")
|
||||
|
|
Loading…
Reference in New Issue