From f183c5606c1702a091b0aff6013ae274f3dd7b52 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 27 Apr 2022 07:27:58 +0800 Subject: [PATCH] before modify udfd process request and runudf.c --- include/libs/function/tudf.h | 6 +-- source/libs/function/CMakeLists.txt | 2 +- source/libs/function/src/udfd.c | 2 +- source/libs/function/test/udf1.c | 6 +-- source/libs/function/test/udf2.c | 73 +++++++++-------------------- 5 files changed, 30 insertions(+), 59 deletions(-) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 37fd3c8e3c..096cc3da09 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -137,11 +137,11 @@ typedef int32_t (*TUdfTeardownFunc)(); //typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data); typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column); -typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol); +typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf); -typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock block, SUdfInterBuf *interBuf); -typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf buf, SUdfInterBuf *resultData); +typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf); +typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData); // end API to UDF writer diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index 5a33631234..15813b3cb0 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -53,7 +53,7 @@ target_link_libraries( add_library(udf2 MODULE test/udf2.c) target_include_directories( - udf1 + udf2 PUBLIC "${TD_SOURCE_DIR}/include/libs/function" "${TD_SOURCE_DIR}/include/util" diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 6c68adda4e..9398b44adf 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -187,7 +187,7 @@ void udfdProcessRequest(uv_work_t *req) { SUdfColumn output = {0}; // TODO: call different functions according to call type, for now just calar if (call->callType == TSDB_UDF_CALL_SCALA_PROC) { - udf->scalarProcFunc(input, &output); + udf->scalarProcFunc(&input, &output); } SUdfResponse response = {0}; diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index 6a2237470d..b2367313ae 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -17,10 +17,10 @@ int32_t udf1_destroy() { return 0; } -int32_t udf1(SUdfDataBlock block, SUdfColumn *resultCol) { +int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) { SUdfColumnData *resultData = &resultCol->colData; - resultData->numOfRows = block.numOfRows; - SUdfColumnData *srcData = &block.udfCols[0]->colData; + resultData->numOfRows = block->numOfRows; + SUdfColumnData *srcData = &block->udfCols[0]->colData; resultData->varLengthColumn = srcData->varLengthColumn; if (resultData->varLengthColumn) { diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index 3169f46263..250c20ba88 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -18,64 +18,35 @@ int32_t udf2_destroy() { } int32_t udf2_start(SUdfInterBuf *buf) { - + *(int64_t*)(buf->buf) = 0; + buf->bufLen = sizeof(int64_t); + buf->numOfResult = 0; + return 0; } -int32_t udf2(SUdfDataBlock block, SUdfInterBuf *interBuf) { - -} - -int32_t udf2_finish(SUdfInterBuf buf, SUdfInterBuf *resultData) { - -} - -int32_t udf2(SUdfDataBlock block, SUdfColumn *resultCol) { - SUdfColumnData *resultData = &resultCol->colData; - resultData->numOfRows = block.numOfRows; - SUdfColumnData *srcData = &block.udfCols[0]->colData; - resultData->varLengthColumn = srcData->varLengthColumn; - - if (resultData->varLengthColumn) { - resultData->varLenCol.varOffsetsLen = srcData->varLenCol.varOffsetsLen; - resultData->varLenCol.varOffsets = malloc(resultData->varLenCol.varOffsetsLen); - memcpy(resultData->varLenCol.varOffsets, srcData->varLenCol.varOffsets, srcData->varLenCol.varOffsetsLen); - - resultData->varLenCol.payloadLen = srcData->varLenCol.payloadLen; - resultData->varLenCol.payload = malloc(resultData->varLenCol.payloadLen); - memcpy(resultData->varLenCol.payload, srcData->varLenCol.payload, srcData->varLenCol.payloadLen); - } else { - resultData->fixLenCol.nullBitmapLen = srcData->fixLenCol.nullBitmapLen; - resultData->fixLenCol.nullBitmap = malloc(resultData->fixLenCol.nullBitmapLen); - memcpy(resultData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmapLen); - - resultData->fixLenCol.dataLen = srcData->fixLenCol.dataLen; - resultData->fixLenCol.data = malloc(resultData->fixLenCol.dataLen); - memcpy(resultData->fixLenCol.data, srcData->fixLenCol.data, srcData->fixLenCol.dataLen); - for (int32_t i = 0; i < resultData->numOfRows; ++i) { - *(resultData->fixLenCol.data + i * sizeof(int32_t)) = 88; +int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) { + int64_t sumSquares = *(int64_t*)interBuf->buf; + for (int32_t i = 0; i < block->numOfCols; ++i) { + for (int32_t j = 0; j < block->numOfRows; ++i) { + SUdfColumn* col = block->udfCols[i]; + //TODO: check the bitmap for null value + int32_t* rows = (int32_t*)col->colData.fixLenCol.data; + sumSquares += rows[j] * rows[j]; } } - SUdfColumnMeta *meta = &resultCol->colMeta; - meta->bytes = 4; - meta->type = TSDB_DATA_TYPE_INT; - meta->scale = 0; - meta->precision = 0; + *(int64_t*)interBuf = sumSquares; + interBuf->bufLen = sizeof(int64_t); + //TODO: if all null value, numOfResult = 0; + interBuf->numOfResult = 1; return 0; } -int32_t udf2_free(SUdfColumn *col) { - SUdfColumnData *data = &col->colData; - if (data->varLengthColumn) { - free(data->varLenCol.varOffsets); - data->varLenCol.varOffsets = NULL; - free(data->varLenCol.payload); - data->varLenCol.payload = NULL; - } else { - free(data->fixLenCol.nullBitmap); - data->fixLenCol.nullBitmap = NULL; - free(data->fixLenCol.data); - data->fixLenCol.data = NULL; - } +int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) { + //TODO: check numOfResults; + int64_t sumSquares = *(int64_t*)(buf->buf); + *(double*)(resultData->buf) = sqrt(sumSquares); + resultData->bufLen = sizeof(double); + resultData->numOfResult = 1; return 0; }