diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 2269ad7f6a..8c8b99a6f8 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -343,7 +343,7 @@ typedef struct SUdfcFuncStub { char udfName[TSDB_FUNC_NAME_LEN + 1]; UdfcFuncHandle handle; int32_t refCount; - int64_t lastRefTime; + int64_t createTime; } SUdfcFuncStub; typedef struct SUdfcProxy { @@ -363,6 +363,7 @@ typedef struct SUdfcProxy { uv_mutex_t udfStubsMutex; SArray *udfStubs; // SUdfcFuncStub + SArray *expiredUdfStubs; //SUdfcFuncStub uv_mutex_t udfcUvMutex; int8_t initialized; @@ -959,7 +960,7 @@ int32_t udfcOpen(); int32_t udfcClose(); int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle); -void releaseUdfFuncHandle(char *udfName); +void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle); int32_t cleanUpUdfs(); bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); @@ -967,6 +968,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); +void cleanupNotExpiredUdfs(); +void cleanupExpiredUdfs(); int compareUdfcFuncSub(const void *elem1, const void *elem2) { SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; @@ -982,16 +985,24 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { if (stubIndex != -1) { SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex); UdfcFuncHandle handle = foundStub->handle; - if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { - *pHandle = foundStub->handle; - ++foundStub->refCount; - foundStub->lastRefTime = taosGetTimestampUs(); - uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); - return 0; + int64_t currUs = taosGetTimestampUs(); + bool expired = (currUs - foundStub->createTime) >= 10 * 1000 * 1000; + if (!expired) { + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { + *pHandle = foundStub->handle; + ++foundStub->refCount; + uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); + return 0; + } else { + fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName, + foundStub->refCount, foundStub->createTime); + taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); + } } else { - fnInfo("invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", udfName, - foundStub->refCount, foundStub->lastRefTime); + fnInfo("udf handle expired for %s, will setup udf. move it to expired list", udfName); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); + taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub); + taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub); } } *pHandle = NULL; @@ -1001,7 +1012,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { strncpy(stub.udfName, udfName, TSDB_FUNC_NAME_LEN); stub.handle = *pHandle; ++stub.refCount; - stub.lastRefTime = taosGetTimestampUs(); + stub.createTime = taosGetTimestampUs(); taosArrayPush(gUdfcProxy.udfStubs, &stub); taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub); } else { @@ -1012,21 +1023,75 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { return code; } -void releaseUdfFuncHandle(char *udfName) { +void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) { uv_mutex_lock(&gUdfcProxy.udfStubsMutex); SUdfcFuncStub key = {0}; strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); - if (!foundStub) { + SUdfcFuncStub *expiredStub = taosArraySearch(gUdfcProxy.expiredUdfStubs, &key, compareUdfcFuncSub, TD_EQ); + if (!foundStub && !expiredStub) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return; } - if (foundStub->refCount > 0) { + if (foundStub != NULL && foundStub->handle == handle && foundStub->refCount > 0) { --foundStub->refCount; } + if (expiredStub != NULL && expiredStub->handle == handle && expiredStub->refCount > 0) { + --expiredStub->refCount; + } uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); } +void cleanupExpiredUdfs() { + int32_t i = 0; + SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); + if (stub->refCount == 0) { + fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); + doTeardownUdf(stub->handle); + } else { + fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, + stub->refCount, stub->createTime, stub->handle); + UdfcFuncHandle handle = stub->handle; + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { + taosArrayPush(expiredUdfStubs, stub); + } else { + fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", + stub->udfName, stub->refCount, stub->createTime); + } + } + ++i; + } + taosArrayDestroy(gUdfcProxy.expiredUdfStubs); + gUdfcProxy.expiredUdfStubs = expiredUdfStubs; +} + +void cleanupNotExpiredUdfs() { + SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + int32_t i = 0; + while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) { + SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i); + if (stub->refCount == 0) { + fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); + doTeardownUdf(stub->handle); + } else { + fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, + stub->refCount, stub->createTime, stub->handle); + UdfcFuncHandle handle = stub->handle; + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { + taosArrayPush(udfStubs, stub); + } else { + fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", + stub->udfName, stub->refCount, stub->createTime); + } + } + ++i; + } + taosArrayDestroy(gUdfcProxy.udfStubs); + gUdfcProxy.udfStubs = udfStubs; +} + int32_t cleanUpUdfs() { int8_t initialized = atomic_load_8(&gUdfcProxy.initialized); if (!initialized) { @@ -1034,32 +1099,15 @@ int32_t cleanUpUdfs() { } uv_mutex_lock(&gUdfcProxy.udfStubsMutex); - if (gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) { + if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) && + (gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) { uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return TSDB_CODE_SUCCESS; } - SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); - int32_t i = 0; - while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) { - SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i); - if (stub->refCount == 0) { - fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); - doTeardownUdf(stub->handle); - } else { - fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %" PRId64 ", handle: %p", stub->udfName, - stub->refCount, stub->lastRefTime, stub->handle); - UdfcFuncHandle handle = stub->handle; - if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { - taosArrayPush(udfStubs, stub); - } else { - fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", - stub->udfName, stub->refCount, stub->lastRefTime); - } - } - ++i; - } - taosArrayDestroy(gUdfcProxy.udfStubs); - gUdfcProxy.udfStubs = udfStubs; + + cleanupNotExpiredUdfs(); + cleanupExpiredUdfs(); + uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return 0; } @@ -1075,7 +1123,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, code = doCallUdfScalarFunc(handle, input, numOfCols, output); if (code != TSDB_CODE_SUCCESS) { fnError("udfc scalar function execution failure"); - releaseUdfFuncHandle(udfName); + releaseUdfFuncHandle(udfName, handle); return code; } @@ -1089,7 +1137,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } } - releaseUdfFuncHandle(udfName); + releaseUdfFuncHandle(udfName, handle); return code; } @@ -1122,7 +1170,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult SUdfInterBuf buf = {0}; if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); return false; } if (buf.bufLen <= session->bufSize) { @@ -1131,10 +1179,10 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult udfRes->interResNum = buf.numOfResult; } else { fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); return false; } - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); freeUdfInterBuf(&buf); return true; } @@ -1191,7 +1239,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { taosArrayDestroy(pTempBlock->pDataBlock); taosMemoryFree(pTempBlock); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); freeUdfInterBuf(&newState); return udfCode; } @@ -1236,7 +1284,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { freeUdfInterBuf(&resultBuf); int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf); - releaseUdfFuncHandle(pCtx->udfName); + releaseUdfFuncHandle(pCtx->udfName, handle); return udfCallCode == 0 ? numOfResults : udfCallCode; } @@ -1663,6 +1711,7 @@ int32_t udfcOpen() { uv_barrier_wait(&proxy->initBarrier); uv_mutex_init(&proxy->udfStubsMutex); proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); + proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); uv_mutex_init(&proxy->udfcUvMutex); fnInfo("udfc initialized") return 0; } @@ -1679,6 +1728,7 @@ int32_t udfcClose() { uv_thread_join(&udfc->loopThread); uv_mutex_destroy(&udfc->taskQueueMutex); uv_barrier_destroy(&udfc->initBarrier); + taosArrayDestroy(udfc->expiredUdfStubs); taosArrayDestroy(udfc->udfStubs); uv_mutex_destroy(&udfc->udfStubsMutex); uv_mutex_destroy(&udfc->udfcUvMutex); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index aa72309c62..5034af2f82 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -591,7 +591,7 @@ SUdf *udfdNewUdf(const char *udfName) { SUdf *udfdGetOrCreateUdf(const char *udfName) { uv_mutex_lock(&global.udfsMutex); SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); - int64_t currTime = taosGetTimestampSec(); + int64_t currTime = taosGetTimestampMs(); bool expired = false; if (pUdfHash) { expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s @@ -688,6 +688,8 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { output.colMeta.type = udf->outputType; output.colMeta.precision = 0; output.colMeta.scale = 0; + udfColEnsureCapacity(&output, call->block.info.rows); + SUdfDataBlock input = {0}; convertDataBlockToUdfDataBlock(&call->block, &input); code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);