From c17d9eca5f339133801ccdd93736e8a9c2530a55 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 6 Apr 2023 11:18:30 +0800 Subject: [PATCH] fix: add fetchtime and version to udf init --- include/libs/function/taosudf.h | 1 + source/libs/function/src/udfd.c | 157 ++++++++++++++++++-------------- 2 files changed, 90 insertions(+), 68 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index b4daa895fd..2eccb6225c 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -276,6 +276,7 @@ typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EU typedef struct SScriptUdfInfo { const char *name; + int32_t version; EUdfFuncType funcType; int8_t scriptType; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 095d9afd42..b030070d2d 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -100,7 +100,6 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { } int32_t code = 0; if (udfCtx->initFunc) { - // TODO: handle init call return error code = (udfCtx->initFunc)(); if (code != 0) { uv_dlclose(&udfCtx->lib); @@ -245,7 +244,7 @@ typedef struct SUvUdfWork { typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfState; typedef struct SUdf { - char name[TSDB_FUNC_NAME_LEN + 1]; + char name[TSDB_FUNC_NAME_LEN + 1]; int32_t version; int8_t funcType; @@ -264,9 +263,11 @@ typedef struct SUdf { SUdfScriptPlugin *scriptPlugin; void *scriptUdfCtx; + + int64_t lastFetchTime; // last fetch time in milliseconds + bool expired; } SUdf; -// TODO: add private udf structure. 
typedef struct SUdfcFuncHandle { SUdf *udf; } SUdfcFuncHandle; @@ -319,7 +320,8 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); static int32_t udfdRun(); static void udfdConnectMnodeThreadFunc(void *args); -void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { +SUdf *udfdNewUdf(const char *udfName); +void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; @@ -501,6 +503,7 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { udfInfo->funcType = UDF_FUNC_TYPE_SCALAR; } udfInfo->name = udf->name; + udfInfo->version = udf->version; udfInfo->outputLen = udf->outputLen; udfInfo->outputType = udf->outputType; udfInfo->path = udf->path; @@ -510,9 +513,9 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { int32_t udfdRenameUdfFile(SUdf *udf) { char newPath[PATH_MAX]; if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { - snprintf(newPath, PATH_MAX, "%s/lib%s.so", tsTempDir, udf->name); + snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%"PRId64".so", tsTempDir, udf->name, udf->version, udf->lastFetchTime); } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { - snprintf(newPath, PATH_MAX, "%s/%s.py", tsTempDir, udf->name); + snprintf(newPath, PATH_MAX, "%s/%s_%d_%"PRId64".py", tsTempDir, udf->name, udf->version, udf->lastFetchTime); } else { return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; } @@ -557,40 +560,53 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { return err; } - fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void*)udf->scriptUdfCtx); + fnInfo("udf init succeeded. 
name %s type %d context %p", udf->name, udf->scriptType, (void *)udf->scriptUdfCtx); return 0; } -SUdf *udfdGetOrCreateUdf(const char *udfName) { - SUdf *udf = NULL; - uv_mutex_lock(&global.udfsMutex); - SUdf **udfInHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); - if (udfInHash) { - ++(*udfInHash)->refCount; - udf = *udfInHash; - uv_mutex_unlock(&global.udfsMutex); - } else { - SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); - udfNew->refCount = 1; - strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); +SUdf *udfdNewUdf(const char *udfName) { + SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); + udfNew->refCount = 1; + strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); - udfNew->state = UDF_STATE_INIT; - uv_mutex_init(&udfNew->lock); - uv_cond_init(&udfNew->condReady); + udfNew->state = UDF_STATE_INIT; + uv_mutex_init(&udfNew->lock); + uv_cond_init(&udfNew->condReady); - udf = udfNew; - udf->resident = false; - for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { - char *funcName = taosArrayGet(global.residentFuncs, i); - if (strcmp(udfName, funcName) == 0) { - udf->resident = true; - break; - } + udfNew->resident = false; + udfNew->expired = false; + for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { + char *funcName = taosArrayGet(global.residentFuncs, i); + if (strcmp(udfName, funcName) == 0) { + udfNew->resident = true; + break; } - SUdf **pUdf = &udf; - taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); - uv_mutex_unlock(&global.udfsMutex); } + return udfNew; +} + +SUdf *udfdGetOrCreateUdf(const char *udfName) { + uv_mutex_lock(&global.udfsMutex); + SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); + int64_t currTime = taosGetTimestampMs(); /* lastFetchTime is recorded in milliseconds, so compare in milliseconds */ + bool expired = (pUdfHash != NULL) && (currTime - (*pUdfHash)->lastFetchTime > 10 * 1000); /* 10 s lifetime; only dereference when the name is present in the hash */ + if (pUdfHash && !expired) { + ++(*pUdfHash)->refCount; + SUdf *udf = *pUdfHash; + uv_mutex_unlock(&global.udfsMutex); + return udf; + } + + if 
(pUdfHash && expired) { + (*pUdfHash)->expired = true; + taosHashRemove(global.udfsHash, udfName, strlen(udfName)); + } + + SUdf *udf = udfdNewUdf(udfName); + SUdf **pUdf = &udf; + taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); + uv_mutex_unlock(&global.udfsMutex); + return udf; } @@ -761,17 +777,19 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { uv_mutex_lock(&global.udfsMutex); udf->refCount--; - if (udf->refCount == 0 && !udf->resident) { + if (udf->refCount == 0 && /* never unload while other handles still reference this udf */ + (!udf->resident || udf->expired)) { /* resident udfs are kept unless they have expired */ unloadUdf = true; taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); } uv_mutex_unlock(&global.udfsMutex); if (unloadUdf) { - fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void*)(udf->scriptUdfCtx)); + fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void *)(udf->scriptUdfCtx)); uv_cond_destroy(&udf->condReady); uv_mutex_destroy(&udf->lock); code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); fnDebug("udfd destroy function returns %d", code); + taosRemoveFile(udf->path); taosMemoryFree(udf); } taosMemoryFree(handle); @@ -792,6 +810,35 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { return; } +int32_t udfdSaveFuncBodyToFile(SFuncInfo* pFuncInfo, SUdf* udf) { + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + fnError("udfd create shared library failed since %s", terrstr(terrno)); + return terrno; + } + + char path[PATH_MAX] = {0}; +#ifdef WINDOWS + snprintf(path, sizeof(path), "%s%s_%d_%" PRId64, tsTempDir, pFuncInfo->name, udf->version, udf->lastFetchTime); +#else + snprintf(path, sizeof(path), "%s/%s_%d_%" PRId64, tsTempDir, pFuncInfo->name, udf->version, udf->lastFetchTime); +#endif + TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + if (file == NULL) { + fnError("udfd write udf shared 
library: %s failed, error: %d %s", path, errno, strerror(errno)); + return TSDB_CODE_FILE_CORRUPTED; + } + + int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); + taosCloseFile(&file); /* close before checking the result so a short write does not leak the handle */ + if (count != pFuncInfo->codeSize) { + fnError("udfd write udf shared library failed"); + return TSDB_CODE_FILE_CORRUPTED; + } + strncpy(udf->path, path, PATH_MAX); + return TSDB_CODE_SUCCESS; +} + void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle; @@ -830,49 +877,23 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { SRetrieveFuncRsp retrieveRsp = {0}; tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); - if (retrieveRsp.pFuncInfos == NULL) { - goto _return; - } + SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - SUdf *udf = msgInfo->param; + SUdf *udf = msgInfo->param; udf->funcType = pFuncInfo->funcType; udf->scriptType = pFuncInfo->scriptType; udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; - udf->version = *(int32_t*)taosArrayGet(retrieveRsp.pFuncVersions,0); + udf->version = *(int32_t *)taosArrayGet(retrieveRsp.pFuncVersions, 0); - if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_AVAIL_DISK; - msgInfo->code = terrno; - fnError("udfd create shared library failed since %s", terrstr(terrno)); - goto _return; + msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf); + if (msgInfo->code == 0) { + udf->lastFetchTime = taosGetTimestampMs(); } - - char path[PATH_MAX] = {0}; -#ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s%d", tsTempDir, pFuncInfo->name, udf->version); -#else - snprintf(path, sizeof(path), "%s/%s%d", tsTempDir, pFuncInfo->name, udf->version); -#endif - TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); - 
if (file == NULL) { - fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); - msgInfo->code = TSDB_CODE_FILE_CORRUPTED; - goto _return; - } - - int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); - if (count != pFuncInfo->codeSize) { - fnError("udfd write udf shared library failed"); - msgInfo->code = TSDB_CODE_FILE_CORRUPTED; - goto _return; - } - taosCloseFile(&file); - strncpy(udf->path, path, PATH_MAX); tFreeSFuncInfo(pFuncInfo); taosArrayDestroy(retrieveRsp.pFuncInfos); - msgInfo->code = 0; + taosArrayDestroy(retrieveRsp.pFuncVersions); } _return: