diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 52cb847b6f..2c7e6216f5 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -109,8 +109,9 @@ int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInter int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); // input: interbuf1, interbuf2 // output: resultBuf -int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, - SUdfInterBuf *resultBuf); +// udf todo: aggmerge +// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, +// SUdfInterBuf *resultBuf); // input: block // output: resultData int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 4efa8764e5..b057194cdb 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -668,8 +668,8 @@ int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) { len += tEncodeDataBlock(buf, &call->block); len += encodeUdfInterBuf(buf, &call->interBuf); } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) { - len += encodeUdfInterBuf(buf, &call->interBuf); - len += encodeUdfInterBuf(buf, &call->interBuf2); + // len += encodeUdfInterBuf(buf, &call->interBuf); + // len += encodeUdfInterBuf(buf, &call->interBuf2); } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) { len += encodeUdfInterBuf(buf, &call->interBuf); } @@ -690,10 +690,10 @@ void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) { buf = tDecodeDataBlock(buf, &call->block); buf = decodeUdfInterBuf(buf, &call->interBuf); break; - case TSDB_UDF_CALL_AGG_MERGE: - buf = decodeUdfInterBuf(buf, &call->interBuf); - buf = decodeUdfInterBuf(buf, &call->interBuf2); - break; + // case TSDB_UDF_CALL_AGG_MERGE: + // buf = decodeUdfInterBuf(buf, &call->interBuf); + // buf = decodeUdfInterBuf(buf, &call->interBuf2); + // break; case TSDB_UDF_CALL_AGG_FIN: buf = decodeUdfInterBuf(buf, &call->interBuf); break; @@ -779,9 +779,9 @@ int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) { case TSDB_UDF_CALL_AGG_PROC: len += encodeUdfInterBuf(buf, &callRsp->resultBuf); break; - case TSDB_UDF_CALL_AGG_MERGE: - len += encodeUdfInterBuf(buf, &callRsp->resultBuf); - break; + // case TSDB_UDF_CALL_AGG_MERGE: + // len += encodeUdfInterBuf(buf, &callRsp->resultBuf); + // break; case TSDB_UDF_CALL_AGG_FIN: len += encodeUdfInterBuf(buf, &callRsp->resultBuf); break; @@ -801,9 +801,9 @@ void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) { case TSDB_UDF_CALL_AGG_PROC: buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); break; - case TSDB_UDF_CALL_AGG_MERGE: - buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); - break; + // case TSDB_UDF_CALL_AGG_MERGE: + // buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); + // break; case TSDB_UDF_CALL_AGG_FIN: buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); break; @@ -1129,8 +1129,9 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf SSDataBlock *output, SUdfInterBuf *newState); int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); -int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, - SUdfInterBuf *resultBuf); +// udf todo: aggmerge +// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, +// SUdfInterBuf *resultBuf); int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output); @@ -2176,11 +2177,11 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf req->interBuf = *state; break; } - case TSDB_UDF_CALL_AGG_MERGE: { - req->interBuf = *state; - req->interBuf2 = *state2; - break; - } + // case TSDB_UDF_CALL_AGG_MERGE: { + // req->interBuf = *state; + // req->interBuf2 = *state2; + // break; + // } case TSDB_UDF_CALL_AGG_FIN: { req->interBuf = *state; break; @@ -2205,10 +2206,10 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf *newState = rsp->resultBuf; break; } - case TSDB_UDF_CALL_AGG_MERGE: { - *newState = rsp->resultBuf; - break; - } + // case TSDB_UDF_CALL_AGG_MERGE: { + // *newState = rsp->resultBuf; + // break; + // } case TSDB_UDF_CALL_AGG_FIN: { *newState = rsp->resultBuf; break; @@ -2241,12 +2242,13 @@ int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInter // input: interbuf1, interbuf2 // output: resultBuf -int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, - SUdfInterBuf *resultBuf) { - int8_t callType = TSDB_UDF_CALL_AGG_MERGE; - int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); - return err; -} +// udf todo: aggmerge +// int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, +// SUdfInterBuf *resultBuf) { +// int8_t callType = TSDB_UDF_CALL_AGG_MERGE; +// int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); +// return err; +// } // input: interBuf // output: resultData diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index bbfd43d5f7..ecb24fc77a 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -194,17 +194,17 @@ int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdf } } -int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, - void *udfCtx) { - TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx); - SUdfCPluginCtx *ctx = udfCtx; - if (ctx->aggMergeFunc) { - return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); - } else { - fnError("udfd c plugin aggregation merge not implemented"); - return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; - } -} +// int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, +// void *udfCtx) { +// TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx); +// SUdfCPluginCtx *ctx = udfCtx; +// if (ctx->aggMergeFunc) { +// return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); +// } else { +// fnError("udfd c plugin aggregation merge not implemented"); +// return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; +// } +// } int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) { TAOS_UDF_CHECK_PTR_RCODE(buf, resultData, udfCtx); @@ -378,7 +378,7 @@ int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc; plugin->udfAggStartFunc = udfdCPluginUdfAggStart; plugin->udfAggProcFunc = udfdCPluginUdfAggProc; - plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge; + // plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge; plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish; SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}}; @@ -889,19 +889,19 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { break; } - case TSDB_UDF_CALL_AGG_MERGE: { - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - if (outBuf.buf != NULL) { - code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx); - freeUdfInterBuf(&call->interBuf); - freeUdfInterBuf(&call->interBuf2); - subRsp->resultBuf = outBuf; - } else { - code = terrno; - } - - break; - } + // case TSDB_UDF_CALL_AGG_MERGE: { + // SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; + // if (outBuf.buf != NULL) { + // code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx); + // freeUdfInterBuf(&call->interBuf); + // freeUdfInterBuf(&call->interBuf2); + // subRsp->resultBuf = outBuf; + // } else { + // code = terrno; + // } + // + // break; + // } case TSDB_UDF_CALL_AGG_FIN: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; if (outBuf.buf != NULL) { @@ -959,10 +959,10 @@ _exit: freeUdfInterBuf(&subRsp->resultBuf); break; } - case TSDB_UDF_CALL_AGG_MERGE: { - freeUdfInterBuf(&subRsp->resultBuf); - break; - } + // case TSDB_UDF_CALL_AGG_MERGE: { + // freeUdfInterBuf(&subRsp->resultBuf); + // break; + // } case TSDB_UDF_CALL_AGG_FIN: { freeUdfInterBuf(&subRsp->resultBuf); break; @@ -1667,7 +1667,6 @@ static int32_t udfdGlobalDataInit() { } static void udfdGlobalDataDeinit() { - taosHashCleanup(global.udfsHash); uv_mutex_destroy(&global.udfsMutex); uv_mutex_destroy(&global.scriptPluginsMutex); taosMemoryFreeClear(global.loop); @@ -1720,8 +1719,11 @@ void udfdDeinitResidentFuncs() { SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); if (udfInHash) { SUdf *udf = *udfInHash; - int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); - fnDebug("udfd destroy function returns %d", code); + int32_t code = 0; + if (udf->scriptPlugin->udfDestroyFunc) { + code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); + fnDebug("udfd %s destroy function returns %d", funcName, code); + } if(taosHashRemove(global.udfsHash, funcName, strlen(funcName)) != 0) { fnError("udfd remove resident function %s failed", funcName); @@ -1729,6 +1731,7 @@ void udfdDeinitResidentFuncs() { taosMemoryFree(udf); } } + taosHashCleanup(global.udfsHash); taosArrayDestroy(global.residentFuncs); fnInfo("udfd resident functions are deinit"); } @@ -1838,15 +1841,15 @@ int main(int argc, char *argv[]) { fnInfo("udfd exit normally"); removeListeningPipe(); - udfdDeinitScriptPlugins(); _exit: - if (globalDataInited) { - udfdGlobalDataDeinit(); - } if (residentFuncsInited) { udfdDeinitResidentFuncs(); } + udfdDeinitScriptPlugins(); + if (globalDataInited) { + udfdGlobalDataDeinit(); + } if (udfSourceDirInited) { udfdDestroyUdfSourceDir(); } diff --git a/tests/system-test/0-others/udfTest.py b/tests/system-test/0-others/udfTest.py index 829a8aec27..d3efa61e04 100644 --- a/tests/system-test/0-others/udfTest.py +++ b/tests/system-test/0-others/udfTest.py @@ -4,6 +4,8 @@ import sys import time import os import platform +import random +import string from util.log import * from util.sql import * @@ -12,7 +14,7 @@ from util.dnodes import * import subprocess class TDTestCase: - + updatecfgDict = {'udfdResFuncs': "udf1,udf2"} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") @@ -652,10 +654,20 @@ class TDTestCase: tdDnodes.start(1) time.sleep(2) + def test_udfd_cmd(self): + tdLog.info(" test udfd -V ") + os.system("udfd -V") + tdLog.info(" test udfd -c ") + os.system("udfd -c") + + letters = string.ascii_letters + string.digits + '\\' + path = ''.join(random.choice(letters) for i in range(5000)) - def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring + os.system(f"udfd -c {path}") + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring print(" env is ok for all ") + self.test_udfd_cmd() self.prepare_udf_so() self.prepare_data() self.create_udf_function() diff --git a/tests/system-test/0-others/udf_restart_taosd.py b/tests/system-test/0-others/udf_restart_taosd.py index c99e864e71..f9a3f08bf5 100644 --- a/tests/system-test/0-others/udf_restart_taosd.py +++ b/tests/system-test/0-others/udf_restart_taosd.py @@ -11,7 +11,7 @@ from util.dnodes import * import subprocess class TDTestCase: - + updatecfgDict = {'udfdResFuncs': "udf1,udf2"} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}")