before modify udfd process request and runudf.c

This commit is contained in:
shenglian zhou 2022-04-27 07:27:58 +08:00
parent e9e6b1fa1f
commit f183c5606c
5 changed files with 30 additions and 59 deletions

View File

@ -137,11 +137,11 @@ typedef int32_t (*TUdfTeardownFunc)();
//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data); //typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data);
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column); typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf); typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf);
typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock block, SUdfInterBuf *interBuf); typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf);
typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf buf, SUdfInterBuf *resultData); typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData);
// end API to UDF writer // end API to UDF writer

View File

@ -53,7 +53,7 @@ target_link_libraries(
add_library(udf2 MODULE test/udf2.c) add_library(udf2 MODULE test/udf2.c)
target_include_directories( target_include_directories(
udf1 udf2
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/function" "${TD_SOURCE_DIR}/include/libs/function"
"${TD_SOURCE_DIR}/include/util" "${TD_SOURCE_DIR}/include/util"

View File

@ -187,7 +187,7 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfColumn output = {0}; SUdfColumn output = {0};
// TODO: call different functions according to call type, for now just calar // TODO: call different functions according to call type, for now just calar
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) { if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
udf->scalarProcFunc(input, &output); udf->scalarProcFunc(&input, &output);
} }
SUdfResponse response = {0}; SUdfResponse response = {0};

View File

@ -17,10 +17,10 @@ int32_t udf1_destroy() {
return 0; return 0;
} }
int32_t udf1(SUdfDataBlock block, SUdfColumn *resultCol) { int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) {
SUdfColumnData *resultData = &resultCol->colData; SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block.numOfRows; resultData->numOfRows = block->numOfRows;
SUdfColumnData *srcData = &block.udfCols[0]->colData; SUdfColumnData *srcData = &block->udfCols[0]->colData;
resultData->varLengthColumn = srcData->varLengthColumn; resultData->varLengthColumn = srcData->varLengthColumn;
if (resultData->varLengthColumn) { if (resultData->varLengthColumn) {

View File

@ -18,64 +18,35 @@ int32_t udf2_destroy() {
} }
int32_t udf2_start(SUdfInterBuf *buf) { int32_t udf2_start(SUdfInterBuf *buf) {
*(int64_t*)(buf->buf) = 0;
buf->bufLen = sizeof(int64_t);
buf->numOfResult = 0;
return 0;
} }
int32_t udf2(SUdfDataBlock block, SUdfInterBuf *interBuf) { int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) {
int64_t sumSquares = *(int64_t*)interBuf->buf;
} for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++i) {
int32_t udf2_finish(SUdfInterBuf buf, SUdfInterBuf *resultData) { SUdfColumn* col = block->udfCols[i];
//TODO: check the bitmap for null value
} int32_t* rows = (int32_t*)col->colData.fixLenCol.data;
sumSquares += rows[j] * rows[j];
int32_t udf2(SUdfDataBlock block, SUdfColumn *resultCol) {
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block.numOfRows;
SUdfColumnData *srcData = &block.udfCols[0]->colData;
resultData->varLengthColumn = srcData->varLengthColumn;
if (resultData->varLengthColumn) {
resultData->varLenCol.varOffsetsLen = srcData->varLenCol.varOffsetsLen;
resultData->varLenCol.varOffsets = malloc(resultData->varLenCol.varOffsetsLen);
memcpy(resultData->varLenCol.varOffsets, srcData->varLenCol.varOffsets, srcData->varLenCol.varOffsetsLen);
resultData->varLenCol.payloadLen = srcData->varLenCol.payloadLen;
resultData->varLenCol.payload = malloc(resultData->varLenCol.payloadLen);
memcpy(resultData->varLenCol.payload, srcData->varLenCol.payload, srcData->varLenCol.payloadLen);
} else {
resultData->fixLenCol.nullBitmapLen = srcData->fixLenCol.nullBitmapLen;
resultData->fixLenCol.nullBitmap = malloc(resultData->fixLenCol.nullBitmapLen);
memcpy(resultData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmapLen);
resultData->fixLenCol.dataLen = srcData->fixLenCol.dataLen;
resultData->fixLenCol.data = malloc(resultData->fixLenCol.dataLen);
memcpy(resultData->fixLenCol.data, srcData->fixLenCol.data, srcData->fixLenCol.dataLen);
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
*(resultData->fixLenCol.data + i * sizeof(int32_t)) = 88;
} }
} }
SUdfColumnMeta *meta = &resultCol->colMeta; *(int64_t*)interBuf = sumSquares;
meta->bytes = 4; interBuf->bufLen = sizeof(int64_t);
meta->type = TSDB_DATA_TYPE_INT; //TODO: if all null value, numOfResult = 0;
meta->scale = 0; interBuf->numOfResult = 1;
meta->precision = 0;
return 0; return 0;
} }
int32_t udf2_free(SUdfColumn *col) { int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
SUdfColumnData *data = &col->colData; //TODO: check numOfResults;
if (data->varLengthColumn) { int64_t sumSquares = *(int64_t*)(buf->buf);
free(data->varLenCol.varOffsets); *(double*)(resultData->buf) = sqrt(sumSquares);
data->varLenCol.varOffsets = NULL; resultData->bufLen = sizeof(double);
free(data->varLenCol.payload); resultData->numOfResult = 1;
data->varLenCol.payload = NULL;
} else {
free(data->fixLenCol.nullBitmap);
data->fixLenCol.nullBitmap = NULL;
free(data->fixLenCol.data);
data->fixLenCol.data = NULL;
}
return 0; return 0;
} }