diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index ca5079a0a8..6a98138c6c 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -122,7 +122,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output); -int32_t teardownUdfs(); +int32_t cleanUpUdfs(); // end API to taosd and qworker //============================================================================================================================= // begin API to UDF writer. diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 0c464d9b43..354f4d8752 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -157,7 +157,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0; pTaskInfo->totalRows += current; - teardownUdfs(); + cleanUpUdfs(); qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index b5058a2d60..4841e05267 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -25,8 +25,6 @@ #include "functionMgt.h" //TODO: add unit test -//TODO: include all global variable under context struct - typedef struct SUdfdData { bool startCalled; bool needCleanUp; @@ -1275,7 +1273,6 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { } int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { - fnInfo("udfc setup udf. udfName: %s", udfName); if (gUdfdProxy.udfcState != UDFC_STATE_READY) { return TSDB_CODE_UDF_INVALID_STATE; } @@ -1305,7 +1302,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { if (task->errCode != 0) { fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode) } else { - fnInfo("sucessfully setup udf func handle. handle: %p", task->session); + fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session); *funcHandle = task->session; } int32_t err = task->errCode; @@ -1490,13 +1487,11 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, return code; } -//TODO: when to teardown udf. teardown udf is not called int32_t doTeardownUdf(UdfcFuncHandle handle) { - fnInfo("tear down udf. udf func handle: %p", handle); - SUdfcUvSession *session = (SUdfcUvSession *) handle; + if (session->udfUvPipe == NULL) { - fnError("pipe to udfd does not exist"); + fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName); return TSDB_CODE_UDF_PIPE_NO_PIPE; } @@ -1511,7 +1506,6 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); SUdfTeardownResponse *rsp = &task->_teardown.rsp; - int32_t err = task->errCode; udfcRunUdfUvTask(task, UV_TASK_DISCONNECT); @@ -1519,6 +1513,8 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { taosMemoryFree(task->session); taosMemoryFree(task); + fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle); + return err; } @@ -1651,25 +1647,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum; } -// int32_t code = doTeardownUdf(session); -// if (code != 0) { -// fnError("udfAggFinalize error. doTeardownUdf step. udf code: %d", code); -// } - int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); releaseUdfFuncHandle(pCtx->udfName); return udfCallCode == 0 ? numOfResults : udfCallCode; } -int32_t teardownUdfs() { +int32_t cleanUpUdfs() { uv_mutex_lock(&gUdfdProxy.udfStubsMutex); int32_t i = 0; SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); if (stub->refCount == 0) { + fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); doTeardownUdf(stub->handle); } else { + fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle); taosArrayPush(udfStubs, stub); } ++i;