diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 84860ca5a1..75b6aeaae9 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1259,7 +1259,9 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult return false; } UdfcFuncHandle handle; - if (setupUdf((char*)pCtx->udfName, &handle) != 0) { + int32_t udfCode = 0; + if ((udfCode = setupUdf((char*)pCtx->udfName, &handle)) != 0) { + fnError("udfAggInit error. step setupUdf. udf code: %d", udfCode); return false; } SClientUdfUvSession *session = (SClientUdfUvSession *)handle; @@ -1272,7 +1274,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult udfRes->session = (SClientUdfUvSession *)handle; SUdfInterBuf buf = {0}; - if (callUdfAggInit(handle, &buf) != 0) { + if ((udfCode = callUdfAggInit(handle, &buf)) != 0) { + fnError("udfAggInit error. step callUdfAggInit. udf code: %d", udfCode); return false; } udfRes->interResNum = buf.numOfResult; @@ -1316,21 +1319,23 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; - callUdfAggProcess(session, inputBlock, &state, &newState); - - udfRes->interResNum = newState.numOfResult; - memcpy(udfRes->interResBuf, newState.buf, newState.bufLen); - + int32_t udfCode = callUdfAggProcess(session, inputBlock, &state, &newState); + if (udfCode != 0) { + fnError("udfAggProcess error. code: %d", udfCode); + newState.numOfResult = 0; + } else { + udfRes->interResNum = newState.numOfResult; + memcpy(udfRes->interResBuf, newState.buf, newState.bufLen); + } if (newState.numOfResult == 1 || state.numOfResult == 1) { GET_RES_INFO(pCtx)->numOfRes = 1; } blockDataDestroy(inputBlock); - taosArrayDestroy(tempBlock.pDataBlock); taosMemoryFree(newState.buf); - return 0; + return TSDB_CODE_SUCCESS; } int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { @@ -1344,15 +1349,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum}; - callUdfAggFinalize(session, &state, &resultBuf); - - udfRes->finalResBuf = resultBuf.buf; - udfRes->finalResNum = resultBuf.numOfResult; - - teardownUdf(session); - - if (resultBuf.numOfResult == 1) { - GET_RES_INFO(pCtx)->numOfRes = 1; + int32_t udfCallCode= 0; + udfCallCode= callUdfAggFinalize(session, &state, &resultBuf); + if (udfCallCode!= 0) { + fnError("udfAggFinalize error. callUdfAggFinalize step. udf code:%d", udfCallCode); + GET_RES_INFO(pCtx)->numOfRes = 0; + } else { + memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen); + udfRes->finalResNum = resultBuf.numOfResult; + GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; } + + int32_t code = teardownUdf(session); + if (code != 0) { + fnError("udfAggFinalize error. teardownUdf step. udf code: %d", code); + } + return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); + } \ No newline at end of file diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index f5fab814ff..5231890821 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -347,10 +347,21 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp if (fmIsUserDefinedFunc(node->funcId)) { UdfcFuncHandle udfHandle = NULL; - SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle)); + code = setupUdf(node->functionName, &udfHandle); + if (code != 0) { + sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code); + goto _return; + } code = callUdfScalarFunc(udfHandle, params, paramNum, output); - teardownUdf(udfHandle); - SCL_ERR_JRET(code); + if (code != 0) { + sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code); + goto _return; + } + code = teardownUdf(udfHandle); + if (code != 0) { + sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code); + goto _return; + } } else { SScalarFuncExecFuncs ffpSet = {0}; code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);