From 09f6411f6640437e08fbdb2037a8ba560aab73b7 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 8 Aug 2024 16:22:49 +0800 Subject: [PATCH 1/3] fix: format udf example codes --- tests/script/sh/bit_and.c | 2 +- tests/script/sh/l2norm.c | 30 +++----- tests/script/sh/max_vol.c | 149 ++++++++++++++++++-------------------- 3 files changed, 82 insertions(+), 99 deletions(-) diff --git a/tests/script/sh/bit_and.c b/tests/script/sh/bit_and.c index 2cf2157e1c..84485d396b 100644 --- a/tests/script/sh/bit_and.c +++ b/tests/script/sh/bit_and.c @@ -55,7 +55,7 @@ DLL_EXPORT int32_t bit_and(SUdfDataBlock* block, SUdfColumn* resultCol) { } resultData->numOfRows = block->numOfRows; - udfTrace("block:%p, processing completed, rows:%d, cols:%d,", block, block->numOfRows, block->numOfCols); + udfTrace("block:%p, processing completed", block); return TSDB_CODE_SUCCESS; } diff --git a/tests/script/sh/l2norm.c b/tests/script/sh/l2norm.c index 0b7f5bf7f6..865d9ee9a5 100644 --- a/tests/script/sh/l2norm.c +++ b/tests/script/sh/l2norm.c @@ -1,32 +1,26 @@ -#include -#include -#include #include - +#include +#include +#include #include "taosudf.h" -DLL_EXPORT int32_t l2norm_init() { - return 0; -} +DLL_EXPORT int32_t l2norm_init() { return 0; } -DLL_EXPORT int32_t l2norm_destroy() { - return 0; -} +DLL_EXPORT int32_t l2norm_destroy() { return 0; } -DLL_EXPORT int32_t l2norm_start(SUdfInterBuf *buf) { +DLL_EXPORT int32_t l2norm_start(SUdfInterBuf* buf) { *(int64_t*)(buf->buf) = 0; buf->bufLen = sizeof(double); buf->numOfResult = 1; return 0; } -DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { +DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) { double sumSquares = *(double*)interBuf->buf; int8_t numNotNull = 0; for (int32_t i = 0; i < block->numOfCols; ++i) { SUdfColumn* col = block->udfCols[i]; - if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || - col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) { + if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) { return TSDB_CODE_UDF_INVALID_INPUT; } } @@ -38,18 +32,18 @@ DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInte } switch (col->colMeta.type) { case TSDB_DATA_TYPE_INT: { - char* cell = udfColDataGetData(col, j); + char* cell = udfColDataGetData(col, j); int32_t num = *(int32_t*)cell; sumSquares += (double)num * num; break; } case TSDB_DATA_TYPE_DOUBLE: { - char* cell = udfColDataGetData(col, j); + char* cell = udfColDataGetData(col, j); double num = *(double*)cell; sumSquares += num * num; break; } - default: + default: break; } ++numNotNull; @@ -62,7 +56,7 @@ DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInte return 0; } -DLL_EXPORT int32_t l2norm_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) { +DLL_EXPORT int32_t l2norm_finish(SUdfInterBuf* buf, SUdfInterBuf* resultData) { double sumSquares = *(double*)(buf->buf); *(double*)(resultData->buf) = sqrt(sumSquares); resultData->bufLen = sizeof(double); diff --git a/tests/script/sh/max_vol.c b/tests/script/sh/max_vol.c index 4f9ecd33a7..0a57a26d1c 100644 --- a/tests/script/sh/max_vol.c +++ b/tests/script/sh/max_vol.c @@ -1,101 +1,90 @@ -#include -#include -#include #include - +#include +#include +#include #include "taosudf.h" -#define STR_MAX_LEN 256 // inter buffer length +#define STR_MAX_LEN 256 // inter buffer length // init -DLL_EXPORT int32_t max_vol_init() -{ - return 0; -} +DLL_EXPORT int32_t max_vol_init() { return 0; } // destory -DLL_EXPORT int32_t max_vol_destroy() -{ - return 0; -} +DLL_EXPORT int32_t max_vol_destroy() { return 0; } -// start -DLL_EXPORT int32_t max_vol_start(SUdfInterBuf *buf) -{ - memset(buf->buf, 0, sizeof(float) + STR_MAX_LEN); - // set init value - *((float*)buf->buf) = -10000000; - buf->bufLen = sizeof(float) + STR_MAX_LEN; - buf->numOfResult = 0; - return 0; +// start +DLL_EXPORT int32_t max_vol_start(SUdfInterBuf *buf) { + memset(buf->buf, 0, sizeof(float) + STR_MAX_LEN); + // set init value + *((float *)buf->buf) = -10000000; + buf->bufLen = sizeof(float) + STR_MAX_LEN; + buf->numOfResult = 0; + return 0; } DLL_EXPORT int32_t max_vol(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { - float maxValue = *(float *)interBuf->buf; - char strBuff[STR_MAX_LEN] = "inter1buf"; - - if (block->numOfCols < 2) - { + float maxValue = *(float *)interBuf->buf; + char strBuff[STR_MAX_LEN] = "inter1buf"; + + if (block->numOfCols < 2) { + return TSDB_CODE_UDF_INVALID_INPUT; + } + + // check data type + for (int32_t i = 0; i < block->numOfCols; ++i) { + SUdfColumn *col = block->udfCols[i]; + if (i == block->numOfCols - 1) { + // last column is device id , must varchar + if (col->colMeta.type != TSDB_DATA_TYPE_VARCHAR) { return TSDB_CODE_UDF_INVALID_INPUT; + } + } else { + if (col->colMeta.type != TSDB_DATA_TYPE_FLOAT) { + return TSDB_CODE_UDF_INVALID_INPUT; + } } + } - // check data type - for (int32_t i = 0; i < block->numOfCols; ++i) - { - SUdfColumn *col = block->udfCols[i]; - if( i == block->numOfCols - 1) { - // last column is device id , must varchar - if (col->colMeta.type != TSDB_DATA_TYPE_VARCHAR ) { - return TSDB_CODE_UDF_INVALID_INPUT; - } - } else { - if (col->colMeta.type != TSDB_DATA_TYPE_FLOAT) { - return TSDB_CODE_UDF_INVALID_INPUT; - } - } + // calc max voltage + SUdfColumn *lastCol = block->udfCols[block->numOfCols - 1]; + for (int32_t i = 0; i < (block->numOfCols - 1); ++i) { + for (int32_t j = 0; j < block->numOfRows; ++j) { + SUdfColumn *col = block->udfCols[i]; + if (udfColDataIsNull(col, j)) { + continue; + } + char *data = udfColDataGetData(col, j); + float voltage = *(float *)data; + if (voltage > maxValue) { + maxValue = voltage; + char *valData = udfColDataGetData(lastCol, j); + // get device id + char *deviceId = valData + sizeof(uint16_t); + sprintf(strBuff, "%s_(%d,%d)_%f", deviceId, j, i, maxValue); + } } + } - // calc max voltage - SUdfColumn *lastCol = block->udfCols[block->numOfCols - 1]; - for (int32_t i = 0; i < (block->numOfCols - 1); ++i) { - for (int32_t j = 0; j < block->numOfRows; ++j) { - SUdfColumn *col = block->udfCols[i]; - if (udfColDataIsNull(col, j)) { - continue; - } - char *data = udfColDataGetData(col, j); - float voltage = *(float *)data; - if (voltage > maxValue) { - maxValue = voltage; - char *valData = udfColDataGetData(lastCol, j); - // get device id - char *deviceId = valData + sizeof(uint16_t); - sprintf(strBuff, "%s_(%d,%d)_%f", deviceId, j, i, maxValue); - } - } - } - - *(float*)newInterBuf->buf = maxValue; - strcpy(newInterBuf->buf + sizeof(float), strBuff); - newInterBuf->bufLen = sizeof(float) + strlen(strBuff)+1; - newInterBuf->numOfResult = 1; - return 0; + *(float *)newInterBuf->buf = maxValue; + strcpy(newInterBuf->buf + sizeof(float), strBuff); + newInterBuf->bufLen = sizeof(float) + strlen(strBuff) + 1; + newInterBuf->numOfResult = 1; + return 0; } -DLL_EXPORT int32_t max_vol_finish(SUdfInterBuf *buf, SUdfInterBuf *resultData) -{ - char * str = buf->buf + sizeof(float); - // copy to des - char * des = resultData->buf + sizeof(uint16_t); - strcpy(des, str); +DLL_EXPORT int32_t max_vol_finish(SUdfInterBuf *buf, SUdfInterBuf *resultData) { + char *str = buf->buf + sizeof(float); + // copy to des + char *des = resultData->buf + sizeof(uint16_t); + strcpy(des, str); - // set binary type len - uint16_t len = strlen(str); - *((uint16_t*)resultData->buf) = len; + // set binary type len + uint16_t len = strlen(str); + *((uint16_t *)resultData->buf) = len; - // set buf len - resultData->bufLen = len + sizeof(uint16_t); - // set row count - resultData->numOfResult = 1; - return 0; + // set buf len + resultData->bufLen = len + sizeof(uint16_t); + // set row count + resultData->numOfResult = 1; + return 0; } From 8e81abe2248b3e3f74e68f7136688293251b22aa Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 8 Aug 2024 19:18:50 +0800 Subject: [PATCH 2/3] enh: get vardata len from udf api --- include/libs/function/taosudf.h | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 0b59d7c2f5..91487e5d1d 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -131,6 +131,14 @@ static FORCE_INLINE char *udfColDataGetData(const SUdfColumn *pColumn, int32_t r } } +static FORCE_INLINE int32_t udfColDataGetDataLen(const SUdfColumn *pColumn, int32_t row) { + if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) { + return *(uint16_t*)(pColumn->colData.varLenCol.payload + pColumn->colData.varLenCol.varOffsets[row]); + } else { + return pColumn->colMeta.bytes; + } +} + static FORCE_INLINE bool udfColDataIsNull(const SUdfColumn *pColumn, int32_t row) { if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) { if (pColumn->colMeta.type == TSDB_DATA_TYPE_JSON) { From 7be4d5b592cfb29757d16d418c45db735b62caf4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 8 Aug 2024 19:19:23 +0800 Subject: [PATCH 3/3] enh: adjust udf example codes --- tests/script/sh/bit_and.c | 4 ++-- tests/script/sh/l2norm.c | 32 +++++++++++++++++++++----- tests/script/sh/max_vol.c | 47 +++++++++++++++++++++++++++++---------- 3 files changed, 64 insertions(+), 19 deletions(-) diff --git a/tests/script/sh/bit_and.c b/tests/script/sh/bit_and.c index 84485d396b..c35f1da171 100644 --- a/tests/script/sh/bit_and.c +++ b/tests/script/sh/bit_and.c @@ -27,8 +27,8 @@ DLL_EXPORT int32_t bit_and(SUdfDataBlock* block, SUdfColumn* resultCol) { for (int32_t i = 0; i < block->numOfRows; ++i) { if (udfColDataIsNull(block->udfCols[0], i)) { - udfTrace("block:%p, row:%d result is null since col:0 is null", block, i); udfColDataSetNull(resultCol, i); + udfTrace("block:%p, row:%d result is null since col:0 is null", block, i); continue; } @@ -38,8 +38,8 @@ DLL_EXPORT int32_t bit_and(SUdfDataBlock* block, SUdfColumn* resultCol) { int32_t j = 1; for (; j < block->numOfCols; ++j) { if (udfColDataIsNull(block->udfCols[j], i)) { - udfTrace("block:%p, row:%d result is null since col:%d is null", block, i, j); udfColDataSetNull(resultCol, i); + udfTrace("block:%p, row:%d result is null since col:%d is null", block, i, j); break; } diff --git a/tests/script/sh/l2norm.c b/tests/script/sh/l2norm.c index 865d9ee9a5..e2f379fd29 100644 --- a/tests/script/sh/l2norm.c +++ b/tests/script/sh/l2norm.c @@ -9,38 +9,55 @@ DLL_EXPORT int32_t l2norm_init() { return 0; } DLL_EXPORT int32_t l2norm_destroy() { return 0; } DLL_EXPORT int32_t l2norm_start(SUdfInterBuf* buf) { + int32_t bufLen = sizeof(double); + if (buf->bufLen < bufLen) { + udfError("failed to execute udf since input buflen:%d < %d", buf->bufLen, bufLen); + return TSDB_CODE_UDF_INVALID_BUFSIZE; + } + + udfTrace("start aggregation, buflen:%d used:%d", buf->bufLen, bufLen); *(int64_t*)(buf->buf) = 0; - buf->bufLen = sizeof(double); - buf->numOfResult = 1; + buf->bufLen = bufLen; + buf->numOfResult = 0; return 0; } DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) { - double sumSquares = *(double*)interBuf->buf; - int8_t numNotNull = 0; + udfTrace("block:%p, processing begins, cols:%d rows:%d", block, block->numOfCols, block->numOfRows); + for (int32_t i = 0; i < block->numOfCols; ++i) { SUdfColumn* col = block->udfCols[i]; - if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) { + if (col->colMeta.type != TSDB_DATA_TYPE_INT && col->colMeta.type != TSDB_DATA_TYPE_DOUBLE) { + udfError("block:%p, col:%d type:%d should be int(%d) or double(%d)", block, i, col->colMeta.type, + TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_DOUBLE); return TSDB_CODE_UDF_INVALID_INPUT; } } + + double sumSquares = *(double*)interBuf->buf; + int8_t numNotNull = 0; + for (int32_t i = 0; i < block->numOfCols; ++i) { for (int32_t j = 0; j < block->numOfRows; ++j) { SUdfColumn* col = block->udfCols[i]; if (udfColDataIsNull(col, j)) { + udfTrace("block:%p, col:%d row:%d is null", block, i, j); continue; } + switch (col->colMeta.type) { case TSDB_DATA_TYPE_INT: { char* cell = udfColDataGetData(col, j); int32_t num = *(int32_t*)cell; sumSquares += (double)num * num; + udfTrace("block:%p, col:%d row:%d data:%d", block, i, j, num); break; } case TSDB_DATA_TYPE_DOUBLE: { char* cell = udfColDataGetData(col, j); double num = *(double*)cell; sumSquares += num * num; + udfTrace("block:%p, col:%d row:%d data:%f", block, i, j, num); break; } default: @@ -48,11 +65,14 @@ DLL_EXPORT int32_t l2norm(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInte } ++numNotNull; } + udfTrace("block:%p, col:%d result is %f", block, i, sumSquares); } *(double*)(newInterBuf->buf) = sumSquares; newInterBuf->bufLen = sizeof(double); newInterBuf->numOfResult = 1; + + udfTrace("block:%p, result is %f", block, sumSquares); return 0; } @@ -61,5 +81,7 @@ DLL_EXPORT int32_t l2norm_finish(SUdfInterBuf* buf, SUdfInterBuf* resultData) { *(double*)(resultData->buf) = sqrt(sumSquares); resultData->bufLen = sizeof(double); resultData->numOfResult = 1; + + udfTrace("end aggregation, result is %f", *(double*)(resultData->buf)); return 0; } diff --git a/tests/script/sh/max_vol.c b/tests/script/sh/max_vol.c index 0a57a26d1c..1a7a3f8210 100644 --- a/tests/script/sh/max_vol.c +++ b/tests/script/sh/max_vol.c @@ -6,27 +6,33 @@ #define STR_MAX_LEN 256 // inter buffer length -// init DLL_EXPORT int32_t max_vol_init() { return 0; } -// destory DLL_EXPORT int32_t max_vol_destroy() { return 0; } -// start DLL_EXPORT int32_t max_vol_start(SUdfInterBuf *buf) { + int32_t bufLen = sizeof(float) + STR_MAX_LEN; + if (buf->bufLen < bufLen) { + udfError("failed to execute udf since input buflen:%d < %d", buf->bufLen, bufLen); + return TSDB_CODE_UDF_INVALID_BUFSIZE; + } + + udfTrace("start aggregation, buflen:%d used:%d", buf->bufLen, bufLen); memset(buf->buf, 0, sizeof(float) + STR_MAX_LEN); - // set init value - *((float *)buf->buf) = -10000000; - buf->bufLen = sizeof(float) + STR_MAX_LEN; + *((float *)buf->buf) = INT32_MIN; + buf->bufLen = bufLen; buf->numOfResult = 0; return 0; } DLL_EXPORT int32_t max_vol(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { + udfTrace("block:%p, processing begins, cols:%d rows:%d", block, block->numOfCols, block->numOfRows); + float maxValue = *(float *)interBuf->buf; char strBuff[STR_MAX_LEN] = "inter1buf"; if (block->numOfCols < 2) { + udfError("block:%p, cols:%d needs to be greater than 2", block, block->numOfCols); return TSDB_CODE_UDF_INVALID_INPUT; } @@ -36,10 +42,12 @@ DLL_EXPORT int32_t max_vol(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInt if (i == block->numOfCols - 1) { // last column is device id , must varchar if (col->colMeta.type != TSDB_DATA_TYPE_VARCHAR) { + udfError("block:%p, col:%d type:%d should be varchar(%d)", block, i, col->colMeta.type, TSDB_DATA_TYPE_VARCHAR); return TSDB_CODE_UDF_INVALID_INPUT; } } else { if (col->colMeta.type != TSDB_DATA_TYPE_FLOAT) { + udfError("block:%p, col:%d type:%d should be float(%d)", block, i, col->colMeta.type, TSDB_DATA_TYPE_FLOAT); return TSDB_CODE_UDF_INVALID_INPUT; } } @@ -47,28 +55,41 @@ DLL_EXPORT int32_t max_vol(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInt // calc max voltage SUdfColumn *lastCol = block->udfCols[block->numOfCols - 1]; - for (int32_t i = 0; i < (block->numOfCols - 1); ++i) { + for (int32_t i = 0; i < block->numOfCols - 1; ++i) { for (int32_t j = 0; j < block->numOfRows; ++j) { SUdfColumn *col = block->udfCols[i]; if (udfColDataIsNull(col, j)) { + udfTrace("block:%p, col:%d row:%d is null", block, i, j); continue; } + char *data = udfColDataGetData(col, j); float voltage = *(float *)data; - if (voltage > maxValue) { + + if (voltage <= maxValue) { + udfTrace("block:%p, col:%d row:%d data:%f", block, i, j, voltage); + } else { maxValue = voltage; - char *valData = udfColDataGetData(lastCol, j); + char *valData = udfColDataGetData(lastCol, j); + int32_t valDataLen = udfColDataGetDataLen(lastCol, j); + // get device id - char *deviceId = valData + sizeof(uint16_t); - sprintf(strBuff, "%s_(%d,%d)_%f", deviceId, j, i, maxValue); + char *deviceId = valData + sizeof(uint16_t); + int32_t deviceIdLen = valDataLen < (STR_MAX_LEN - 1) ? valDataLen : (STR_MAX_LEN - 1); + + strncpy(strBuff, deviceId, deviceIdLen); + snprintf(strBuff + deviceIdLen, STR_MAX_LEN - deviceIdLen, "_(%d,%d)_%f", j, i, maxValue); + udfTrace("block:%p, col:%d row:%d data:%f, as max_val:%s", block, i, j, voltage, strBuff); } } } *(float *)newInterBuf->buf = maxValue; - strcpy(newInterBuf->buf + sizeof(float), strBuff); + strncpy(newInterBuf->buf + sizeof(float), strBuff, STR_MAX_LEN); newInterBuf->bufLen = sizeof(float) + strlen(strBuff) + 1; newInterBuf->numOfResult = 1; + + udfTrace("block:%p, result is %s", block, strBuff); return 0; } @@ -86,5 +107,7 @@ DLL_EXPORT int32_t max_vol_finish(SUdfInterBuf *buf, SUdfInterBuf *resultData) { resultData->bufLen = len + sizeof(uint16_t); // set row count resultData->numOfResult = 1; + + udfTrace("end aggregation, result is %s", str); return 0; }