From d668ee7b91e5353b7a3d285245bbc5ae6e8f2ad1 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 17 May 2022 12:03:37 +0800 Subject: [PATCH] fix: aggregate memory leaking --- include/libs/function/tudf.h | 4 ++ source/libs/function/src/tudf.c | 7 ++- source/libs/function/test/runUdf.c | 75 +++++++++++++++++++++++++----- 3 files changed, 73 insertions(+), 13 deletions(-) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 6a98138c6c..28b1fbe8ce 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -87,6 +87,7 @@ typedef struct SUdfInterBuf { } SUdfInterBuf; typedef void *UdfcFuncHandle; +//low level APIs /** * setup udf * @param udf, in @@ -115,6 +116,9 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t */ int32_t doTeardownUdf(UdfcFuncHandle handle); +void freeUdfInterBuf(SUdfInterBuf *buf); + +//high level APIs bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index bf2fbce79e..03a3891a4c 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -836,7 +836,7 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode * fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask); if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->rspBuf.base != NULL) { - SUdfResponse rsp; + SUdfResponse rsp = {0}; void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp); assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base)); task->errCode = rsp.code; @@ -1569,6 +1569,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult } udfRes->interResNum = buf.numOfResult; memcpy(udfRes->interResBuf, buf.buf, buf.bufLen); + freeUdfInterBuf(&buf); return true; } @@ -1626,7 +1627,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { blockDataDestroy(inputBlock); taosArrayDestroy(tempBlock.pDataBlock); - taosMemoryFree(newState.buf); + freeUdfInterBuf(&newState); return udfCode; } @@ -1655,6 +1656,8 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; } + freeUdfInterBuf(&resultBuf); + int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); releaseUdfFuncHandle(pCtx->udfName); return udfCallCode == 0 ? numOfResults : udfCallCode; diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index 4490c2d5e3..9fe9269a3f 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -34,17 +34,7 @@ static int32_t initLog() { return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, NULL, 0); } -int main(int argc, char *argv[]) { - parseArgs(argc, argv); - initLog(); - if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { - fnError("failed to start since read config error"); - return -1; - } - - udfcOpen(); - uv_sleep(1000); - +int scalarFuncTest() { UdfcFuncHandle handle; if (doSetupUdf("udf1", &handle) != 0) { @@ -87,5 +77,68 @@ int main(int argc, char *argv[]) { taosMemoryFree(output.columnData); doTeardownUdf(handle); + + return 0; +} + +int aggregateFuncTest() { + UdfcFuncHandle handle; + + if (doSetupUdf("udf2", &handle) != 0) { + fnError("setup udf failure"); + return -1; + } + + SSDataBlock block = {0}; + SSDataBlock *pBlock = █ + pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); + pBlock->info.numOfCols = 1; + pBlock->info.rows = 4; + char data[16] = {0}; + char bitmap[4] = {0}; + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData colInfo = {0}; + colInfo.info.type = TSDB_DATA_TYPE_INT; + colInfo.info.bytes = sizeof(int32_t); + colInfo.info.colId = 1; + colInfo.pData = data; + colInfo.nullbitmap = bitmap; + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + colDataAppendInt32(&colInfo, j, &j); + } + taosArrayPush(pBlock->pDataBlock, &colInfo); + } + + SUdfInterBuf buf = {0}; + SUdfInterBuf newBuf = {0}; + SUdfInterBuf resultBuf = {0}; + doCallUdfAggInit(handle, &buf); + doCallUdfAggProcess(handle, pBlock, &buf, &newBuf); + taosArrayDestroy(pBlock->pDataBlock); + + doCallUdfAggFinalize(handle, &newBuf, &resultBuf); + fprintf(stderr, "agg result: %f\n", *(double*)resultBuf.buf); + + freeUdfInterBuf(&buf); + freeUdfInterBuf(&newBuf); + freeUdfInterBuf(&resultBuf); + doTeardownUdf(handle); + + return 0; +} + +int main(int argc, char *argv[]) { + parseArgs(argc, argv); + initLog(); + if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { + fnError("failed to start since read config error"); + return -1; + } + + udfcOpen(); + uv_sleep(1000); + + scalarFuncTest(); + aggregateFuncTest(); udfcClose(); }