diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 24fa2898ea..a71a2a6b7f 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -36,6 +36,7 @@ typedef struct SFuncExecEnv { } SFuncExecEnv; typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); +typedef void (*FExecCleanUp)(struct SqlFunctionCtx* pCtx); typedef int32_t (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); @@ -54,6 +55,7 @@ typedef struct SFuncExecFuncs { FExecProcess process; FExecFinalize finalize; FExecCombine combine; + FExecCleanUp cleanup; processFuncByRow processFuncByRow; } SFuncExecFuncs; diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index f0e0f81cf5..01a67a6a03 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -326,6 +326,9 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { } if (code != TSDB_CODE_SUCCESS) { + if (pCtx[k].fpSet.cleanup != NULL) { + pCtx[k].fpSet.cleanup(&pCtx[k]); + } qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); return code; } @@ -640,6 +643,9 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC } if (code != TSDB_CODE_SUCCESS) { + if (pCtx[k].fpSet.cleanup != NULL) { + pCtx[k].fpSet.cleanup(&pCtx[k]); + } qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); taskInfo->code = code; T_LONG_JMP(taskInfo->env, code); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index a9ba57e1d4..21b7c0880f 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -1057,6 +1057,9 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc code = pfCtx->fpSet.process(pfCtx); if (code != TSDB_CODE_SUCCESS) { + if (pCtx[k].fpSet.cleanup != NULL) { + pCtx[k].fpSet.cleanup(&pCtx[k]); + } goto _exit; } diff --git a/source/libs/function/inc/builtins.h b/source/libs/function/inc/builtins.h index 343f5b8367..5707ee76f4 100644 --- a/source/libs/function/inc/builtins.h +++ b/source/libs/function/inc/builtins.h @@ -40,6 +40,7 @@ typedef struct SBuiltinFuncDefinition { FExecProcess processFunc; FScalarExecProcess sprocessFunc; FExecFinalize finalizeFunc; + FExecCleanUp cleanupFunc; #ifdef BUILD_NO_CALL FExecProcess invertFunc; #endif diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 7403a4ce31..77d6bda35b 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -239,6 +239,7 @@ bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t modeFunction(SqlFunctionCtx* pCtx); int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +void modeFunctionCleanupExt(SqlFunctionCtx* pCtx); bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4bc86eb0c6..17ba430150 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -3658,6 +3658,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = modeFunction, .sprocessFunc = modeScalarFunction, .finalizeFunc = modeFinalize, + .cleanupFunc = modeFunctionCleanupExt }, { .name = "abs", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fa8cf243c4..84ab103456 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -6019,6 +6019,12 @@ static void modeFunctionCleanup(SModeInfo * pInfo) { taosMemoryFreeClear(pInfo->buf); } +void modeFunctionCleanupExt(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + modeFunctionCleanup(pInfo); +} + static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pInfo, STuplePos* pPos) { if (IS_VAR_DATA_TYPE(pInfo->colType)) { (void)memcpy(pInfo->buf, data, varDataTLen(data)); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 254a06426c..2f71ab8e24 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -142,6 +142,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc; pFpSet->combine = funcMgtBuiltins[funcId].combineFunc; pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow; + pFpSet->cleanup = funcMgtBuiltins[funcId].cleanupFunc; return TSDB_CODE_SUCCESS; }