diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index 683274f3ae..db62accb85 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -99,19 +99,20 @@ typedef struct SUdfInterBuf { } SUdfInterBuf; //TODO: translate these calls to callUdf -int32_t callUdfAggInit(SUdfInterBuf *interBuf); -// input: block, initFirst -// output: interbuf -int32_t callUdfAggProcess(SSDataBlock *block, SUdfInterBuf *interBuf); +// output: interBuf +int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf); +// input: block, state +// output: newState +int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); // input: interBuf // output: resultData -int32_t callUdfAggFinalize(SUdfInterBuf *interBuf, SSDataBlock *resultData); +int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); // input: interbuf1, interbuf2 // output: resultBuf -int32_t callUdfAggMerge(SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); +int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); // input: block // output: resultData -int32_t callUdfScalaProcess(SSDataBlock *block, SSDataBlock *resultData); +int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData); /** * tearn down udf @@ -134,12 +135,12 @@ typedef int32_t (*TUdfTeardownFunc)(); //typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data); //typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data); -typedef int32_t (*TUdfFreeUdfColumnDataFunc)(SUdfColumnData* columnData); +typedef int32_t (*TUdfFreeUdfColumnDataFunc)(SUdfColumn* columnData); -typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumnData *resultData); +typedef int32_t 
(*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol); typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf); typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock block, SUdfInterBuf *interBuf); -typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf buf, SUdfColumnData *resultData); +typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf buf, SUdfInterBuf *resultData); // end API to UDF writer diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 6c3da8cd89..496f486e05 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -59,7 +59,7 @@ typedef struct SUdfCallRequest { typedef struct SUdfCallResponse { int8_t callType; SSDataBlock resultData; - SUdfInterBuf interBuf; + SUdfInterBuf resultBuf; } SUdfCallResponse; @@ -106,6 +106,9 @@ void freeUdfColumnData(SUdfColumnData *data); void freeUdfColumn(SUdfColumn* col); void freeUdfDataDataBlock(SUdfDataBlock *block); +int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock); +int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index c8601c69c8..1bc961d527 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -18,6 +18,7 @@ #include "tudf.h" #include "tudfInt.h" #include "tarray.h" +#include "tdatablock.h" //TODO: when startup, set thread poll size. 
add it to cfg //TODO: test for udfd restart @@ -351,13 +352,13 @@ int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) { len += tEncodeDataBlock(buf, &callRsp->resultData); break; case TSDB_UDF_CALL_AGG_INIT: - len += encodeUdfInterBuf(buf, &callRsp->interBuf); + len += encodeUdfInterBuf(buf, &callRsp->resultBuf); break; case TSDB_UDF_CALL_AGG_PROC: - len += encodeUdfInterBuf(buf, &callRsp->interBuf); + len += encodeUdfInterBuf(buf, &callRsp->resultBuf); break; case TSDB_UDF_CALL_AGG_MERGE: - len += encodeUdfInterBuf(buf, &callRsp->interBuf); + len += encodeUdfInterBuf(buf, &callRsp->resultBuf); break; case TSDB_UDF_CALL_AGG_FIN: len += tEncodeDataBlock(buf, &callRsp->resultData); @@ -373,13 +374,13 @@ void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) { buf = tDecodeDataBlock(buf, &callRsp->resultData); break; case TSDB_UDF_CALL_AGG_INIT: - buf = decodeUdfInterBuf(buf, &callRsp->interBuf); + buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); break; case TSDB_UDF_CALL_AGG_PROC: - buf = decodeUdfInterBuf(buf, &callRsp->interBuf); + buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); break; case TSDB_UDF_CALL_AGG_MERGE: - buf = decodeUdfInterBuf(buf, &callRsp->interBuf); + buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); break; case TSDB_UDF_CALL_AGG_FIN: buf = tDecodeDataBlock(buf, &callRsp->resultData); @@ -483,6 +484,70 @@ void freeUdfInterBuf(SUdfInterBuf *buf) { buf->buf = NULL; } + +int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) { + udfBlock->numOfRows = block->info.rows; + udfBlock->numOfCols = block->info.numOfCols; + udfBlock->udfCols = taosMemoryMalloc(sizeof(SUdfColumn*) * udfBlock->numOfCols); + for (int32_t i = 0; i < udfBlock->numOfCols; ++i) { + udfBlock->udfCols[i] = taosMemoryMalloc(sizeof(SUdfColumn)); + SColumnInfoData *col= (SColumnInfoData*)taosArrayGet(block->pDataBlock, i); + SUdfColumn *udfCol = udfBlock->udfCols[i]; + udfCol->colMeta.type = col->info.type; + 
udfCol->colMeta.bytes = col->info.bytes; + udfCol->colMeta.scale = col->info.scale; + udfCol->colMeta.precision = col->info.precision; + udfCol->colData.numOfRows = udfBlock->numOfRows; + udfCol->colData.varLengthColumn = IS_VAR_DATA_TYPE(udfCol->colMeta.type); + if (udfCol->colData.varLengthColumn) { + udfCol->colData.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; + udfCol->colData.varOffsets = taosMemoryMalloc(udfCol->colData.varOffsetsLen); + memcpy(udfCol->colData.varOffsets, col->varmeta.offset, udfCol->colData.varOffsetsLen); + udfCol->colData.payloadLen = colDataGetLength(col, udfBlock->numOfRows); + udfCol->colData.payload = taosMemoryMalloc(udfCol->colData.payloadLen); + memcpy(udfCol->colData.payload, col->pData, udfCol->colData.payloadLen); + } else { + udfCol->colData.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows); + udfCol->colData.nullBitmap = taosMemoryMalloc(udfCol->colData.nullBitmapLen); + memcpy(udfCol->colData.nullBitmap, col->nullbitmap, udfCol->colData.nullBitmapLen); + udfCol->colData.dataLen = colDataGetLength(col, udfBlock->numOfRows); + udfCol->colData.data = taosMemoryMalloc(udfCol->colData.dataLen); + memcpy(udfCol->colData.data, col->pData, udfCol->colData.dataLen); + } + } + return 0; +} + +int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { + block->info.numOfCols = 1; + block->info.rows = udfCol->colData.numOfRows; + block->info.hasVarCol = udfCol->colData.varLengthColumn; + + block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); + taosArraySetSize(block->pDataBlock, 1); + SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0); + SUdfColumnMeta *meta = &udfCol->colMeta; + col->info.precision = meta->precision; + col->info.bytes = meta->bytes; + col->info.scale = meta->scale; + col->info.type = meta->type; + SUdfColumnData *data = &udfCol->colData; + + if (!IS_VAR_DATA_TYPE(meta->type)) { + col->nullbitmap = taosMemoryMalloc(data->nullBitmapLen); + memcpy(col->nullbitmap, 
data->nullBitmap, data->nullBitmapLen); + col->pData = taosMemoryMalloc(data->dataLen); + memcpy(col->pData, data->data, data->dataLen); // fixed-length values live in data->data; payload is the variable-length buffer + } else { + col->varmeta.offset = taosMemoryMalloc(data->varOffsetsLen); + memcpy(col->varmeta.offset, data->varOffsets, data->varOffsetsLen); + col->pData = taosMemoryMalloc(data->payloadLen); + memcpy(col->pData, data->payload, data->payloadLen); + } + return 0; +} + + void onUdfcPipeClose(uv_handle_t *handle) { SClientUvConn *conn = handle->data; if (!QUEUE_EMPTY(&conn->taskQueue)) { @@ -962,7 +1027,7 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) { return err; } -int32_t callUdf(UdfHandle handle, int8_t callType, int8_t initFirst, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, +int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, SSDataBlock* output, SUdfInterBuf *newState) { debugPrint("%s", "client call udf"); @@ -1002,19 +1067,19 @@ int32_t callUdf(UdfHandle handle, int8_t callType, int8_t initFirst, SSDataBlock SUdfCallResponse *rsp = &task->_call.rsp; switch (callType) { case TSDB_UDF_CALL_AGG_INIT: { - *newState = rsp->interBuf; + *newState = rsp->resultBuf; break; } case TSDB_UDF_CALL_AGG_PROC: { - *newState = rsp->interBuf; + *newState = rsp->resultBuf; break; } case TSDB_UDF_CALL_AGG_MERGE: { - *newState = rsp->interBuf; + *newState = rsp->resultBuf; break; } case TSDB_UDF_CALL_AGG_FIN: { - *output = rsp->resultData; + *newState = rsp->resultBuf; break; } case TSDB_UDF_CALL_SCALA_PROC: { @@ -1027,6 +1092,47 @@ int32_t callUdf(UdfHandle handle, int8_t callType, int8_t initFirst, SSDataBlock return task->errCode; } +//TODO: translate these calls to callUdf +int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) { + int8_t callType = TSDB_UDF_CALL_AGG_INIT; + + int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf); + + return err; +} + +// input: block, state +// output: newState +int32_t 
callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { + int8_t callType = TSDB_UDF_CALL_AGG_PROC; + int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState); + return err; +} + +// input: interbuf1, interbuf2 +// output: resultBuf +int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { + int8_t callType = TSDB_UDF_CALL_AGG_MERGE; + int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); + return err; +} + +// input: interBuf +// output: resultData +int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { + int8_t callType = TSDB_UDF_CALL_AGG_FIN; // finalize step, distinct from TSDB_UDF_CALL_AGG_PROC + int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData); + return err; +} + +// input: block +// output: resultData +int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData) { + int8_t callType = TSDB_UDF_CALL_SCALA_PROC; + int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL); + return err; +} + int32_t teardownUdf(UdfHandle handle) { debugPrint("%s", "client teardown udf"); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 76c731a473..d5c917121a 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -112,8 +112,8 @@ void udfdProcessRequest(uv_work_t *req) { SUdf *udf = handle->udf; SUdfDataBlock input = {0}; - //TODO: convertSDataBlockToUdfDataBlock(call->block, &input); - SUdfColumnData output; + convertDataBlockToUdfDataBlock(&call->block, &input); + SUdfColumn output = {0}; // zeroed like input so the later conversion never reads indeterminate fields //TODO: call different functions according to call type, for now just calar if (call->callType == TSDB_UDF_CALL_SCALA_PROC) { udf->scalarProcFunc(input, &output); @@ -126,7 +126,7 @@ void udfdProcessRequest(uv_work_t *req) { rsp->type = request.type; rsp->code = 0; SUdfCallResponse *subRsp = &rsp->callRsp; - 
//TODO: convertSUdfColumnDataToSSDataBlock(output, &subRsp->resultData); + convertUdfColumnToDataBlock(&output, &subRsp->resultData); } int32_t len = encodeUdfResponse(NULL, rsp);