diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 0580f3acba..094ce80106 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -207,7 +207,7 @@ typedef struct SqlFunctionCtx { struct SSDataBlock *pSrcBlock; int32_t curBufPage; - char* udfName[TSDB_FUNC_NAME_LEN]; + char udfName[TSDB_FUNC_NAME_LEN]; } SqlFunctionCtx; enum { diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 126a2ccf99..0572262bfc 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -162,6 +162,7 @@ EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWin int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmGetScalarFuncExecFuncs(int32_t funcId, SScalarFuncExecFuncs* pFpSet); +int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); #ifdef __cplusplus } diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 985bf6fa6f..e49f5cac45 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -140,7 +140,7 @@ typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf); -typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf); +typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf); typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf* buf, SUdfInterBuf *resultData); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f1995b723d..943a016c27 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1897,7 +1897,14 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId; if (fmIsAggFunc(pCtx->functionId) || fmIsNonstandardSQLFunc(pCtx->functionId)) { - fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); + bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId); + if (!isUdaf) { + fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); + } else { + char *udfName = pExpr->pExpr->_function.pFunctNode->functionName; + strncpy(pCtx->udfName, udfName, strlen(udfName)); + fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet); + } pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); } else { fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 585eb57a56..0113da94eb 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -124,7 +124,10 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { return TSDB_CODE_SUCCESS; } -int32_t fmGetUdafExecFuncs(SFuncExecFuncs* pFpSet) { +int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { + if (!fmIsUserDefinedFunc(funcId)) { + return TSDB_CODE_FAILED; + } pFpSet->getEnv = udfAggGetEnv; pFpSet->init = udfAggInit; pFpSet->process = udfAggProcess; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 896ebd3763..ae24a832c8 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -232,7 +232,7 @@ void udfdProcessRequest(uv_work_t *req) { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen= udf->bufSize, .numOfResult = 0}; - udf->aggProcFunc(&input, &outBuf); + udf->aggProcFunc(&input, &call->interBuf, &outBuf); subRsp->resultBuf = outBuf; break; diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index 250c20ba88..83187c5855 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -24,7 +24,7 @@ int32_t udf2_start(SUdfInterBuf *buf) { return 0; } -int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) { +int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { int64_t sumSquares = *(int64_t*)interBuf->buf; for (int32_t i = 0; i < block->numOfCols; ++i) { for (int32_t j = 0; j < block->numOfRows; ++i) { @@ -35,10 +35,10 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf) { } } - *(int64_t*)interBuf = sumSquares; - interBuf->bufLen = sizeof(int64_t); + *(int64_t*)newInterBuf = sumSquares; + newInterBuf->bufLen = sizeof(int64_t); //TODO: if all null value, numOfResult = 0; - interBuf->numOfResult = 1; + newInterBuf->numOfResult = 1; return 0; }