From 0119054b0bc82399ce8f3539c22dc9a187b6aea1 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 28 Apr 2022 11:12:45 +0800 Subject: [PATCH] aggregate function call from udfd --- include/libs/function/tudf.h | 4 +- source/libs/function/src/udfd.c | 106 +++++++++++++++++++++++--------- 2 files changed, 80 insertions(+), 30 deletions(-) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 096cc3da09..985bf6fa6f 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -139,9 +139,9 @@ typedef int32_t (*TUdfTeardownFunc)(); typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); -typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf); +typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf); typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf); -typedef int32_t (*TUdfAggFinalizeFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData); +typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData); // end API to UDF writer diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 9398b44adf..896ebd3763 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -77,6 +77,10 @@ typedef struct SUdf { uv_lib_t lib; TUdfScalarProcFunc scalarProcFunc; TUdfFreeUdfColumnFunc freeUdfColumn; + + TUdfAggStartFunc aggStartFunc; + TUdfAggProcessFunc aggProcFunc; + TUdfAggFinishFunc aggFinishFunc; } SUdf; // TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix @@ -97,15 +101,32 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); return UDFC_CODE_LOAD_UDF_FAILURE; } - // TODO: find all the functions - char normalFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strcpy(normalFuncName, udfName); - uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc)); - char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; - char *freeSuffix = "_free"; - strncpy(freeFuncName, normalFuncName, strlen(normalFuncName)); - strncat(freeFuncName, freeSuffix, strlen(freeSuffix)); - uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); + //TODO: init and destroy function + if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) { + char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; + strcpy(processFuncName, udfName); + uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc)); + char freeFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; + char *freeSuffix = "_free"; + strncpy(freeFuncName, processFuncName, strlen(processFuncName)); + strncat(freeFuncName, freeSuffix, strlen(freeSuffix)); + uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); + } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { + char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; + strcpy(processFuncName, udfName); + uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc)); + char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *startSuffix = "_start"; + strncpy(startFuncName, processFuncName, strlen(processFuncName)); + strncat(startFuncName, startSuffix, strlen(startSuffix)); + uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc)); + char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; + char *finishSuffix = "_finish"; + strncpy(finishFuncName, processFuncName, strlen(processFuncName)); + strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); + uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggFinishFunc)); + //TODO: merge + } return 0; } @@ -181,26 +202,58 @@ void udfdProcessRequest(uv_work_t *req) { call->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); SUdf *udf = handle->udf; - - SUdfDataBlock input = {0}; - convertDataBlockToUdfDataBlock(&call->block, &input); - SUdfColumn output = {0}; - // TODO: call different functions according to call type, for now just calar - if (call->callType == TSDB_UDF_CALL_SCALA_PROC) { - udf->scalarProcFunc(&input, &output); - } - SUdfResponse response = {0}; SUdfResponse *rsp = &response; - if (call->callType == TSDB_UDF_CALL_SCALA_PROC) { - rsp->seqNum = request.seqNum; - rsp->type = request.type; - rsp->code = 0; - SUdfCallResponse *subRsp = &rsp->callRsp; - subRsp->callType = call->callType; - convertUdfColumnToDataBlock(&output, &subRsp->resultData); + SUdfCallResponse *subRsp = &rsp->callRsp; + + switch(call->callType) { + case TSDB_UDF_CALL_SCALA_PROC: { + SUdfColumn output = {0}; + + SUdfDataBlock input = {0}; + convertDataBlockToUdfDataBlock(&call->block, &input); + udf->scalarProcFunc(&input, &output); + + convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); + udf->freeUdfColumn(&output); + break; + } + case TSDB_UDF_CALL_AGG_INIT: { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), + .bufLen= udf->bufSize, + .numOfResult = 0}; + udf->aggStartFunc(&outBuf); + subRsp->resultBuf = outBuf; + break; + } + case TSDB_UDF_CALL_AGG_PROC: { + SUdfDataBlock input = {0}; + convertDataBlockToUdfDataBlock(&call->block, &input); + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), + .bufLen= udf->bufSize, + .numOfResult = 0}; + udf->aggProcFunc(&input, &outBuf); + subRsp->resultBuf = outBuf; + + break; + } + case TSDB_UDF_CALL_AGG_FIN: { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), + .bufLen= udf->bufSize, + .numOfResult = 0}; + udf->aggFinishFunc(&call->interBuf, &outBuf); + subRsp->resultBuf = outBuf; + break; + } + default: + break; } + rsp->seqNum = request.seqNum; + rsp->type = request.type; + rsp->code = 0; + subRsp->callType = call->callType; + int32_t len = encodeUdfResponse(NULL, rsp); rsp->msgLen = len; void *bufBegin = taosMemoryMalloc(len); @@ -208,9 +261,6 @@ void udfdProcessRequest(uv_work_t *req) { encodeUdfResponse(&buf, rsp); uvUdf->output = uv_buf_init(bufBegin, len); - // TODO: free udf column - udf->freeUdfColumn(&output); - taosMemoryFree(uvUdf->input.base); break; }