diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 0414841ba4..4841e05267 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -25,8 +25,6 @@ #include "functionMgt.h" //TODO: add unit test -//TODO: include all global variable under context struct - typedef struct SUdfdData { bool startCalled; bool needCleanUp; @@ -1275,7 +1273,6 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { } int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { - fnInfo("udfc setup udf. udfName: %s", udfName); if (gUdfdProxy.udfcState != UDFC_STATE_READY) { return TSDB_CODE_UDF_INVALID_STATE; } @@ -1305,7 +1302,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { if (task->errCode != 0) { fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode) } else { - fnInfo("sucessfully setup udf func handle. handle: %p", task->session); + fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session); *funcHandle = task->session; } int32_t err = task->errCode; @@ -1490,13 +1487,11 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, return code; } -//TODO: when to teardown udf. teardown udf is not called int32_t doTeardownUdf(UdfcFuncHandle handle) { - fnInfo("tear down udf. udf func handle: %p", handle); - SUdfcUvSession *session = (SUdfcUvSession *) handle; + if (session->udfUvPipe == NULL) { - fnError("pipe to udfd does not exist"); + fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName); return TSDB_CODE_UDF_PIPE_NO_PIPE; } @@ -1511,7 +1506,6 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); SUdfTeardownResponse *rsp = &task->_teardown.rsp; - int32_t err = task->errCode; udfcRunUdfUvTask(task, UV_TASK_DISCONNECT); @@ -1519,6 +1513,8 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { taosMemoryFree(task->session); taosMemoryFree(task); + fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle); + return err; } @@ -1651,26 +1647,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; } -// int32_t code = doTeardownUdf(session); -// if (code != 0) { -// fnError("udfAggFinalize error. doTeardownUdf step. udf code: %d", code); -// } - int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); releaseUdfFuncHandle(pCtx->udfName); return udfCallCode == 0 ? numOfResults : udfCallCode; } int32_t cleanUpUdfs() { - fnInfo("clean up udf unused function handles"); uv_mutex_lock(&gUdfdProxy.udfStubsMutex); int32_t i = 0; SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); if (stub->refCount == 0) { + fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); doTeardownUdf(stub->handle); } else { + fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle); taosArrayPush(udfStubs, stub); } ++i;