From 1a0c9f31bc105b7e8e1f8ffeb10b539be7b7a1e2 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 17 Apr 2023 10:34:34 +0800 Subject: [PATCH] enhance: refactor cleanup udf function --- source/libs/function/src/tudf.c | 72 +++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index f13235f24b..8c8b99a6f8 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -968,6 +968,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); +void cleanupNotExpiredUdfs(); +void cleanupExpiredUdfs(); int compareUdfcFuncSub(const void *elem1, const void *elem2) { SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; @@ -1040,18 +1042,32 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); } -int32_t cleanUpUdfs() { - int8_t initialized = atomic_load_8(&gUdfcProxy.initialized); - if (!initialized) { - return TSDB_CODE_SUCCESS; +void cleanupExpiredUdfs() { + int32_t i = 0; + SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); + if (stub->refCount == 0) { + fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); + doTeardownUdf(stub->handle); + } else { + fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, + stub->refCount, stub->createTime, stub->handle); + UdfcFuncHandle handle = stub->handle; + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { + taosArrayPush(expiredUdfStubs, stub); + } else { + fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", + stub->udfName, stub->refCount, stub->createTime); + } + } + ++i; } + taosArrayDestroy(gUdfcProxy.expiredUdfStubs); + gUdfcProxy.expiredUdfStubs = expiredUdfStubs; +} - uv_mutex_lock(&gUdfcProxy.udfStubsMutex); - if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) && - (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) { - uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); - return TSDB_CODE_SUCCESS; - } +void cleanupNotExpiredUdfs() { SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); int32_t i = 0; while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) { @@ -1074,28 +1090,24 @@ int32_t cleanUpUdfs() { } taosArrayDestroy(gUdfcProxy.udfStubs); gUdfcProxy.udfStubs = udfStubs; +} - SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); - while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { - SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); - if (stub->refCount == 0) { - fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); - doTeardownUdf(stub->handle); - } else { - fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, - stub->refCount, stub->createTime, stub->handle); - UdfcFuncHandle handle = stub->handle; - if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { - taosArrayPush(expiredUdfStubs, stub); - } else { - fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", - stub->udfName, stub->refCount, stub->createTime); - } - } - ++i; +int32_t cleanUpUdfs() { + int8_t initialized = atomic_load_8(&gUdfcProxy.initialized); + if (!initialized) { + return TSDB_CODE_SUCCESS; } - taosArrayDestroy(gUdfcProxy.expiredUdfStubs); - gUdfcProxy.expiredUdfStubs = expiredUdfStubs; + + uv_mutex_lock(&gUdfcProxy.udfStubsMutex); + if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) && + (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) { + uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); + return TSDB_CODE_SUCCESS; + } + + cleanupNotExpiredUdfs(); + cleanupExpiredUdfs(); + uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return 0; }