diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 92dbdda496..53b2cf4aca 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -363,6 +363,7 @@ typedef struct SUdfcProxy { uv_mutex_t udfStubsMutex; SArray *udfStubs; // SUdfcFuncStub + SArray *expiredUdfStubs; //SUdfcFuncStub uv_mutex_t udfcUvMutex; int8_t initialized; @@ -983,15 +984,22 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex); UdfcFuncHandle handle = foundStub->handle; int64_t currUs = taosGetTimestampUs(); - if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL && (currUs - foundStub->createTime) < 10 * 1000 * 1000) { - *pHandle = foundStub->handle; - ++foundStub->refCount; - uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); - return 0; + bool expired = (currUs - foundStub->createTime) >= 10 * 1000 * 1000; + if (!expired) { + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { + *pHandle = foundStub->handle; + ++foundStub->refCount; + uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); + return 0; + } else { + fnInfo("invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName, + foundStub->refCount, foundStub->createTime); + taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); + } } else { - fnInfo("invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName, - foundStub->refCount, foundStub->createTime); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); + taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub); + taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub); } } *pHandle = NULL; @@ -1017,13 +1025,17 @@ void releaseUdfFuncHandle(char *udfName) { SUdfcFuncStub key = {0}; strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); - if (!foundStub) { + SUdfcFuncStub *expiredStub = taosArraySearch(gUdfcProxy.expiredUdfStubs, &key, compareUdfcFuncSub, TD_EQ); + if (!foundStub && !expiredStub) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return; } - if (foundStub->refCount > 0) { + if (foundStub != NULL && foundStub->refCount > 0) { --foundStub->refCount; } + if (expiredStub != NULL && expiredStub->refCount > 0) { + --expiredStub->refCount; + } uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); } @@ -1060,6 +1072,28 @@ int32_t cleanUpUdfs() { } taosArrayDestroy(gUdfcProxy.udfStubs); gUdfcProxy.udfStubs = udfStubs; + + SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); + if (stub->refCount == 0) { + fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); + doTeardownUdf(stub->handle); + } else { + fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, + stub->refCount, stub->createTime, stub->handle); + UdfcFuncHandle handle = stub->handle; + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { + taosArrayPush(expiredUdfStubs, stub); + } else { + fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", + stub->udfName, stub->refCount, stub->createTime); + } + } + ++i; + } + taosArrayDestroy(gUdfcProxy.udfStubs); + gUdfcProxy.expiredUdfStubs = expiredUdfStubs; uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return 0; } @@ -1663,6 +1697,7 @@ int32_t udfcOpen() { uv_barrier_wait(&proxy->initBarrier); uv_mutex_init(&proxy->udfStubsMutex); proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); + proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); uv_mutex_init(&proxy->udfcUvMutex); fnInfo("udfc initialized") return 0; } @@ -1679,6 +1714,7 @@ int32_t udfcClose() { uv_thread_join(&udfc->loopThread); uv_mutex_destroy(&udfc->taskQueueMutex); uv_barrier_destroy(&udfc->initBarrier); + taosArrayDestroy(udfc->expiredUdfStubs); taosArrayDestroy(udfc->udfStubs); uv_mutex_destroy(&udfc->udfStubsMutex); uv_mutex_destroy(&udfc->udfcUvMutex);