fix:[TD-31818] fix memory leak allocated by mode function.
This commit is contained in:
parent
0104ddf02d
commit
02dd9c9160
|
@ -36,6 +36,7 @@ typedef struct SFuncExecEnv {
|
||||||
} SFuncExecEnv;
|
} SFuncExecEnv;
|
||||||
|
|
||||||
typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
|
typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
|
||||||
|
typedef void (*FExecCleanUp)(struct SqlFunctionCtx* pCtx);
|
||||||
typedef int32_t (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
|
typedef int32_t (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
|
||||||
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
||||||
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
|
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
|
||||||
|
@ -54,6 +55,7 @@ typedef struct SFuncExecFuncs {
|
||||||
FExecProcess process;
|
FExecProcess process;
|
||||||
FExecFinalize finalize;
|
FExecFinalize finalize;
|
||||||
FExecCombine combine;
|
FExecCombine combine;
|
||||||
|
FExecCleanUp cleanup;
|
||||||
processFuncByRow processFuncByRow;
|
processFuncByRow processFuncByRow;
|
||||||
} SFuncExecFuncs;
|
} SFuncExecFuncs;
|
||||||
|
|
||||||
|
|
|
@ -326,6 +326,9 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
if (pCtx[k].fpSet.cleanup != NULL) {
|
||||||
|
pCtx[k].fpSet.cleanup(&pCtx[k]);
|
||||||
|
}
|
||||||
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -640,6 +643,9 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
if (pCtx[k].fpSet.cleanup != NULL) {
|
||||||
|
pCtx[k].fpSet.cleanup(&pCtx[k]);
|
||||||
|
}
|
||||||
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
|
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
|
||||||
taskInfo->code = code;
|
taskInfo->code = code;
|
||||||
T_LONG_JMP(taskInfo->env, code);
|
T_LONG_JMP(taskInfo->env, code);
|
||||||
|
|
|
@ -1057,6 +1057,9 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
|
|
||||||
code = pfCtx->fpSet.process(pfCtx);
|
code = pfCtx->fpSet.process(pfCtx);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
if (pCtx[k].fpSet.cleanup != NULL) {
|
||||||
|
pCtx[k].fpSet.cleanup(&pCtx[k]);
|
||||||
|
}
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,7 @@ typedef struct SBuiltinFuncDefinition {
|
||||||
FExecProcess processFunc;
|
FExecProcess processFunc;
|
||||||
FScalarExecProcess sprocessFunc;
|
FScalarExecProcess sprocessFunc;
|
||||||
FExecFinalize finalizeFunc;
|
FExecFinalize finalizeFunc;
|
||||||
|
FExecCleanUp cleanupFunc;
|
||||||
#ifdef BUILD_NO_CALL
|
#ifdef BUILD_NO_CALL
|
||||||
FExecProcess invertFunc;
|
FExecProcess invertFunc;
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -239,6 +239,7 @@ bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
|
int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t modeFunction(SqlFunctionCtx* pCtx);
|
int32_t modeFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
void modeFunctionCleanupExt(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
|
int32_t twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
|
|
@ -3658,6 +3658,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = modeFunction,
|
.processFunc = modeFunction,
|
||||||
.sprocessFunc = modeScalarFunction,
|
.sprocessFunc = modeScalarFunction,
|
||||||
.finalizeFunc = modeFinalize,
|
.finalizeFunc = modeFinalize,
|
||||||
|
.cleanupFunc = modeFunctionCleanupExt
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "abs",
|
.name = "abs",
|
||||||
|
|
|
@ -6019,6 +6019,12 @@ static void modeFunctionCleanup(SModeInfo * pInfo) {
|
||||||
taosMemoryFreeClear(pInfo->buf);
|
taosMemoryFreeClear(pInfo->buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void modeFunctionCleanupExt(SqlFunctionCtx* pCtx) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
modeFunctionCleanup(pInfo);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pInfo, STuplePos* pPos) {
|
static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pInfo, STuplePos* pPos) {
|
||||||
if (IS_VAR_DATA_TYPE(pInfo->colType)) {
|
if (IS_VAR_DATA_TYPE(pInfo->colType)) {
|
||||||
(void)memcpy(pInfo->buf, data, varDataTLen(data));
|
(void)memcpy(pInfo->buf, data, varDataTLen(data));
|
||||||
|
|
|
@ -142,6 +142,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
||||||
pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc;
|
pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc;
|
||||||
pFpSet->combine = funcMgtBuiltins[funcId].combineFunc;
|
pFpSet->combine = funcMgtBuiltins[funcId].combineFunc;
|
||||||
pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow;
|
pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow;
|
||||||
|
pFpSet->cleanup = funcMgtBuiltins[funcId].cleanupFunc;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue