From 33143e90c831de915b819f5721417f5dbf17a7df Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 12 Apr 2023 09:17:31 +0800 Subject: [PATCH 1/6] enhance: add udf stub expiration of 10s after creation --- source/libs/function/src/tudf.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 2269ad7f6a..92dbdda496 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -343,7 +343,7 @@ typedef struct SUdfcFuncStub { char udfName[TSDB_FUNC_NAME_LEN + 1]; UdfcFuncHandle handle; int32_t refCount; - int64_t lastRefTime; + int64_t createTime; } SUdfcFuncStub; typedef struct SUdfcProxy { @@ -982,15 +982,15 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { if (stubIndex != -1) { SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex); UdfcFuncHandle handle = foundStub->handle; - if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { + int64_t currUs = taosGetTimestampUs(); + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL && (currUs - foundStub->createTime) < 10 * 1000 * 1000) { *pHandle = foundStub->handle; ++foundStub->refCount; - foundStub->lastRefTime = taosGetTimestampUs(); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return 0; } else { - fnInfo("invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", udfName, - foundStub->refCount, foundStub->lastRefTime); + fnInfo("invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName, + foundStub->refCount, foundStub->createTime); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); } } @@ -1001,7 +1001,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { strncpy(stub.udfName, udfName, TSDB_FUNC_NAME_LEN); stub.handle = *pHandle; ++stub.refCount; - stub.lastRefTime = taosGetTimestampUs(); + stub.createTime = taosGetTimestampUs(); taosArrayPush(gUdfcProxy.udfStubs, &stub); taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub); } else { @@ -1046,14 +1046,14 @@ int32_t cleanUpUdfs() { fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); doTeardownUdf(stub->handle); } else { - fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %" PRId64 ", handle: %p", stub->udfName, - stub->refCount, stub->lastRefTime, stub->handle); + fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, + stub->refCount, stub->createTime, stub->handle); UdfcFuncHandle handle = stub->handle; if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { taosArrayPush(udfStubs, stub); } else { - fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", - stub->udfName, stub->refCount, stub->lastRefTime); + fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", + stub->udfName, stub->refCount, stub->createTime); } } ++i; From fe718f60ee1120d0d7bd7618d76dd99655974d4d Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 12 Apr 2023 12:15:09 +0800 Subject: [PATCH 2/6] fix: add expired udfc func stub to track the expired --- source/libs/function/src/tudf.c | 54 +++++++++++++++++++++++++++------ 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 92dbdda496..53b2cf4aca 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -363,6 +363,7 @@ typedef struct SUdfcProxy { uv_mutex_t udfStubsMutex; SArray *udfStubs; // SUdfcFuncStub + SArray *expiredUdfStubs; //SUdfcFuncStub uv_mutex_t udfcUvMutex; int8_t initialized; @@ -983,15 +984,22 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex); UdfcFuncHandle handle = foundStub->handle; int64_t currUs = taosGetTimestampUs(); - if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL && (currUs - foundStub->createTime) < 10 * 1000 * 1000) { - *pHandle = foundStub->handle; - ++foundStub->refCount; - uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); - return 0; + bool expired = (currUs - foundStub->createTime) >= 10 * 1000 * 1000; + if (!expired) { + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { + *pHandle = foundStub->handle; + ++foundStub->refCount; + uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); + return 0; + } else { + fnInfo("invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName, + foundStub->refCount, foundStub->createTime); + taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); + } } else { - fnInfo("invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName, - foundStub->refCount, foundStub->createTime); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); + taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub); + taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub); } } *pHandle = NULL; @@ -1017,13 +1025,17 @@ void releaseUdfFuncHandle(char *udfName) { SUdfcFuncStub key = {0}; strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); - if (!foundStub) { + SUdfcFuncStub *expiredStub = taosArraySearch(gUdfcProxy.expiredUdfStubs, &key, compareUdfcFuncSub, TD_EQ); + if (!foundStub && !expiredStub) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return; } - if (foundStub->refCount > 0) { + if (foundStub != NULL && foundStub->refCount > 0) { --foundStub->refCount; } + if (expiredStub != NULL && expiredStub->refCount > 0) { + --expiredStub->refCount; + } uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); } @@ -1060,6 +1072,28 @@ int32_t cleanUpUdfs() { } taosArrayDestroy(gUdfcProxy.udfStubs); gUdfcProxy.udfStubs = udfStubs; + + SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); + if (stub->refCount == 0) { + fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); + doTeardownUdf(stub->handle); + } else { + fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, + stub->refCount, stub->createTime, stub->handle); + UdfcFuncHandle handle = stub->handle; + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { + taosArrayPush(expiredUdfStubs, stub); + } else { + fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", + stub->udfName, stub->refCount, stub->createTime); + } + } + ++i; + } + taosArrayDestroy(gUdfcProxy.udfStubs); + gUdfcProxy.expiredUdfStubs = expiredUdfStubs; uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return 0; } @@ -1663,6 +1697,7 @@ int32_t udfcOpen() { uv_barrier_wait(&proxy->initBarrier); uv_mutex_init(&proxy->udfStubsMutex); proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); + proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); uv_mutex_init(&proxy->udfcUvMutex); fnInfo("udfc initialized") return 0; } @@ -1679,6 +1714,7 @@ int32_t udfcClose() { uv_thread_join(&udfc->loopThread); uv_mutex_destroy(&udfc->taskQueueMutex); uv_barrier_destroy(&udfc->initBarrier); + taosArrayDestroy(udfc->expiredUdfStubs); taosArrayDestroy(udfc->udfStubs); uv_mutex_destroy(&udfc->udfStubsMutex); uv_mutex_destroy(&udfc->udfcUvMutex); From 99587db7a10b7a6d5aeb28c615c4becaf6e90584 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 14 Apr 2023 07:58:25 +0800 Subject: [PATCH 3/6] fix: udf handle expired after 10s --- source/libs/function/src/tudf.c | 30 ++++++++++++++++-------------- source/libs/function/src/udfd.c | 2 +- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 53b2cf4aca..f13235f24b 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -960,7 +960,7 @@ int32_t udfcOpen(); int32_t udfcClose(); int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle); -void releaseUdfFuncHandle(char *udfName); +void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle); int32_t cleanUpUdfs(); bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); @@ -992,11 +992,12 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return 0; } else { - fnInfo("invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName, + fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName, foundStub->refCount, foundStub->createTime); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); } } else { + fnInfo("udf handle expired for %s, will setup udf. move it to expired list", udfName); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub); taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub); @@ -1020,7 +1021,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { return code; } -void releaseUdfFuncHandle(char *udfName) { +void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) { uv_mutex_lock(&gUdfcProxy.udfStubsMutex); SUdfcFuncStub key = {0}; strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); @@ -1030,10 +1031,10 @@ void releaseUdfFuncHandle(char *udfName) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return; } - if (foundStub != NULL && foundStub->refCount > 0) { + if (foundStub != NULL && foundStub->handle == handle && foundStub->refCount > 0) { --foundStub->refCount; } - if (expiredStub != NULL && expiredStub->refCount > 0) { + if (expiredStub != NULL && expiredStub->handle == handle && expiredStub->refCount > 0) { --expiredStub->refCount; } uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); @@ -1046,7 +1047,8 @@ int32_t cleanUpUdfs() { } uv_mutex_lock(&gUdfcProxy.udfStubsMutex); - if (gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) { + if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) && + (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return TSDB_CODE_SUCCESS; } @@ -1092,7 +1094,7 @@ int32_t cleanUpUdfs() { } ++i; } - taosArrayDestroy(gUdfcProxy.udfStubs); + taosArrayDestroy(gUdfcProxy.expiredUdfStubs); gUdfcProxy.expiredUdfStubs = expiredUdfStubs; uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return 0; @@ -1109,7 +1111,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, code = doCallUdfScalarFunc(handle, input, numOfCols, output); if (code != TSDB_CODE_SUCCESS) { fnError("udfc scalar function execution failure"); - releaseUdfFuncHandle(udfName); + releaseUdfFuncHandle(udfName, handle); return code; } @@ -1123,7 +1125,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } } - releaseUdfFuncHandle(udfName); + releaseUdfFuncHandle(udfName, handle); return code; } @@ -1156,7 +1158,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult SUdfInterBuf buf = {0}; if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); return false; } if (buf.bufLen <= session->bufSize) { @@ -1165,10 +1167,10 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult udfRes->interResNum = buf.numOfResult; } else { fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); return false; } - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); freeUdfInterBuf(&buf); return true; } @@ -1225,7 +1227,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { taosArrayDestroy(pTempBlock->pDataBlock); taosMemoryFree(pTempBlock); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); freeUdfInterBuf(&newState); return udfCode; } @@ -1270,7 +1272,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { freeUdfInterBuf(&resultBuf); int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); return udfCallCode == 0 ? numOfResults : udfCallCode; } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index aa72309c62..61c6f92e2f 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -591,7 +591,7 @@ SUdf *udfdNewUdf(const char *udfName) { SUdf *udfdGetOrCreateUdf(const char *udfName) { uv_mutex_lock(&global.udfsMutex); SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); - int64_t currTime = taosGetTimestampSec(); + int64_t currTime = taosGetTimestampMs(); bool expired = false; if (pUdfHash) { expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s From 430457e5f834f77af1422b651fbd7e9c3b52416d Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 14 Apr 2023 20:02:46 +0800 Subject: [PATCH 4/6] fix: copy version from main branch of tmqDelete-1ctb.py --- tests/system-test/7-tmq/tmqDelete-1ctb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/7-tmq/tmqDelete-1ctb.py b/tests/system-test/7-tmq/tmqDelete-1ctb.py index 4b8c8c8629..aa9c8d25d0 100644 --- a/tests/system-test/7-tmq/tmqDelete-1ctb.py +++ b/tests/system-test/7-tmq/tmqDelete-1ctb.py @@ -238,7 +238,7 @@ class TDTestCase: if self.snapshot == 0: consumerId = 2 - expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"]) + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"]) * 2 elif self.snapshot == 1: consumerId = 3 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4)) @@ -324,7 +324,7 @@ class TDTestCase: if self.snapshot == 0: consumerId = 4 - expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"]) + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"]) * 2 elif self.snapshot == 1: consumerId = 5 expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4)) From 1a0c9f31bc105b7e8e1f8ffeb10b539be7b7a1e2 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 17 Apr 2023 10:34:34 +0800 Subject: [PATCH 5/6] enhance: refactor cleanup udf function --- source/libs/function/src/tudf.c | 72 +++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index f13235f24b..8c8b99a6f8 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -968,6 +968,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); +void cleanupNotExpiredUdfs(); +void cleanupExpiredUdfs(); int compareUdfcFuncSub(const void *elem1, const void *elem2) { SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; @@ -1040,18 +1042,32 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); } -int32_t cleanUpUdfs() { - int8_t initialized = atomic_load_8(&gUdfcProxy.initialized); - if (!initialized) { - return TSDB_CODE_SUCCESS; +void cleanupExpiredUdfs() { + int32_t i = 0; + SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); + if (stub->refCount == 0) { + fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); + doTeardownUdf(stub->handle); + } else { + fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, + stub->refCount, stub->createTime, stub->handle); + UdfcFuncHandle handle = stub->handle; + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { + taosArrayPush(expiredUdfStubs, stub); + } else { + fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", + stub->udfName, stub->refCount, stub->createTime); + } + } + ++i; } + taosArrayDestroy(gUdfcProxy.expiredUdfStubs); + gUdfcProxy.expiredUdfStubs = expiredUdfStubs; +} - uv_mutex_lock(&gUdfcProxy.udfStubsMutex); - if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) && - (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) { - uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); - return TSDB_CODE_SUCCESS; - } +void cleanupNotExpiredUdfs() { SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); int32_t i = 0; while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) { @@ -1074,28 +1090,24 @@ int32_t cleanUpUdfs() { } taosArrayDestroy(gUdfcProxy.udfStubs); gUdfcProxy.udfStubs = udfStubs; +} - SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); - while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { - SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); - if (stub->refCount == 0) { - fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); - doTeardownUdf(stub->handle); - } else { - fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, - stub->refCount, stub->createTime, stub->handle); - UdfcFuncHandle handle = stub->handle; - if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { - taosArrayPush(expiredUdfStubs, stub); - } else { - fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", - stub->udfName, stub->refCount, stub->createTime); - } - } - ++i; +int32_t cleanUpUdfs() { + int8_t initialized = atomic_load_8(&gUdfcProxy.initialized); + if (!initialized) { + return TSDB_CODE_SUCCESS; } - taosArrayDestroy(gUdfcProxy.expiredUdfStubs); - gUdfcProxy.expiredUdfStubs = expiredUdfStubs; + + uv_mutex_lock(&gUdfcProxy.udfStubsMutex); + if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) && + (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) { + uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); + return TSDB_CODE_SUCCESS; + } + + cleanupNotExpiredUdfs(); + cleanupExpiredUdfs(); + uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return 0; } From a2d75a327cb68a4f59c76c6d2b9a2150880b6eee Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 18 Apr 2023 08:28:12 +0800 Subject: [PATCH 6/6] enhance: udf output column reserve capacity --- source/libs/function/src/udfd.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 61c6f92e2f..5034af2f82 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -688,6 +688,8 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { output.colMeta.type = udf->outputType; output.colMeta.precision = 0; output.colMeta.scale = 0; + udfColEnsureCapacity(&output, call->block.info.rows); + SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);