From 23d4eef1144f727497dfe975cdd289d44e354295 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 30 Mar 2023 11:46:40 +0800 Subject: [PATCH 01/29] feat: startup of replace function when no active query --- include/common/tmsg.h | 2 ++ source/common/src/tmsg.c | 19 +++++++++++++++++++ source/libs/function/src/udfd.c | 7 ++++--- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d7f9e16d87..5d732c7c6f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1054,6 +1054,7 @@ typedef struct { int64_t signature; char* pComment; char* pCode; + int8_t orReplace; } SCreateFuncReq; int32_t tSerializeSCreateFuncReq(void* buf, int32_t bufLen, SCreateFuncReq* pReq); @@ -1095,6 +1096,7 @@ typedef struct { typedef struct { int32_t numOfFuncs; SArray* pFuncInfos; + SArray* pFuncVersions; } SRetrieveFuncRsp; int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ccc3ceae7b..b4945da699 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1702,6 +1702,8 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq if (tEncodeCStr(&encoder, pReq->pComment) < 0) return -1; } + if (tEncodeI8(&encoder, pReq->orReplace) <0) return -1; + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1744,6 +1746,8 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR if (tDecodeCStrTo(&decoder, pReq->pComment) < 0) return -1; } + if (tDecoodeI8(&decoder, &pReq->orReplace) < 0) return -1; + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -1855,6 +1859,12 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp * } } + if (pRsp->numOfFuncs != (int32_t)taosArrayGetSize(pRsp->pFuncVersions)) return -1; + for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { + int32_t version = *(int32_t*)taosArrayGet(pRsp->pFuncVersions, i); + if (tEncodeI32(&encoder, version) < 0) return -1; + } + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1902,6 +1912,15 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp taosArrayPush(pRsp->pFuncInfos, &fInfo); } + + pRsp->pFuncVersions = taosArrayInit(pRsp->numOfFuncs, sizeof(int32_t)); + if (pRsp->pFuncVersions == NULL) return -1; + + for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { + int32_t version = 0; + if (tDecodeI32(&decoder, &version) < 0) return -1; + taosArrayPush(pRsp->pFuncVersions, &version); + } tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index a8b993290e..095d9afd42 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -246,6 +246,7 @@ typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfStat typedef struct SUdf { char name[TSDB_FUNC_NAME_LEN + 1]; + int32_t version; int8_t funcType; int8_t scriptType; @@ -833,13 +834,13 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { goto _return; } SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - // SUdf *udf = msgInfo->param; SUdf *udf = msgInfo->param; udf->funcType = pFuncInfo->funcType; udf->scriptType = pFuncInfo->scriptType; udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; + udf->version = *(int32_t*)taosArrayGet(retrieveRsp.pFuncVersions,0); if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; @@ -850,9 +851,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { char path[PATH_MAX] = {0}; #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name); + snprintf(path, sizeof(path), "%s%s%d", tsTempDir, pFuncInfo->name, udf->version); #else - snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name); + snprintf(path, sizeof(path), "%s/%s%d", tsTempDir, pFuncInfo->name, udf->version); #endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { From 55ba457fef7d04138cc9f956d10bd7167ea38ca4 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 30 Mar 2023 12:05:56 +0800 Subject: [PATCH 02/29] fix: fix bugs --- source/common/src/tmsg.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b4945da699..fdfa2b2bce 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1746,7 +1746,7 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR if (tDecodeCStrTo(&decoder, pReq->pComment) < 0) return -1; } - if (tDecoodeI8(&decoder, &pReq->orReplace) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->orReplace) < 0) return -1; tEndDecode(&decoder); From a71caffed6569d6884372d862d3266328754567e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Tue, 4 Apr 2023 19:53:08 +0800 Subject: [PATCH 03/29] add version in mnode --- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndFunc.c | 28 +++++++++++++++++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 801afd562b..c22e5422b6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -447,6 +447,7 @@ typedef struct { int32_t codeSize; char* pComment; char* pCode; + int32_t funcVersions; } SFuncObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index 8d006f1029..a0ae622985 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -21,7 +21,7 @@ #include "mndTrans.h" #include "mndUser.h" -#define SDB_FUNC_VER 1 +#define SDB_FUNC_VER 2 #define SDB_FUNC_RESERVE_SIZE 64 static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc); @@ -83,6 +83,7 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) { SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER) } SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER) + SDB_SET_INT32(pRaw, dataPos, pFunc->funcVersions, _OVER) SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER); @@ -107,7 +108,7 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != SDB_FUNC_VER) { + if (sver != 1 && sver != 2) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; goto _OVER; } @@ -144,6 +145,11 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { goto _OVER; } SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER) + + if(sver >= 2){ + SDB_GET_INT32(pRaw, dataPos, &pFunc->funcVersions, _OVER) + } + SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER) terrno = 0; @@ -222,6 +228,15 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre memcpy(func.pComment, pCreate->pComment, func.commentSize); } memcpy(func.pCode, pCreate->pCode, func.codeSize); + + if(pCreate->orReplace == 1){ + SFuncObj *oldFunc = mndAcquireFunc(pMnode, pCreate->name); + if(oldFunc == NULL){ + goto _OVER; + } + func.funcVersions = oldFunc->funcVersions + 1; + mndReleaseFunc(pMnode, oldFunc); + } pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-func"); if (pTrans == NULL) goto _OVER; @@ -413,6 +428,12 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { goto RETRIEVE_FUNC_OVER; } + retrieveRsp.pFuncVersions = taosArrayInit(retrieveReq.numOfFuncs, sizeof(int32_t)); + if (retrieveRsp.pFuncVersions == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto RETRIEVE_FUNC_OVER; + } + for (int32_t i = 0; i < retrieveReq.numOfFuncs; ++i) { char *funcName = taosArrayGet(retrieveReq.pFuncNames, i); @@ -451,6 +472,9 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { } } taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo); + + taosArrayPush(retrieveRsp.pFuncVersions, &pFunc->funcVersions); + mndReleaseFunc(pMnode, pFunc); } From c17d9eca5f339133801ccdd93736e8a9c2530a55 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 6 Apr 2023 11:18:30 +0800 Subject: [PATCH 04/29] fix: add fetchtime and version to udf init --- include/libs/function/taosudf.h | 1 + source/libs/function/src/udfd.c | 157 ++++++++++++++++++-------------- 2 files changed, 90 insertions(+), 68 deletions(-) diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index b4daa895fd..2eccb6225c 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -276,6 +276,7 @@ typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EU typedef struct SScriptUdfInfo { const char *name; + int32_t version; EUdfFuncType funcType; int8_t scriptType; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 095d9afd42..b030070d2d 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -100,7 +100,6 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { } int32_t code = 0; if (udfCtx->initFunc) { - // TODO: handle init call return error code = (udfCtx->initFunc)(); if (code != 0) { uv_dlclose(&udfCtx->lib); @@ -245,7 +244,7 @@ typedef struct SUvUdfWork { typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfState; typedef struct SUdf { - char name[TSDB_FUNC_NAME_LEN + 1]; + char name[TSDB_FUNC_NAME_LEN + 1]; int32_t version; int8_t funcType; @@ -264,9 +263,11 @@ typedef struct SUdf { SUdfScriptPlugin *scriptPlugin; void *scriptUdfCtx; + + int64_t lastFetchTime; // last fetch time in milliseconds + bool expired; } SUdf; -// TODO: add private udf structure. typedef struct SUdfcFuncHandle { SUdf *udf; } SUdfcFuncHandle; @@ -319,7 +320,8 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); static int32_t udfdRun(); static void udfdConnectMnodeThreadFunc(void *args); -void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { +SUdf *udfdNewUdf(const char *udfName); +void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; @@ -501,6 +503,7 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { udfInfo->funcType = UDF_FUNC_TYPE_SCALAR; } udfInfo->name = udf->name; + udfInfo->version = udf->version; udfInfo->outputLen = udf->outputLen; udfInfo->outputType = udf->outputType; udfInfo->path = udf->path; @@ -510,9 +513,9 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { int32_t udfdRenameUdfFile(SUdf *udf) { char newPath[PATH_MAX]; if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { - snprintf(newPath, PATH_MAX, "%s/lib%s.so", tsTempDir, udf->name); + snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%"PRId64".so", tsTempDir, udf->name, udf->version, udf->lastFetchTime); } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { - snprintf(newPath, PATH_MAX, "%s/%s.py", tsTempDir, udf->name); + snprintf(newPath, PATH_MAX, "%s/%s_%d_%"PRId64".py", tsTempDir, udf->name, udf->version, udf->lastFetchTime); } else { return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; } @@ -557,40 +560,53 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { return err; } - fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void*)udf->scriptUdfCtx); + fnInfo("udf init succeeded. name %s type %d context %p", udf->name, udf->scriptType, (void *)udf->scriptUdfCtx); return 0; } -SUdf *udfdGetOrCreateUdf(const char *udfName) { - SUdf *udf = NULL; - uv_mutex_lock(&global.udfsMutex); - SUdf **udfInHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); - if (udfInHash) { - ++(*udfInHash)->refCount; - udf = *udfInHash; - uv_mutex_unlock(&global.udfsMutex); - } else { - SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); - udfNew->refCount = 1; - strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); +SUdf *udfdNewUdf(const char *udfName) { + SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); + udfNew->refCount = 1; + strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); - udfNew->state = UDF_STATE_INIT; - uv_mutex_init(&udfNew->lock); - uv_cond_init(&udfNew->condReady); + udfNew->state = UDF_STATE_INIT; + uv_mutex_init(&udfNew->lock); + uv_cond_init(&udfNew->condReady); - udf = udfNew; - udf->resident = false; - for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { - char *funcName = taosArrayGet(global.residentFuncs, i); - if (strcmp(udfName, funcName) == 0) { - udf->resident = true; - break; - } + udfNew->resident = false; + udfNew->expired = false; + for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { + char *funcName = taosArrayGet(global.residentFuncs, i); + if (strcmp(udfName, funcName) == 0) { + udfNew->resident = true; + break; } - SUdf **pUdf = &udf; - taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); - uv_mutex_unlock(&global.udfsMutex); } + return udfNew; +} + +SUdf *udfdGetOrCreateUdf(const char *udfName) { + uv_mutex_lock(&global.udfsMutex); + SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); + int64_t currTime = taosGetTimestampSec(); + bool expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; + if (pUdfHash && !expired) { + ++(*pUdfHash)->refCount; + SUdf *udf = *pUdfHash; + uv_mutex_unlock(&global.udfsMutex); + return udf; + } + + if (pUdfHash && expired) { + (*pUdfHash)->expired = true; + taosHashRemove(global.udfsHash, udfName, strlen(udfName)); + } + + SUdf *udf = udfdNewUdf(udfName); + SUdf **pUdf = &udf; + taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); + uv_mutex_unlock(&global.udfsMutex); + return udf; } @@ -761,17 +777,19 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { uv_mutex_lock(&global.udfsMutex); udf->refCount--; - if (udf->refCount == 0 && !udf->resident) { + if ((udf->refCount == 0 && !udf->resident) || + (udf->resident && udf->expired)) { unloadUdf = true; taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); } uv_mutex_unlock(&global.udfsMutex); if (unloadUdf) { - fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void*)(udf->scriptUdfCtx)); + fnInfo("udf teardown. udf name: %s type %d: context %p", udf->name, udf->scriptType, (void *)(udf->scriptUdfCtx)); uv_cond_destroy(&udf->condReady); uv_mutex_destroy(&udf->lock); code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); fnDebug("udfd destroy function returns %d", code); + taosRemoveFile(udf->path); taosMemoryFree(udf); } taosMemoryFree(handle); @@ -792,6 +810,35 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { return; } +int32_t udfdSaveFuncBodyToFile(SFuncInfo* pFuncInfo, SUdf* udf) { + if (!osTempSpaceAvailable()) { + terrno = TSDB_CODE_NO_AVAIL_DISK; + fnError("udfd create shared library failed since %s", terrstr(terrno)); + return terrno; + } + + char path[PATH_MAX] = {0}; +#ifdef WINDOWS + snprintf(path, sizeof(path), "%s%s_%d_%" PRId64, tsTempDir, pFuncInfo->name, udf->version, udf->lastFetchTime); +#else + snprintf(path, sizeof(path), "%s/%s_%d_%" PRId64, tsTempDir, pFuncInfo->name, udf->version, udf->lastFetchTime); +#endif + TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + if (file == NULL) { + fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); + return TSDB_CODE_FILE_CORRUPTED; + } + + int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); + if (count != pFuncInfo->codeSize) { + fnError("udfd write udf shared library failed"); + return TSDB_CODE_FILE_CORRUPTED; + } + taosCloseFile(&file); + strncpy(udf->path, path, PATH_MAX); + return TSDB_CODE_SUCCESS; +} + void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle; @@ -830,49 +877,23 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { SRetrieveFuncRsp retrieveRsp = {0}; tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); - if (retrieveRsp.pFuncInfos == NULL) { - goto _return; - } + SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); - SUdf *udf = msgInfo->param; + SUdf *udf = msgInfo->param; udf->funcType = pFuncInfo->funcType; udf->scriptType = pFuncInfo->scriptType; udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; - udf->version = *(int32_t*)taosArrayGet(retrieveRsp.pFuncVersions,0); + udf->version = *(int32_t *)taosArrayGet(retrieveRsp.pFuncVersions, 0); - if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_AVAIL_DISK; - msgInfo->code = terrno; - fnError("udfd create shared library failed since %s", terrstr(terrno)); - goto _return; + msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf); + if (msgInfo->code == 0) { + udf->lastFetchTime = taosGetTimestampMs(); } - - char path[PATH_MAX] = {0}; -#ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s%d", tsTempDir, pFuncInfo->name, udf->version); -#else - snprintf(path, sizeof(path), "%s/%s%d", tsTempDir, pFuncInfo->name, udf->version); -#endif - TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); - if (file == NULL) { - fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); - msgInfo->code = TSDB_CODE_FILE_CORRUPTED; - goto _return; - } - - int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); - if (count != pFuncInfo->codeSize) { - fnError("udfd write udf shared library failed"); - msgInfo->code = TSDB_CODE_FILE_CORRUPTED; - goto _return; - } - taosCloseFile(&file); - strncpy(udf->path, path, PATH_MAX); tFreeSFuncInfo(pFuncInfo); taosArrayDestroy(retrieveRsp.pFuncInfos); - msgInfo->code = 0; + taosArrayDestroy(retrieveRsp.pFuncVersions); } _return: From adec92b0b296ad026007d2c1d11a69de72d30245 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 6 Apr 2023 11:37:48 +0800 Subject: [PATCH 05/29] fix: change a minor bug --- source/libs/function/src/udfd.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index b030070d2d..31b89bd882 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -777,8 +777,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { uv_mutex_lock(&global.udfsMutex); udf->refCount--; - if ((udf->refCount == 0 && !udf->resident) || - (udf->resident && udf->expired)) { + if (udf->refCount == 0 && (!udf->resident || udf->expired)) { unloadUdf = true; taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); } From 8ab1aa4ac9c825f1ffe0961538de2e511011a7dc Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 6 Apr 2023 13:13:54 +0800 Subject: [PATCH 06/29] fix: change bugs related to expired udf --- source/libs/function/src/udfd.c | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 31b89bd882..0ab76b8e64 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -513,9 +513,9 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { int32_t udfdRenameUdfFile(SUdf *udf) { char newPath[PATH_MAX]; if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { - snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%"PRId64".so", tsTempDir, udf->name, udf->version, udf->lastFetchTime); + snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%" PRId64 ".so", tsTempDir, udf->name, udf->version, udf->lastFetchTime); } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { - snprintf(newPath, PATH_MAX, "%s/%s_%d_%"PRId64".py", tsTempDir, udf->name, udf->version, udf->lastFetchTime); + snprintf(newPath, PATH_MAX, "%s/%s_%d_%" PRId64 ".py", tsTempDir, udf->name, udf->version, udf->lastFetchTime); } else { return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; } @@ -589,17 +589,18 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { uv_mutex_lock(&global.udfsMutex); SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); int64_t currTime = taosGetTimestampSec(); - bool expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; - if (pUdfHash && !expired) { - ++(*pUdfHash)->refCount; - SUdf *udf = *pUdfHash; - uv_mutex_unlock(&global.udfsMutex); - return udf; - } - - if (pUdfHash && expired) { - (*pUdfHash)->expired = true; - taosHashRemove(global.udfsHash, udfName, strlen(udfName)); + bool expired = false; + if (pUdfHash) { + expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; + if (!expired) { + ++(*pUdfHash)->refCount; + SUdf *udf = *pUdfHash; + uv_mutex_unlock(&global.udfsMutex); + return udf; + } else { + (*pUdfHash)->expired = true; + taosHashRemove(global.udfsHash, udfName, strlen(udfName)); + } } SUdf *udf = udfdNewUdf(udfName); @@ -809,7 +810,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { return; } -int32_t udfdSaveFuncBodyToFile(SFuncInfo* pFuncInfo, SUdf* udf) { +int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; fnError("udfd create shared library failed since %s", terrstr(terrno)); From e71a82347be538125cae2f4ae42550e6dd7104a4 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 6 Apr 2023 14:31:11 +0800 Subject: [PATCH 07/29] enhance: refactor c plugin udf init --- source/libs/function/src/udfd.c | 68 ++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 27 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 0ab76b8e64..e25aade9f6 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -53,15 +53,7 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; } int32_t udfdCPluginClose() { return 0; } -int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { - int32_t err = 0; - SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx)); - err = uv_dlopen(udf->path, &udfCtx->lib); - if (err != 0) { - fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); - return TSDB_CODE_UDF_LOAD_UDF_FAILURE; - } - const char *udfName = udf->name; +const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char* udfName) { char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; char *initSuffix = "_init"; strcpy(initFuncName, udfName); @@ -73,31 +65,53 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { strcpy(destroyFuncName, udfName); strncat(destroyFuncName, destroySuffix, strlen(destroySuffix)); uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc)); + return udfName; +} + +void udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { + char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; + strcpy(processFuncName, udfName); + uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)); + + char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *startSuffix = "_start"; + strncpy(startFuncName, processFuncName, sizeof(startFuncName)); + strncat(startFuncName, startSuffix, strlen(startSuffix)); + uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)); + + char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; + char *finishSuffix = "_finish"; + strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); + strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); + uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)); + + char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *mergeSuffix = "_merge"; + strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName)); + strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix)); + uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)); +} + +int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { + int32_t err = 0; + SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx)); + err = uv_dlopen(udf->path, &udfCtx->lib); + if (err != 0) { + fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); + return TSDB_CODE_UDF_LOAD_UDF_FAILURE; + } + const char* udfName = udf->name; + + udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName); if (udf->funcType == UDF_FUNC_TYPE_SCALAR) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; strcpy(processFuncName, udfName); uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)); } else if (udf->funcType == UDF_FUNC_TYPE_AGG) { - char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strcpy(processFuncName, udfName); - uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)); - char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; - char *startSuffix = "_start"; - strncpy(startFuncName, processFuncName, sizeof(startFuncName)); - strncat(startFuncName, startSuffix, strlen(startSuffix)); - uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)); - char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; - char *finishSuffix = "_finish"; - strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); - strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); - uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)); - char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; - char *mergeSuffix = "_merge"; - strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName)); - strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix)); - uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)); + udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName); } + int32_t code = 0; if (udfCtx->initFunc) { code = (udfCtx->initFunc)(); From 0b055470ec65eb15646ed32775ce4d07d340ce52 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 6 Apr 2023 14:42:16 +0800 Subject: [PATCH 08/29] fix: free pFuncVersions of SRetrieveFuncRsp --- source/common/src/tmsg.c | 1 + source/libs/function/src/udfd.c | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f7f1827807..2d2c6f5523 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1949,6 +1949,7 @@ void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) { tFreeSFuncInfo(pInfo); } taosArrayDestroy(pRsp->pFuncInfos); + taosArrayDestroy(pRsp->pFuncVersions); } int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index e25aade9f6..6d01768369 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -84,7 +84,7 @@ void udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) strncpy(finishFuncName, processFuncName, sizeof(finishFuncName)); strncat(finishFuncName, finishSuffix, strlen(finishSuffix)); uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)); - + char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char *mergeSuffix = "_merge"; strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName)); From 4612d6d14b8d9ab4686e336efe4397414f861e55 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 6 Apr 2023 15:37:19 +0800 Subject: [PATCH 09/29] fix: catalog not processing version --- source/libs/qcom/src/querymsg.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index d2934e1ff8..4b41c24371 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -587,7 +587,8 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) { memcpy(output, funcInfo, sizeof(*funcInfo)); taosArrayDestroy(out.pFuncInfos); - + taosArrayDestroy(out.pFuncVersions); + return TSDB_CODE_SUCCESS; } From 66c86a60d50772fd34c6141e905df19419a5da8a Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 6 Apr 2023 15:58:53 +0800 Subject: [PATCH 10/29] fix: add func_version to systable --- source/common/src/systable.c | 1 + source/common/src/tmsg.c | 16 +++++++++++----- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/src/mndFunc.c | 11 +++++++---- 4 files changed, 20 insertions(+), 10 deletions(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 2c15980167..228aa40fa9 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -116,6 +116,7 @@ static const SSysDbTableSchema userFuncSchema[] = { {.name = "bufsize", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "func_language", .bytes = TSDB_TYPE_STR_MAX_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "func_body", .bytes = TSDB_MAX_BINARY_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "func_version", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, }; static const SSysDbTableSchema userIdxSchema[] = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2d2c6f5523..f103b33138 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1921,11 +1921,17 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp pRsp->pFuncVersions = taosArrayInit(pRsp->numOfFuncs, sizeof(int32_t)); if (pRsp->pFuncVersions == NULL) return -1; - - for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { - int32_t version = 0; - if (tDecodeI32(&decoder, &version) < 0) return -1; - taosArrayPush(pRsp->pFuncVersions, &version); + if (tDecodeIsEnd(&decoder)) { + for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { + int32_t version = 0; + taosArrayPush(pRsp->pFuncVersions, &version); + } + } else { + for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { + int32_t version = 0; + if (tDecodeI32(&decoder, &version) < 0) return -1; + taosArrayPush(pRsp->pFuncVersions, &version); + } } tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c22e5422b6..1ff1165e21 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -447,7 +447,7 @@ typedef struct { int32_t codeSize; char* pComment; char* pCode; - int32_t funcVersions; + int32_t funcVersion; } SFuncObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index e707af2694..08336ef70a 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -83,7 +83,7 @@ static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) { SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER) } SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER) - SDB_SET_INT32(pRaw, dataPos, pFunc->funcVersions, _OVER) + SDB_SET_INT32(pRaw, dataPos, pFunc->funcVersion, _OVER) SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER); @@ -147,7 +147,7 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER) if(sver >= 2){ - SDB_GET_INT32(pRaw, dataPos, &pFunc->funcVersions, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pFunc->funcVersion, _OVER) } SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER) @@ -234,7 +234,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre if(oldFunc == NULL){ goto _OVER; } - func.funcVersions = oldFunc->funcVersions + 1; + func.funcVersion = oldFunc->funcVersion + 1; mndReleaseFunc(pMnode, oldFunc); } @@ -473,7 +473,7 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { } taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo); - taosArrayPush(retrieveRsp.pFuncVersions, &pFunc->funcVersions); + taosArrayPush(retrieveRsp.pFuncVersions, &pFunc->funcVersion); mndReleaseFunc(pMnode, pFunc); } @@ -590,6 +590,9 @@ static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl colDataSetVal(pColInfo, numOfRows, (const char*)b4, false); taosMemoryFree(b4); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pFunc->funcVersion, false); + numOfRows++; sdbRelease(pSdb, pFunc); } From 455719befa3efee653815e7d218d69c75016533e Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 6 Apr 2023 18:09:29 +0800 Subject: [PATCH 11/29] fix: use us time unit for last fetch time --- source/libs/function/src/udfd.c | 4 ++-- tests/system-test/2-query/odbc.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 6d01768369..3e03e1acb6 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -605,7 +605,7 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { int64_t currTime = taosGetTimestampSec(); bool expired = false; if (pUdfHash) { - expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; + expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000 * 1000; // 10s if (!expired) { ++(*pUdfHash)->refCount; SUdf *udf = *pUdfHash; @@ -903,7 +903,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf); if (msgInfo->code == 0) { - udf->lastFetchTime = taosGetTimestampMs(); + udf->lastFetchTime = taosGetTimestampUs(); } tFreeSFuncInfo(pFuncInfo); taosArrayDestroy(retrieveRsp.pFuncInfos); diff --git a/tests/system-test/2-query/odbc.py b/tests/system-test/2-query/odbc.py index f9232dddf8..9ff4a26ac0 100644 --- a/tests/system-test/2-query/odbc.py +++ b/tests/system-test/2-query/odbc.py @@ -22,7 +22,7 @@ class TDTestCase: tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)") tdSql.query("select count(*) from information_schema.ins_columns") - tdSql.checkData(0, 0, 274) + tdSql.checkData(0, 0, 275) tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'") tdSql.checkRows(14) From 596764479f6d7aebf83d90b22da78916fd719993 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 7 Apr 2023 10:30:36 +0800 Subject: [PATCH 12/29] fix: no more fetch when it is already fetching --- source/libs/catalog/test/catalogTests.cpp | 5 ++++- source/libs/function/src/udfd.c | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 5e543384ac..f3f7d62acf 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -672,12 +672,15 @@ void ctgTestRspUdfInfo(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pR SRetrieveFuncRsp funcRsp = {0}; funcRsp.numOfFuncs = 1; funcRsp.pFuncInfos = taosArrayInit(1, sizeof(SFuncInfo)); + funcRsp.pFuncVersions = taosArrayInit(1, sizeof(int32_t)); SFuncInfo funcInfo = {0}; strcpy(funcInfo.name, "func1"); funcInfo.funcType = ctgTestFuncType; (void)taosArrayPush(funcRsp.pFuncInfos, &funcInfo); - + int32_t version = 0; + (void)taosArrayPush(funcRsp.pFuncVersions, &version); + int32_t rspLen = tSerializeSRetrieveFuncRsp(NULL, 0, &funcRsp); void *pReq = rpcMallocCont(rspLen); tSerializeSRetrieveFuncRsp(pReq, rspLen, &funcRsp); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 3e03e1acb6..fbf523ac4f 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -581,6 +581,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { SUdf *udfdNewUdf(const char *udfName) { SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); udfNew->refCount = 1; + udfNew->lastFetchTime = taosGetTimestampUs(); strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); udfNew->state = UDF_STATE_INIT; @@ -618,6 +619,7 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { } SUdf *udf = udfdNewUdf(udfName); + SUdf **pUdf = &udf; taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); uv_mutex_unlock(&global.udfsMutex); @@ -902,8 +904,8 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->version = *(int32_t *)taosArrayGet(retrieveRsp.pFuncVersions, 0); msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf); - if (msgInfo->code == 0) { - udf->lastFetchTime = taosGetTimestampUs(); + if (msgInfo->code != 0) { + udf->lastFetchTime = 0; } tFreeSFuncInfo(pFuncInfo); taosArrayDestroy(retrieveRsp.pFuncInfos); From 26f1e91ddb6f0b8749b1002c73c8c695493c9c1c Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 7 Apr 2023 16:12:29 +0800 Subject: [PATCH 13/29] fix: change binarylib/script dir to tsdatadir --- source/libs/function/src/udfd.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index fbf523ac4f..836d2d893c 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -387,12 +387,12 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { } if (plugin->openFunc) { - int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(tsTempDir) + 1 + 1; // tsTempDir:tsUdfdLdLibPath + int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(tsDataDir) + 1 + 1; // tsDataDir:tsUdfdLdLibPath char *pythonPath = taosMemoryMalloc(lenPythonPath); #ifdef WINDOWS - snprintf(pythonPath, lenPythonPath, "%s;%s", tsTempDir, tsUdfdLdLibPath); + snprintf(pythonPath, lenPythonPath, "%s;%s", tsDataDir, tsUdfdLdLibPath); #else - snprintf(pythonPath, lenPythonPath, "%s:%s", tsTempDir, tsUdfdLdLibPath); + snprintf(pythonPath, lenPythonPath, "%s:%s", tsDataDir, tsUdfdLdLibPath); #endif SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}}; err = plugin->openFunc(items, 2); @@ -527,9 +527,9 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { int32_t udfdRenameUdfFile(SUdf *udf) { char newPath[PATH_MAX]; if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { - snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%" PRId64 ".so", tsTempDir, udf->name, udf->version, udf->lastFetchTime); + snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->lastFetchTime); } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { - snprintf(newPath, PATH_MAX, "%s/%s_%d_%" PRId64 ".py", tsTempDir, udf->name, udf->version, udf->lastFetchTime); + snprintf(newPath, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->lastFetchTime); } else { return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; } @@ -827,7 +827,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { - if (!osTempSpaceAvailable()) { + if (!osDataSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; fnError("udfd create shared library failed since %s", terrstr(terrno)); return terrno; @@ -835,9 +835,9 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { char path[PATH_MAX] = {0}; #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s_%d_%" PRId64, tsTempDir, pFuncInfo->name, udf->version, udf->lastFetchTime); + snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->lastFetchTime); #else - snprintf(path, sizeof(path), "%s/%s_%d_%" PRId64, tsTempDir, pFuncInfo->name, udf->version, udf->lastFetchTime); + snprintf(path, sizeof(path), "%s/%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->lastFetchTime); #endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { From 32523d72f9c6a7037c19802387942779096261e5 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sat, 8 Apr 2023 10:11:57 +0800 Subject: [PATCH 14/29] fix: add created time to file name and keep file reuse --- include/common/tmsg.h | 7 +++- include/libs/function/taosudf.h | 1 + source/common/src/tmsg.c | 24 ++++++------ source/dnode/mnode/impl/src/mndFunc.c | 10 +++-- source/libs/catalog/test/catalogTests.cpp | 6 +-- source/libs/function/src/udfd.c | 45 ++++++++++++++++------- source/libs/qcom/src/querymsg.c | 2 +- 7 files changed, 61 insertions(+), 34 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0ef6347a44..eefb8fc99e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1093,10 +1093,15 @@ typedef struct { char* pCode; } SFuncInfo; +typedef struct { + int32_t funcVersion; + int64_t funcCreatedTime; +} SFuncExtraInfo; + typedef struct { int32_t numOfFuncs; SArray* pFuncInfos; - SArray* pFuncVersions; + SArray* pFuncExtraInfos; } SRetrieveFuncRsp; int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp); diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 2eccb6225c..5703df87fa 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -277,6 +277,7 @@ typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EU typedef struct SScriptUdfInfo { const char *name; int32_t version; + int64_t createdTime; EUdfFuncType funcType; int8_t scriptType; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index f103b33138..ac3d4e6a10 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1865,10 +1865,11 @@ int32_t tSerializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp * } } - if (pRsp->numOfFuncs != (int32_t)taosArrayGetSize(pRsp->pFuncVersions)) return -1; + if (pRsp->numOfFuncs != (int32_t)taosArrayGetSize(pRsp->pFuncExtraInfos)) return -1; for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { - int32_t version = *(int32_t*)taosArrayGet(pRsp->pFuncVersions, i); - if (tEncodeI32(&encoder, version) < 0) return -1; + SFuncExtraInfo *extraInfo = taosArrayGet(pRsp->pFuncExtraInfos, i); + if (tEncodeI32(&encoder, extraInfo->funcVersion) < 0) return -1; + if (tEncodeI64(&encoder, extraInfo->funcCreatedTime) < 0) return -1; } tEndEncode(&encoder); @@ -1919,18 +1920,19 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp taosArrayPush(pRsp->pFuncInfos, &fInfo); } - pRsp->pFuncVersions = taosArrayInit(pRsp->numOfFuncs, sizeof(int32_t)); - if (pRsp->pFuncVersions == NULL) return -1; + pRsp->pFuncExtraInfos = taosArrayInit(pRsp->numOfFuncs, sizeof(SFuncExtraInfo)); + if (pRsp->pFuncExtraInfos == NULL) return -1; if (tDecodeIsEnd(&decoder)) { for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { - int32_t version = 0; - taosArrayPush(pRsp->pFuncVersions, &version); + SFuncExtraInfo extraInfo = { 0 }; + taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo); } } else { for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) { - int32_t version = 0; - if (tDecodeI32(&decoder, &version) < 0) return -1; - taosArrayPush(pRsp->pFuncVersions, &version); + SFuncExtraInfo extraInfo = { 0 }; + if (tDecodeI32(&decoder, &extraInfo.funcVersion) < 0) return -1; + if (tDecodeI64(&decoder, &extraInfo.funcCreatedTime) < 0) return -1; + taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo); } } tEndDecode(&decoder); @@ -1955,7 +1957,7 @@ void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) { tFreeSFuncInfo(pInfo); } taosArrayDestroy(pRsp->pFuncInfos); - taosArrayDestroy(pRsp->pFuncVersions); + taosArrayDestroy(pRsp->pFuncExtraInfos); } int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq) { diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index 08336ef70a..a4ccc0ad73 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -428,8 +428,8 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { goto RETRIEVE_FUNC_OVER; } - retrieveRsp.pFuncVersions = taosArrayInit(retrieveReq.numOfFuncs, sizeof(int32_t)); - if (retrieveRsp.pFuncVersions == NULL) { + retrieveRsp.pFuncExtraInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncExtraInfo)); + if (retrieveRsp.pFuncExtraInfos == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto RETRIEVE_FUNC_OVER; } @@ -472,8 +472,10 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { } } taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo); - - taosArrayPush(retrieveRsp.pFuncVersions, &pFunc->funcVersion); + SFuncExtraInfo extraInfo = {0}; + extraInfo.funcVersion = pFunc->funcVersion; + extraInfo.funcCreatedTime = pFunc->createdTime; + taosArrayPush(retrieveRsp.pFuncExtraInfos, &pFunc->funcVersion); mndReleaseFunc(pMnode, pFunc); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index f3f7d62acf..c4bd1df2d6 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -672,14 +672,14 @@ void ctgTestRspUdfInfo(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pR SRetrieveFuncRsp funcRsp = {0}; funcRsp.numOfFuncs = 1; funcRsp.pFuncInfos = taosArrayInit(1, sizeof(SFuncInfo)); - funcRsp.pFuncVersions = taosArrayInit(1, sizeof(int32_t)); + funcRsp.pFuncExtraInfos = taosArrayInit(1, sizeof(SFuncExtraInfo)); SFuncInfo funcInfo = {0}; strcpy(funcInfo.name, "func1"); funcInfo.funcType = ctgTestFuncType; (void)taosArrayPush(funcRsp.pFuncInfos, &funcInfo); - int32_t version = 0; - (void)taosArrayPush(funcRsp.pFuncVersions, &version); + SFuncExtraInfo extraInfo = {.funcVersion = 1, .funcCreatedTime = taosGetTimestampMs()}; + (void)taosArrayPush(funcRsp.pFuncExtraInfos, &extraInfo); int32_t rspLen = tSerializeSRetrieveFuncRsp(NULL, 0, &funcRsp); void *pReq = rpcMallocCont(rspLen); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 836d2d893c..4ef7a0e7cc 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -53,9 +53,9 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; } int32_t udfdCPluginClose() { return 0; } -const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char* udfName) { - char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; - char *initSuffix = "_init"; +const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { + char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; + char *initSuffix = "_init"; strcpy(initFuncName, udfName); strncat(initFuncName, initSuffix, strlen(initSuffix)); uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc)); @@ -100,7 +100,7 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); return TSDB_CODE_UDF_LOAD_UDF_FAILURE; } - const char* udfName = udf->name; + const char *udfName = udf->name; udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName); @@ -260,6 +260,7 @@ typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY } EUdfStat typedef struct SUdf { char name[TSDB_FUNC_NAME_LEN + 1]; int32_t version; + int64_t createdTime; int8_t funcType; int8_t scriptType; @@ -518,6 +519,7 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { } udfInfo->name = udf->name; udfInfo->version = udf->version; + udfInfo->createdTime = udf->createdTime; udfInfo->outputLen = udf->outputLen; udfInfo->outputType = udf->outputType; udfInfo->path = udf->path; @@ -527,9 +529,9 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { int32_t udfdRenameUdfFile(SUdf *udf) { char newPath[PATH_MAX]; if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { - snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->lastFetchTime); + snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->createdTime); } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { - snprintf(newPath, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->lastFetchTime); + snprintf(newPath, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); } else { return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; } @@ -606,7 +608,7 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { int64_t currTime = taosGetTimestampSec(); bool expired = false; if (pUdfHash) { - expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000 * 1000; // 10s + expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000 * 1000; // 10s if (!expired) { ++(*pUdfHash)->refCount; SUdf *udf = *pUdfHash; @@ -618,7 +620,7 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { } } - SUdf *udf = udfdNewUdf(udfName); + SUdf *udf = udfdNewUdf(udfName); SUdf **pUdf = &udf; taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); @@ -805,7 +807,6 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { uv_mutex_destroy(&udf->lock); code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); fnDebug("udfd destroy function returns %d", code); - taosRemoveFile(udf->path); taosMemoryFree(udf); } taosMemoryFree(handle); @@ -835,22 +836,36 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { char path[PATH_MAX] = {0}; #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->lastFetchTime); + snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->createdTime); #else - snprintf(path, sizeof(path), "%s/%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->lastFetchTime); + snprintf(path, sizeof(path), "%s/%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->createdTime); #endif + + bool fileExist = !(taosStatFile(path, NULL, NULL) < 0); + if (fileExist) { + // TODO: error processing + TdFilePtr file = taosOpenFile(path, TD_FILE_READ); + int64_t size = 0; + taosFStatFile(file, &size, NULL); + taosCloseFile(file); + if (size == pFuncInfo->codeSize) { + strncpy(udf->path, path, PATH_MAX); + return TSDB_CODE_SUCCESS; + } + } + TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (file == NULL) { fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno)); return TSDB_CODE_FILE_CORRUPTED; } - int64_t count = taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize); if (count != pFuncInfo->codeSize) { fnError("udfd write udf shared library failed"); return TSDB_CODE_FILE_CORRUPTED; } taosCloseFile(&file); + strncpy(udf->path, path, PATH_MAX); return TSDB_CODE_SUCCESS; } @@ -901,15 +916,17 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; - udf->version = *(int32_t *)taosArrayGet(retrieveRsp.pFuncVersions, 0); + SFuncExtraInfo *pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0); + udf->version = pFuncExtraInfo->funcVersion; + udf->createdTime = pFuncExtraInfo->funcCreatedTime; msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf); if (msgInfo->code != 0) { udf->lastFetchTime = 0; } tFreeSFuncInfo(pFuncInfo); taosArrayDestroy(retrieveRsp.pFuncInfos); - taosArrayDestroy(retrieveRsp.pFuncVersions); + taosArrayDestroy(retrieveRsp.pFuncExtraInfos); } _return: diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 4b41c24371..b62a3e4932 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -587,7 +587,7 @@ int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) { memcpy(output, funcInfo, sizeof(*funcInfo)); taosArrayDestroy(out.pFuncInfos); - taosArrayDestroy(out.pFuncVersions); + taosArrayDestroy(out.pFuncExtraInfos); return TSDB_CODE_SUCCESS; } From 0be6bb735a193800fad26d3319b5cb61e3185c9b Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sat, 8 Apr 2023 10:42:27 +0800 Subject: [PATCH 15/29] fix: remove rename file --- source/libs/function/src/udfd.c | 52 +++++++++++++++++---------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 4ef7a0e7cc..2990f81184 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -336,6 +336,7 @@ static int32_t udfdRun(); static void udfdConnectMnodeThreadFunc(void *args); SUdf *udfdNewUdf(const char *udfName); +void udfdGetFuncBodyPath(const SUdf *udf, const char *path); void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; plugin->openFunc = udfdCPluginOpen; @@ -526,22 +527,6 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { udfInfo->scriptType = udf->scriptType; } -int32_t udfdRenameUdfFile(SUdf *udf) { - char newPath[PATH_MAX]; - if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { - snprintf(newPath, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->createdTime); - } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { - snprintf(newPath, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); - } else { - return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED; - } - int32_t code = taosRenameFile(udf->path, newPath); - if (code == 0) { - sprintf(udf->path, "%s", newPath); - } - return 0; -} - int32_t udfdInitUdf(char *udfName, SUdf *udf) { int32_t err = 0; err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf); @@ -565,9 +550,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { } uv_mutex_unlock(&global.scriptPluginsMutex); udf->scriptPlugin = global.scriptPlugins[udf->scriptType]; - - udfdRenameUdfFile(udf); - + SScriptUdfInfo info = {0}; convertUdf2UdfInfo(udf, &info); err = udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx); @@ -827,6 +810,30 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { return; } + +void udfdGetFuncBodyPath(const SUdf *udf, const char *path) { + if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { +#ifdef WINDOWS + snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64 ".dll", tsDataDir, udf->name, udf->version, udf->createdTime); +#else + snprintf(path, sizeof(path), "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, + udf->createdTime); +#endif + } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { +#ifdef WINDOWS + snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); +#else + snprintf(path, sizeof(path), "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); +#endif + } else { +#ifdef WINDOWS + snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime); +#else + snprintf(path, sizeof(path), "%s/lib%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime); +#endif + } +} + int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { if (!osDataSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; @@ -835,12 +842,7 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { } char path[PATH_MAX] = {0}; -#ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->createdTime); -#else - snprintf(path, sizeof(path), "%s/%s_%d_%" PRIx64, tsDataDir, pFuncInfo->name, udf->version, udf->createdTime); -#endif - + udfdGetFuncBodyPath(udf, path); bool fileExist = !(taosStatFile(path, NULL, NULL) < 0); if (fileExist) { // TODO: error processing From 55ffb0bc6de1d7211ab068ca15bb916f8495effc Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 8 Apr 2023 11:36:09 +0800 Subject: [PATCH 16/29] fix: correct udf body file name and correct createdtime --- source/dnode/mnode/impl/src/mndFunc.c | 2 +- source/libs/function/src/udfd.c | 23 ++++++++++++----------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index a4ccc0ad73..dcbcb5ea05 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -475,7 +475,7 @@ static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) { SFuncExtraInfo extraInfo = {0}; extraInfo.funcVersion = pFunc->funcVersion; extraInfo.funcCreatedTime = pFunc->createdTime; - taosArrayPush(retrieveRsp.pFuncExtraInfos, &pFunc->funcVersion); + taosArrayPush(retrieveRsp.pFuncExtraInfos, &extraInfo); mndReleaseFunc(pMnode, pFunc); } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 2990f81184..85600f1211 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -336,7 +336,8 @@ static int32_t udfdRun(); static void udfdConnectMnodeThreadFunc(void *args); SUdf *udfdNewUdf(const char *udfName); -void udfdGetFuncBodyPath(const SUdf *udf, const char *path); +void udfdGetFuncBodyPath(const SUdf *udf, char *path); + void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; plugin->openFunc = udfdCPluginOpen; @@ -811,25 +812,25 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { } -void udfdGetFuncBodyPath(const SUdf *udf, const char *path) { +void udfdGetFuncBodyPath(const SUdf *udf, char *path) { if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64 ".dll", tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", tsDataDir, udf->name, udf->version, udf->createdTime); #else - snprintf(path, sizeof(path), "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, + snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->createdTime); #endif } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); #else - snprintf(path, sizeof(path), "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); #endif } else { #ifdef WINDOWS - snprintf(path, sizeof(path), "%s%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime); #else - snprintf(path, sizeof(path), "%s/lib%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime); #endif } } @@ -849,7 +850,7 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { TdFilePtr file = taosOpenFile(path, TD_FILE_READ); int64_t size = 0; taosFStatFile(file, &size, NULL); - taosCloseFile(file); + taosCloseFile(&file); if (size == pFuncInfo->codeSize) { strncpy(udf->path, path, PATH_MAX); return TSDB_CODE_SUCCESS; @@ -918,8 +919,8 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; - SFuncExtraInfo *pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0); + SFuncExtraInfo *pFuncExtraInfo = (SFuncExtraInfo *)taosArrayGet(retrieveRsp.pFuncExtraInfos, 0); udf->version = pFuncExtraInfo->funcVersion; udf->createdTime = pFuncExtraInfo->funcCreatedTime; msgInfo->code = udfdSaveFuncBodyToFile(pFuncInfo, udf); @@ -1478,7 +1479,7 @@ int main(int argc, char *argv[]) { uv_thread_t mnodeConnectThread; uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL); - + udfdRun(); removeListeningPipe(); From dc86bf9671c8ae71422b15faf228f0f53338863c Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 8 Apr 2023 13:26:15 +0800 Subject: [PATCH 17/29] fix: remove code size checking --- source/libs/function/src/udfd.c | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 85600f1211..70631888ca 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -338,7 +338,7 @@ static void udfdConnectMnodeThreadFunc(void *args); SUdf *udfdNewUdf(const char *udfName); void udfdGetFuncBodyPath(const SUdf *udf, char *path); -void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { +void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; @@ -551,7 +551,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { } uv_mutex_unlock(&global.scriptPluginsMutex); udf->scriptPlugin = global.scriptPlugins[udf->scriptType]; - + SScriptUdfInfo info = {0}; convertUdf2UdfInfo(udf, &info); err = udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx); @@ -811,14 +811,12 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { return; } - void udfdGetFuncBodyPath(const SUdf *udf, char *path) { if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { #ifdef WINDOWS snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", tsDataDir, udf->name, udf->version, udf->createdTime); #else - snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, - udf->createdTime); + snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->createdTime); #endif } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { #ifdef WINDOWS @@ -846,15 +844,8 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { udfdGetFuncBodyPath(udf, path); bool fileExist = !(taosStatFile(path, NULL, NULL) < 0); if (fileExist) { - // TODO: error processing - TdFilePtr file = taosOpenFile(path, TD_FILE_READ); - int64_t size = 0; - taosFStatFile(file, &size, NULL); - taosCloseFile(&file); - if (size == pFuncInfo->codeSize) { - strncpy(udf->path, path, PATH_MAX); - return TSDB_CODE_SUCCESS; - } + strncpy(udf->path, path, PATH_MAX); + return TSDB_CODE_SUCCESS; } TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); @@ -1479,7 +1470,7 @@ int main(int argc, char *argv[]) { uv_thread_t mnodeConnectThread; uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL); - + udfdRun(); removeListeningPipe(); From b725c9e192bb8451f128ef985589fb4483b69431 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 9 Apr 2023 20:31:25 +0800 Subject: [PATCH 18/29] enhance: change udf func body dir to tsDataDir/.udf --- source/libs/function/src/udfd.c | 58 +++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 13 deletions(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 70631888ca..aa72309c62 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -229,6 +229,7 @@ typedef struct SUdfdContext { SArray *residentFuncs; + char udfDataDir[PATH_MAX]; bool printVersion; } SUdfdContext; @@ -390,12 +391,13 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { } if (plugin->openFunc) { - int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(tsDataDir) + 1 + 1; // tsDataDir:tsUdfdLdLibPath - char *pythonPath = taosMemoryMalloc(lenPythonPath); + int16_t lenPythonPath = + strlen(tsUdfdLdLibPath) + strlen(global.udfDataDir) + 1 + 1; // global.udfDataDir:tsUdfdLdLibPath + char *pythonPath = taosMemoryMalloc(lenPythonPath); #ifdef WINDOWS - snprintf(pythonPath, lenPythonPath, "%s;%s", tsDataDir, tsUdfdLdLibPath); + snprintf(pythonPath, lenPythonPath, "%s;%s", global.udfDataDir, tsUdfdLdLibPath); #else - snprintf(pythonPath, lenPythonPath, "%s:%s", tsDataDir, tsUdfdLdLibPath); + snprintf(pythonPath, lenPythonPath, "%s:%s", global.udfDataDir, tsUdfdLdLibPath); #endif SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}}; err = plugin->openFunc(items, 2); @@ -567,7 +569,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { SUdf *udfdNewUdf(const char *udfName) { SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); udfNew->refCount = 1; - udfNew->lastFetchTime = taosGetTimestampUs(); + udfNew->lastFetchTime = taosGetTimestampMs(); strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); udfNew->state = UDF_STATE_INIT; @@ -592,15 +594,19 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { int64_t currTime = taosGetTimestampSec(); bool expired = false; if (pUdfHash) { - expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000 * 1000; // 10s + expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s if (!expired) { ++(*pUdfHash)->refCount; SUdf *udf = *pUdfHash; uv_mutex_unlock(&global.udfsMutex); + fnInfo("udfd reuse existing udf. udf %s udf version %d, udf created time %" PRIx64, udf->name, udf->version, + udf->createdTime); return udf; } else { (*pUdfHash)->expired = true; taosHashRemove(global.udfsHash, udfName, strlen(udfName)); + fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64, + (*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime); } } @@ -814,21 +820,22 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { void udfdGetFuncBodyPath(const SUdf *udf, char *path) { if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { #ifdef WINDOWS - snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime); #else - snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64 ".so", global.udfDataDir, udf->name, udf->version, + udf->createdTime); #endif } else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { #ifdef WINDOWS - snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime); #else - snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s/%s_%d_%" PRIx64 ".py", global.udfDataDir, udf->name, udf->version, udf->createdTime); #endif } else { #ifdef WINDOWS - snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime); #else - snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, tsDataDir, udf->name, udf->version, udf->createdTime); + snprintf(path, PATH_MAX, "%s/lib%s_%d_%" PRIx64, global.udfDataDir, udf->name, udf->version, udf->createdTime); #endif } } @@ -845,6 +852,7 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { bool fileExist = !(taosStatFile(path, NULL, NULL) < 0); if (fileExist) { strncpy(udf->path, path, PATH_MAX); + fnInfo("udfd func body file. reuse existing file %s", path); return TSDB_CODE_SUCCESS; } @@ -1429,6 +1437,24 @@ int32_t udfdCleanup() { return 0; } +int32_t udfdCreateUdfSourceDir() { + snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsDataDir); + int32_t code = taosMkDir(global.udfDataDir); + if (code != TSDB_CODE_SUCCESS) { + snprintf(global.udfDataDir, PATH_MAX, "%s/.udf", tsTempDir); + code = taosMkDir(global.udfDataDir); + } + fnInfo("udfd create udf source directory %s. result: %s", global.udfDataDir, tstrerror(code)); + + return code; +} + +int32_t udfdDestroyUdfSourceDir() { + fnInfo("destory udf source directory %s", global.udfDataDir); + taosRemoveDir(global.udfDataDir); + return 0; +} + int main(int argc, char *argv[]) { if (!taosCheckSystemIsLittleEnd()) { printf("failed to start since on non-little-end machines\n"); @@ -1457,10 +1483,15 @@ int main(int argc, char *argv[]) { initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp); if (udfdOpenClientRpc() != 0) { - fnError("open rpc connection to mnode failure"); + fnError("open rpc connection to mnode failed"); return -3; } + if (udfdCreateUdfSourceDir() != 0) { + fnError("create udf source directory failed"); + return -4; + } + if (udfdUvInit() != 0) { fnError("uv init failure"); return -5; @@ -1474,6 +1505,7 @@ int main(int argc, char *argv[]) { udfdRun(); removeListeningPipe(); + udfdDestroyUdfSourceDir(); udfdCloseClientRpc(); udfdDeinitResidentFuncs(); From 65f117822cf43fd927695ab9b5d323a8daaa12ed Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 10 Apr 2023 06:45:26 +0800 Subject: [PATCH 19/29] fix: windows ci error --- source/libs/catalog/test/catalogTests.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index c4bd1df2d6..fd925a8fe5 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -678,7 +678,9 @@ void ctgTestRspUdfInfo(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pR funcInfo.funcType = ctgTestFuncType; (void)taosArrayPush(funcRsp.pFuncInfos, &funcInfo); - SFuncExtraInfo extraInfo = {.funcVersion = 1, .funcCreatedTime = taosGetTimestampMs()}; + SFuncExtraInfo extraInfo = {0}; + extraInfo.funcVersion = 0; + extraInfo.funcCreatedTime = taosGetTimestampMs(); (void)taosArrayPush(funcRsp.pFuncExtraInfos, &extraInfo); int32_t rspLen = tSerializeSRetrieveFuncRsp(NULL, 0, &funcRsp); From b2de711a849e812695c919148d0b3451678b06f8 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 10 Apr 2023 11:33:25 +0800 Subject: [PATCH 20/29] fix: change orReplace logic in mnode --- source/dnode/mnode/impl/src/mndFunc.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index dcbcb5ea05..f0104194d8 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -231,11 +231,10 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre if(pCreate->orReplace == 1){ SFuncObj *oldFunc = mndAcquireFunc(pMnode, pCreate->name); - if(oldFunc == NULL){ - goto _OVER; + if(oldFunc != NULL){ + func.funcVersion = oldFunc->funcVersion + 1; + mndReleaseFunc(pMnode, oldFunc); } - func.funcVersion = oldFunc->funcVersion + 1; - mndReleaseFunc(pMnode, oldFunc); } pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-func"); @@ -319,6 +318,9 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) { mInfo("func:%s, already exist, ignore exist is set", createReq.name); code = 0; goto _OVER; + } else if (createReq.orReplace) { + mInfo("func:%s, replace function is set", createReq.name); + code = 0; } else { terrno = TSDB_CODE_MND_FUNC_ALREADY_EXIST; goto _OVER; From 765c09d59e1b8837c66b3f36a02a07bb500e33f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Chappyguoxy=E2=80=9D?= <“happy_guoxy@163.com”> Date: Mon, 10 Apr 2023 16:40:16 +0800 Subject: [PATCH 21/29] update function --- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndFunc.c | 89 +++++++++++++++++++++------ 2 files changed, 72 insertions(+), 18 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 1ff1165e21..017d77d1c0 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -448,6 +448,7 @@ typedef struct { char* pComment; char* pCode; int32_t funcVersion; + SRWLatch lock; } SFuncObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index f0104194d8..5042e6543f 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -152,6 +152,8 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER) + taosInitRWLatch(&pFunc->lock); + terrno = 0; _OVER: @@ -179,6 +181,44 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) { static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) { mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); + + taosWLockLatch(&pOld->lock); + + pOld->align = pNew->align; + pOld->bufSize = pNew->bufSize; + pOld->codeSize = pNew->codeSize; + pOld->commentSize = pNew->commentSize; + pOld->createdTime = pNew->createdTime; + pOld->funcType = pNew->funcType; + pOld->funcVersion = pNew->funcVersion; + pOld->outputLen = pNew->outputLen; + pOld->outputType = pNew->outputType; + + if(pOld->pComment != NULL){ + taosMemoryFree(pOld->pComment); + pOld->pComment = NULL; + } + if(pNew->commentSize > 0 && pNew->pComment != NULL){ + pOld->commentSize = pNew->commentSize; + pOld->pComment = taosMemoryMalloc(pOld->commentSize); + memcpy(pOld->pComment, pNew->pComment, pOld->commentSize); + } + + if(pOld->pCode != NULL){ + taosMemoryFree(pOld->pCode); + pOld->pCode = NULL; + } + if(pNew->codeSize > 0 && pNew->pCode != NULL){ + pOld->codeSize = pNew->codeSize; + pOld->pCode = taosMemoryMalloc(pOld->codeSize); + memcpy(pOld->pCode, pNew->pCode, pOld->codeSize); + } + + pOld->scriptType = pNew->scriptType; + pOld->signature = pNew->signature; + + taosWUnLockLatch(&pOld->lock); + return 0; } @@ -229,36 +269,49 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre } memcpy(func.pCode, pCreate->pCode, func.codeSize); - if(pCreate->orReplace == 1){ - SFuncObj *oldFunc = mndAcquireFunc(pMnode, pCreate->name); - if(oldFunc != NULL){ - func.funcVersion = oldFunc->funcVersion + 1; - mndReleaseFunc(pMnode, oldFunc); - } - } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-func"); if (pTrans == NULL) goto _OVER; - mInfo("trans:%d, used to create func:%s", pTrans->id, pCreate->name); - SSdbRaw *pRedoRaw = mndFuncActionEncode(&func); - if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto _OVER; + SFuncObj *oldFunc = mndAcquireFunc(pMnode, pCreate->name); + if(pCreate->orReplace == 1 && oldFunc != NULL){ + func.funcVersion = oldFunc->funcVersion + 1; - SSdbRaw *pUndoRaw = mndFuncActionEncode(&func); - if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto _OVER; + SSdbRaw *pRedoRaw = mndFuncActionEncode(oldFunc); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) goto _OVER; - SSdbRaw *pCommitRaw = mndFuncActionEncode(&func); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER; + SSdbRaw *pUndoRaw = mndFuncActionEncode(oldFunc); + if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY) != 0) goto _OVER; + + SSdbRaw *pCommitRaw = mndFuncActionEncode(&func); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER; + } + else{ + SSdbRaw *pRedoRaw = mndFuncActionEncode(&func); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto _OVER; + + SSdbRaw *pUndoRaw = mndFuncActionEncode(&func); + if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) goto _OVER; + + SSdbRaw *pCommitRaw = mndFuncActionEncode(&func); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER; + } if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; _OVER: + if(oldFunc != NULL){ + mndReleaseFunc(pMnode, oldFunc); + } + taosMemoryFree(func.pCode); taosMemoryFree(func.pComment); mndTransDrop(pTrans); From 3078e2d487fdc85302ca50463b171e18e012c059 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 10 Apr 2023 18:35:35 +0800 Subject: [PATCH 22/29] feat(query): INTERP support boolean type --- source/libs/executor/src/tfill.c | 9 +++++-- source/libs/executor/src/timesliceoperator.c | 26 +++++++++++++++++--- source/libs/function/src/builtins.c | 2 +- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index f4c8521b94..520ec3a72a 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -44,7 +44,7 @@ static void setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo, } else { p = FILL_IS_ASC_FILL(pFillInfo) ? &pFillInfo->prev : &pFillInfo->next; } - + SGroupKeys* pKey = taosArrayGet(p->pRowVal, colIdx); doSetVal(pDstColInfo, rowIndex, pKey); } @@ -578,7 +578,12 @@ int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* GET_TYPED_DATA(v1, double, inputType, point1->val); GET_TYPED_DATA(v2, double, inputType, point2->val); - double r = DO_INTERPOLATION(v1, v2, point1->key, point2->key, point->key); + double r = 0; + if (!IS_BOOLEAN_TYPE(inputType)) { + r = DO_INTERPOLATION(v1, v2, point1->key, point2->key, point->key); + } else { + r = (v1 < 1 || v2 < 1) ? 0 : 1; + } SET_TYPED_DATA(point->val, outputType, r); return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 911700be85..02b95f6b5c 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -156,6 +156,16 @@ static FORCE_INLINE int32_t timeSliceEnsureBlockCapacity(STimeSliceOperatorInfo* return TSDB_CODE_SUCCESS; } +static bool isIrowtsPseudoColumn(SExprInfo* pExprInfo) { + char *name = pExprInfo->pExpr->_function.functionName; + return (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_irowts") == 0); +} + +static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) { + char *name = pExprInfo->pExpr->_function.functionName; + return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0); +} + static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, bool beforeTs) { int32_t rows = pResBlock->info.rows; @@ -170,10 +180,10 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp int32_t dstSlot = pExprInfo->base.resSchema.slotId; SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); - if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { + if (isIrowtsPseudoColumn(pExprInfo)) { colDataSetVal(pDst, rows, (char*)&pSliceInfo->current, false); continue; - } else if (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type)) { + } else if (isIsfilledPseudoColumn(pExprInfo)) { bool isFilled = true; colDataAppend(pDst, pResBlock->info.rows, (char*)&isFilled, false); continue; @@ -203,6 +213,14 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp int64_t v = 0; GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); colDataSetVal(pDst, rows, (char*)&v, false); + } else if (IS_BOOLEAN_TYPE(pDst->info.type)) { + bool v = false; + if (!IS_VAR_DATA_TYPE(pVar->nType)) { + GET_TYPED_DATA(v, bool, pVar->nType, &pVar->i); + } else { + v = taosStr2Int8(varDataVal(pVar->pz), NULL, 10); + } + colDataSetVal(pDst, rows, (char*)&v, false); } break; } @@ -288,9 +306,9 @@ static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* int32_t dstSlot = pExprInfo->base.resSchema.slotId; SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); - if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { + if (isIrowtsPseudoColumn(pExprInfo)) { colDataSetVal(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false); - } else if (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type)) { + } else if (isIsfilledPseudoColumn(pExprInfo)) { bool isFilled = false; colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false); } else { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a36204bab7..69951f680e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1575,7 +1575,7 @@ static int32_t translateInterp(SFunctionNode* pFunc, char* pErrBuf, int32_t len) uint8_t nodeType = nodeType(nodesListGetNode(pFunc->pParameterList, 0)); uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - if (!IS_NUMERIC_TYPE(paraType) || QUERY_NODE_VALUE == nodeType) { + if ((!IS_NUMERIC_TYPE(paraType) && !IS_BOOLEAN_TYPE(paraType))|| QUERY_NODE_VALUE == nodeType) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } From 89e4b15d8e7b8a61d13030026cf630d98ce65400 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 10 Apr 2023 18:36:15 +0800 Subject: [PATCH 23/29] add test cases --- tests/system-test/2-query/interp.py | 361 +++++++++++++++++++++++++++- 1 file changed, 359 insertions(+), 2 deletions(-) diff --git a/tests/system-test/2-query/interp.py b/tests/system-test/2-query/interp.py index d7344c631f..51d907b13a 100644 --- a/tests/system-test/2-query/interp.py +++ b/tests/system-test/2-query/interp.py @@ -19,6 +19,7 @@ class TDTestCase: dbname = "db" tbname = "tb" tbname1 = "tb1" + tbname2 = "tb2" stbname = "stb" ctbname1 = "ctb1" ctbname2 = "ctb2" @@ -1948,6 +1949,7 @@ class TDTestCase: tdSql.checkData(59, 1, 60) tdSql.checkData(60, 1, 60) # + tdLog.printNoPrefix("==========step11:test multi-interp cases") tdSql.query(f"select interp(c0),interp(c1),interp(c2),interp(c3) from {dbname}.{tbname} range('2020-02-09 00:00:05', '2020-02-13 00:00:05') every(1d) fill(null)") tdSql.checkRows(5) @@ -2002,7 +2004,362 @@ class TDTestCase: for i in range (tdSql.queryCols): tdSql.checkData(0, i, 13) - tdLog.printNoPrefix("==========step12:test error cases") + tdLog.printNoPrefix("==========step12:test interp with boolean type") + tdSql.execute( + f'''create table if not exists {dbname}.{tbname2} + (ts timestamp, c0 bool) + ''' + ) + + + tdSql.execute(f"insert into {dbname}.{tbname2} values ('2020-02-02 00:00:01', false)") + tdSql.execute(f"insert into {dbname}.{tbname2} values ('2020-02-02 00:00:03', true)") + tdSql.execute(f"insert into {dbname}.{tbname2} values ('2020-02-02 00:00:05', false)") + tdSql.execute(f"insert into {dbname}.{tbname2} values ('2020-02-02 00:00:07', true)") + tdSql.execute(f"insert into {dbname}.{tbname2} values ('2020-02-02 00:00:09', true)") + tdSql.execute(f"insert into {dbname}.{tbname2} values ('2020-02-02 00:00:11', false)") + tdSql.execute(f"insert into {dbname}.{tbname2} values ('2020-02-02 00:00:13', false)") + tdSql.execute(f"insert into {dbname}.{tbname2} values ('2020-02-02 00:00:15', NULL)") + tdSql.execute(f"insert into {dbname}.{tbname2} values ('2020-02-02 00:00:17', NULL)") + + # test fill null + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(NULL)") + tdSql.checkRows(19) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:00.000') + + tdSql.checkData(0, 2, None) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, None) + tdSql.checkData(3, 2, True) + tdSql.checkData(4, 2, None) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, None) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, None) + tdSql.checkData(9, 2, True) + tdSql.checkData(10, 2, None) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, None) + tdSql.checkData(13, 2, False) + tdSql.checkData(14, 2, None) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, None) + tdSql.checkData(17, 2, None) + tdSql.checkData(18, 2, None) + + tdSql.checkData(18, 0, '2020-02-02 00:00:18.000') + + # test fill prev + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(prev)") + tdSql.checkRows(18) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:01.000') + + tdSql.checkData(0, 2, False) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, True) + tdSql.checkData(3, 2, True) + tdSql.checkData(4, 2, False) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, True) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, True) + tdSql.checkData(9, 2, True) + tdSql.checkData(10, 2, False) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, False) + tdSql.checkData(13, 2, False) + tdSql.checkData(14, 2, None) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, None) + tdSql.checkData(17, 2, None) + + tdSql.checkData(17, 0, '2020-02-02 00:00:18.000') + + # test fill next + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(next)") + tdSql.checkRows(18) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:00.000') + + tdSql.checkData(0, 2, False) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, True) + tdSql.checkData(3, 2, True) + tdSql.checkData(4, 2, False) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, True) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, True) + tdSql.checkData(9, 2, True) + tdSql.checkData(10, 2, False) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, False) + tdSql.checkData(13, 2, False) + tdSql.checkData(14, 2, None) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, None) + tdSql.checkData(17, 2, None) + + tdSql.checkData(17, 0, '2020-02-02 00:00:17.000') + + # test fill value + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(value, 0)") + tdSql.checkRows(19) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:00.000') + + tdSql.checkData(0, 2, False) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, False) + tdSql.checkData(3, 2, True) + tdSql.checkData(4, 2, False) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, False) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, False) + tdSql.checkData(9, 2, True) + tdSql.checkData(10, 2, False) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, False) + tdSql.checkData(13, 2, False) + tdSql.checkData(14, 2, False) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, False) + tdSql.checkData(17, 2, None) + tdSql.checkData(18, 2, False) + + tdSql.checkData(18, 0, '2020-02-02 00:00:18.000') + + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(value, 1234)") + tdSql.checkRows(19) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:00.000') + + tdSql.checkData(0, 2, True) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, True) + tdSql.checkData(3, 2, True) + tdSql.checkData(4, 2, True) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, True) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, True) + tdSql.checkData(9, 2, True) + tdSql.checkData(10, 2, True) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, True) + tdSql.checkData(13, 2, False) + tdSql.checkData(14, 2, True) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, True) + tdSql.checkData(17, 2, None) + tdSql.checkData(18, 2, True) + + tdSql.checkData(18, 0, '2020-02-02 00:00:18.000') + + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(value, false)") + tdSql.checkRows(19) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:00.000') + + tdSql.checkData(0, 2, False) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, False) + tdSql.checkData(3, 2, True) + tdSql.checkData(4, 2, False) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, False) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, False) + tdSql.checkData(9, 2, True) + tdSql.checkData(10, 2, False) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, False) + tdSql.checkData(13, 2, False) + tdSql.checkData(14, 2, False) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, False) + tdSql.checkData(17, 2, None) + tdSql.checkData(18, 2, False) + + tdSql.checkData(18, 0, '2020-02-02 00:00:18.000') + + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(value, true)") + tdSql.checkRows(19) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:00.000') + + tdSql.checkData(0, 2, True) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, True) + tdSql.checkData(3, 2, True) + tdSql.checkData(4, 2, True) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, True) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, True) + tdSql.checkData(9, 2, True) + tdSql.checkData(10, 2, True) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, True) + tdSql.checkData(13, 2, False) + tdSql.checkData(14, 2, True) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, True) + tdSql.checkData(17, 2, None) + tdSql.checkData(18, 2, True) + + tdSql.checkData(18, 0, '2020-02-02 00:00:18.000') + + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(value, '0')") + tdSql.checkRows(19) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:00.000') + + tdSql.checkData(0, 2, False) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, False) + tdSql.checkData(3, 2, True) + tdSql.checkData(4, 2, False) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, False) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, False) + tdSql.checkData(9, 2, True) + tdSql.checkData(10, 2, False) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, False) + tdSql.checkData(13, 2, False) + tdSql.checkData(14, 2, False) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, False) + tdSql.checkData(17, 2, None) + tdSql.checkData(18, 2, False) + + tdSql.checkData(18, 0, '2020-02-02 00:00:18.000') + + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(value, '123')") + tdSql.checkRows(19) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:00.000') + + tdSql.checkData(0, 2, True) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, True) + tdSql.checkData(3, 2, True) + tdSql.checkData(4, 2, True) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, True) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, True) + tdSql.checkData(9, 2, True) + tdSql.checkData(10, 2, True) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, True) + tdSql.checkData(13, 2, False) + tdSql.checkData(14, 2, True) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, True) + tdSql.checkData(17, 2, None) + tdSql.checkData(18, 2, True) + + tdSql.checkData(18, 0, '2020-02-02 00:00:18.000') + + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(value, 'abc')") + tdSql.checkRows(19) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:00.000') + + tdSql.checkData(0, 2, False) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, False) + tdSql.checkData(3, 2, True) + tdSql.checkData(4, 2, False) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, False) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, False) + tdSql.checkData(9, 2, True) + tdSql.checkData(10, 2, False) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, False) + tdSql.checkData(13, 2, False) + tdSql.checkData(14, 2, False) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, False) + tdSql.checkData(17, 2, None) + tdSql.checkData(18, 2, False) + + tdSql.checkData(18, 0, '2020-02-02 00:00:18.000') + + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(value, NULL)") + tdSql.checkRows(19) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:00.000') + + tdSql.checkData(0, 2, False) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, False) + tdSql.checkData(3, 2, True) + tdSql.checkData(4, 2, False) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, False) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, False) + tdSql.checkData(9, 2, True) + tdSql.checkData(10, 2, False) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, False) + tdSql.checkData(13, 2, False) + tdSql.checkData(14, 2, False) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, False) + tdSql.checkData(17, 2, None) + tdSql.checkData(18, 2, False) + + tdSql.checkData(18, 0, '2020-02-02 00:00:18.000') + + # test fill linear + tdSql.query(f"select _irowts,_isfilled,interp(c0) from {dbname}.{tbname2} range('2020-02-02 00:00:00', '2020-02-02 00:00:18') every(1s) fill(linear)") + tdSql.checkRows(17) + tdSql.checkCols(3) + + tdSql.checkData(0, 0, '2020-02-02 00:00:01.000') + + tdSql.checkData(0, 2, False) + tdSql.checkData(1, 2, False) + tdSql.checkData(2, 2, True) + tdSql.checkData(3, 2, False) + tdSql.checkData(4, 2, False) + tdSql.checkData(5, 2, False) + tdSql.checkData(6, 2, True) + tdSql.checkData(7, 2, True) + tdSql.checkData(8, 2, True) + tdSql.checkData(9, 2, False) + tdSql.checkData(10, 2, False) + tdSql.checkData(11, 2, False) + tdSql.checkData(12, 2, False) + tdSql.checkData(13, 2, None) + tdSql.checkData(14, 2, None) + tdSql.checkData(15, 2, None) + tdSql.checkData(16, 2, None) + + tdSql.checkData(16, 0, '2020-02-02 00:00:17.000') + + tdLog.printNoPrefix("==========step13:test error cases") tdSql.error(f"select interp(c0) from {dbname}.{tbname}") tdSql.error(f"select interp(c0) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05')") @@ -2013,7 +2370,7 @@ class TDTestCase: # input can only be numerical types tdSql.error(f"select interp(ts) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(null)") - tdSql.error(f"select interp(c6) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(null)") + #tdSql.error(f"select interp(c6) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(null)") tdSql.error(f"select interp(c7) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(null)") tdSql.error(f"select interp(c8) from {dbname}.{tbname} range('2020-02-10 00:00:05', '2020-02-15 00:00:05') every(1d) fill(null)") From ea6c96df32521476f5c046f502f1df628175d0d5 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 11 Apr 2023 11:13:23 +0800 Subject: [PATCH 24/29] fix memory leak --- source/libs/executor/inc/tfill.h | 1 + source/libs/executor/src/tfill.c | 1 + source/libs/executor/src/timesliceoperator.c | 3 +++ 3 files changed, 5 insertions(+) diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index 726f0df1e8..5fd75f9b99 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -30,6 +30,7 @@ extern "C" { struct SSDataBlock; typedef struct SFillColInfo { + int32_t numOfFillExpr; SExprInfo* pExpr; bool notFillCol; // denote if this column needs fill operation SVariant fillVal; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 520ec3a72a..e59ea253cc 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -635,6 +635,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn nodesValueNodeToVariant(pv, &pFillCol[i].fillVal); } } + pFillCol->numOfFillExpr = numOfFillExpr; for (int32_t i = 0; i < numOfNoFillExpr; ++i) { SExprInfo* pExprInfo = &pNotFillExpr[i]; diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 02b95f6b5c..f0e25d8cc5 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -661,6 +661,9 @@ void destroyTimeSliceOperatorInfo(void* param) { taosArrayDestroy(pInfo->pLinearInfo); cleanupExprSupp(&pInfo->scalarSup); + for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) { + taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal); + } taosMemoryFree(pInfo->pFillColInfo); taosMemoryFreeClear(param); } From 6b68a373e9949b68d6ba9ae5d09a618b95cd8994 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 11 Apr 2023 11:37:41 +0800 Subject: [PATCH 25/29] fix: when update func, set createdtime to original time --- source/dnode/mnode/impl/src/mndFunc.c | 38 ++++++++++++++------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index 5042e6543f..51a231daf4 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -145,8 +145,8 @@ static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) { goto _OVER; } SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER) - - if(sver >= 2){ + + if (sver >= 2) { SDB_GET_INT32(pRaw, dataPos, &pFunc->funcVersion, _OVER) } @@ -181,7 +181,7 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) { static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) { mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); - + taosWLockLatch(&pOld->lock); pOld->align = pNew->align; @@ -194,21 +194,21 @@ static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) { pOld->outputLen = pNew->outputLen; pOld->outputType = pNew->outputType; - if(pOld->pComment != NULL){ + if (pOld->pComment != NULL) { taosMemoryFree(pOld->pComment); pOld->pComment = NULL; } - if(pNew->commentSize > 0 && pNew->pComment != NULL){ + if (pNew->commentSize > 0 && pNew->pComment != NULL) { pOld->commentSize = pNew->commentSize; pOld->pComment = taosMemoryMalloc(pOld->commentSize); memcpy(pOld->pComment, pNew->pComment, pOld->commentSize); } - if(pOld->pCode != NULL){ + if (pOld->pCode != NULL) { taosMemoryFree(pOld->pCode); pOld->pCode = NULL; } - if(pNew->codeSize > 0 && pNew->pCode != NULL){ + if (pNew->codeSize > 0 && pNew->pCode != NULL) { pOld->codeSize = pNew->codeSize; pOld->pCode = taosMemoryMalloc(pOld->codeSize); memcpy(pOld->pCode, pNew->pCode, pOld->codeSize); @@ -216,7 +216,7 @@ static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) { pOld->scriptType = pNew->scriptType; pOld->signature = pNew->signature; - + taosWUnLockLatch(&pOld->lock); return 0; @@ -268,14 +268,15 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre memcpy(func.pComment, pCreate->pComment, func.commentSize); } memcpy(func.pCode, pCreate->pCode, func.codeSize); - + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-func"); if (pTrans == NULL) goto _OVER; mInfo("trans:%d, used to create func:%s", pTrans->id, pCreate->name); SFuncObj *oldFunc = mndAcquireFunc(pMnode, pCreate->name); - if(pCreate->orReplace == 1 && oldFunc != NULL){ + if (pCreate->orReplace == 1 && oldFunc != NULL) { func.funcVersion = oldFunc->funcVersion + 1; + func.createdTime = oldFunc->createdTime; SSdbRaw *pRedoRaw = mndFuncActionEncode(oldFunc); if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER; @@ -288,8 +289,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre SSdbRaw *pCommitRaw = mndFuncActionEncode(&func); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER; if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) goto _OVER; - } - else{ + } else { SSdbRaw *pRedoRaw = mndFuncActionEncode(&func); if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) goto _OVER; if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) goto _OVER; @@ -308,7 +308,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre code = 0; _OVER: - if(oldFunc != NULL){ + if (oldFunc != NULL) { mndReleaseFunc(pMnode, oldFunc); } @@ -628,7 +628,7 @@ static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->bufSize, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - char* language = ""; + char *language = ""; if (pFunc->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { language = "C"; } else if (pFunc->scriptType == TSDB_FUNC_SCRIPT_PYTHON) { @@ -640,15 +640,17 @@ static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl colDataSetVal(pColInfo, numOfRows, (const char *)varLang, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - int32_t varCodeLen = (pFunc->codeSize + VARSTR_HEADER_SIZE) > TSDB_MAX_BINARY_LEN ? TSDB_MAX_BINARY_LEN : pFunc->codeSize + VARSTR_HEADER_SIZE; - char *b4 = taosMemoryMalloc(varCodeLen); + int32_t varCodeLen = (pFunc->codeSize + VARSTR_HEADER_SIZE) > TSDB_MAX_BINARY_LEN + ? TSDB_MAX_BINARY_LEN + : pFunc->codeSize + VARSTR_HEADER_SIZE; + char *b4 = taosMemoryMalloc(varCodeLen); memcpy(varDataVal(b4), pFunc->pCode, varCodeLen - VARSTR_HEADER_SIZE); varDataSetLen(b4, varCodeLen - VARSTR_HEADER_SIZE); - colDataSetVal(pColInfo, numOfRows, (const char*)b4, false); + colDataSetVal(pColInfo, numOfRows, (const char *)b4, false); taosMemoryFree(b4); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char*)&pFunc->funcVersion, false); + colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->funcVersion, false); numOfRows++; sdbRelease(pSdb, pFunc); From 74f2ba76a404ea9e53ea54a0fea3b40778c90669 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 11 Apr 2023 13:24:20 +0800 Subject: [PATCH 26/29] fix: check uv_is_closing before uv_close --- source/libs/function/src/tudf.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index fad118297e..2ade78db40 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1400,8 +1400,9 @@ void udfcUvHandleError(SClientUvConn *conn) { QUEUE_REMOVE(&task->procTaskQueue); uv_sem_post(&task->taskSem); } - - uv_close((uv_handle_t *)conn->pipe, onUdfcPipeClose); + if (!uv_is_closing(conn->pipe)) { + uv_close((uv_handle_t *)conn->pipe, onUdfcPipeClose); + } } void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { @@ -1553,7 +1554,9 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { } else { SClientUvConn *conn = pipe->data; QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); - uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose); + if (!uv_is_closing(uvTask->pipe)) { + uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose); + } code = 0; } break; From 41f7d79f2c8c43777f21efd500ca914d92d27754 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 11 Apr 2023 13:42:17 +0800 Subject: [PATCH 27/29] fix: fix compilation error --- source/libs/function/src/tudf.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 2ade78db40..2269ad7f6a 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1400,7 +1400,7 @@ void udfcUvHandleError(SClientUvConn *conn) { QUEUE_REMOVE(&task->procTaskQueue); uv_sem_post(&task->taskSem); } - if (!uv_is_closing(conn->pipe)) { + if (!uv_is_closing((uv_handle_t *)conn->pipe)) { uv_close((uv_handle_t *)conn->pipe, onUdfcPipeClose); } } @@ -1554,7 +1554,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { } else { SClientUvConn *conn = pipe->data; QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); - if (!uv_is_closing(uvTask->pipe)) { + if (!uv_is_closing((uv_handle_t *)uvTask->pipe)) { uv_close((uv_handle_t *)uvTask->pipe, onUdfcPipeClose); } code = 0; From 6f35badc785734f41a02c44f433d9f889b63fde0 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 11 Apr 2023 15:17:50 +0800 Subject: [PATCH 28/29] enhance: add udf replace function test case --- tests/script/tsim/query/udfpy.sim | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/tests/script/tsim/query/udfpy.sim b/tests/script/tsim/query/udfpy.sim index 025df7984b..9e0492ffd9 100644 --- a/tests/script/tsim/query/udfpy.sim +++ b/tests/script/tsim/query/udfpy.sim @@ -280,7 +280,37 @@ if $data20 != 8.000000000 then return -1 endi +sql create or replace function bit_and as '/tmp/udf/libbitand.so' outputtype int +sql select func_version from information_schema.ins_functions where name='bit_and' +if $data00 != 1 then + return -1 +endi +sql select bit_and(f1, f2) from t2; +print $rows , $data00 , $data10 , $data20 , $data30 , $data40 , $data50 +if $rows != 6 then + return -1 +endi +if $data00 != 0 then + return -1 +endi +if $data10 != 1 then + return -1 +endi +if $data20 != NULL then + return -1 +endi + +if $data30 != NULL then + return -1 +endi + +if $data40 != 0 then + return -1 +endi +if $data50 != 1 then + return -1 +endi #sql drop function bit_and; #sql show functions; #if $rows != 1 then From 8a6710ff284f5952e74f5ed1ef0162f070de2d1f Mon Sep 17 00:00:00 2001 From: Bo Ding Date: Tue, 11 Apr 2023 17:35:49 +0800 Subject: [PATCH 29/29] docs:compile error (#20865) --- docs/zh/12-taos-sql/29-changes.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/29-changes.md b/docs/zh/12-taos-sql/29-changes.md index af45d84ff2..73fa15313b 100644 --- a/docs/zh/12-taos-sql/29-changes.md +++ b/docs/zh/12-taos-sql/29-changes.md @@ -27,7 +27,7 @@ description: "TDengine 3.0 版本的语法变更说明" | - | :------- | :-------- | :------- | | 1 | ALTER ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。 | 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。 -| 3 | ALTER DATABASE | 调整 | 废除
  • QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。3.0.0版本STRICT暂不支持修改。
  • BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。
  • UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。
  • CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。
  • COMP:3.0版本暂不支持修改。
    新增
  • CACHEMODEL:表示是否在内存中缓存子表的最近数据。
  • CACHESIZE:表示缓存子表最近数据的内存大小。
  • WAL_FSYNC_PERIOD:代替原FSYNC参数。
  • WAL_LEVEL:代替原WAL参数。
  • WAL_RETENTION_PERIOD:3.0.4.0版本新增,wal文件的额外保留策略,用于数据订阅。
  • WAL_RETENTION_SIZE:3.0.4.0版本新增,wal文件的额外保留策略,用于数据订阅。
    调整
  • REPLICA:3.0.0版本暂不支持修改。
  • KEEP:3.0版本新增支持带单位的设置方式。
+| 3 | ALTER DATABASE | 调整 | 废除
  • QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。3.0.0版本STRICT暂不支持修改。
  • BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。
  • UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。
  • CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。
  • COMP:3.0版本暂不支持修改。

  • 新增
  • CACHEMODEL:表示是否在内存中缓存子表的最近数据。
  • CACHESIZE:表示缓存子表最近数据的内存大小。
  • WAL_FSYNC_PERIOD:代替原FSYNC参数。
  • WAL_LEVEL:代替原WAL参数。
  • WAL_RETENTION_PERIOD:3.0.4.0版本新增,wal文件的额外保留策略,用于数据订阅。
  • WAL_RETENTION_SIZE:3.0.4.0版本新增,wal文件的额外保留策略,用于数据订阅。
    调整
  • REPLICA:3.0.0版本暂不支持修改。
  • KEEP:3.0版本新增支持带单位的设置方式。
| 4 | ALTER STABLE | 调整 | 废除
  • CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。
    新增
  • RENAME TAG:代替原CHANGE TAG子句。
  • COMMENT:修改超级表的注释。
| 5 | ALTER TABLE | 调整 | 废除
  • CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。
    新增
  • RENAME TAG:代替原CHANGE TAG子句。
  • COMMENT:修改表的注释。
  • TTL:修改表的生命周期。
| 6 | ALTER USER | 调整 | 废除
  • PRIVILEGE:修改用户权限。3.0版本使用GRANT和REVOKE来授予和回收权限。
    新增
  • ENABLE:启用或停用此用户。
  • SYSINFO:修改用户是否可查看系统信息。