From 59f45345fb28e0a46557a8c4c38906a305e50b7b Mon Sep 17 00:00:00 2001 From: xinsheng Ren <285808407@qq.com> Date: Thu, 20 Mar 2025 13:45:33 +0800 Subject: [PATCH] fix: memleak while stop udfd * fix: use address sanitizer to detect buffer overflow in udfTest.py * fix: improve UDF process termination in udfTest.py * fix: udf test case, stack overflow --------- Co-authored-by: chenhaoran --- source/libs/function/src/tudf.c | 26 ++++++++++++++++++-------- tests/parallel_test/cases.task | 2 +- tests/system-test/0-others/udfTest.py | 9 +++++++-- 3 files changed, 26 insertions(+), 11 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index f03bec5a32..48a7e7c345 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -2103,6 +2103,16 @@ _exit: return code; } +static void freeTaskSession(SClientUdfTask *task) { + uv_mutex_lock(&gUdfcProxy.udfcUvMutex); + if (task->session->udfUvPipe != NULL && task->session->udfUvPipe->data != NULL) { + SClientUvConn *conn = task->session->udfUvPipe->data; + conn->session = NULL; + } + uv_mutex_unlock(&gUdfcProxy.udfcUvMutex); + taosMemoryFreeClear(task->session); +} + int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { int32_t code = TSDB_CODE_SUCCESS, lino = 0; SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); @@ -2143,7 +2153,7 @@ _exit: if (code != 0) { fnError("failed to setup udf. udfname: %s, err: %d line:%d", udfName, code, lino); } - taosMemoryFree(task->session); + freeTaskSession(task); taosMemoryFree(task); return code; } @@ -2308,18 +2318,18 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle); // TODO: synchronization refactor between libuv event loop and request thread - uv_mutex_lock(&gUdfcProxy.udfcUvMutex); - if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) { - SClientUvConn *conn = session->udfUvPipe->data; - conn->session = NULL; - } - uv_mutex_unlock(&gUdfcProxy.udfcUvMutex); + // uv_mutex_lock(&gUdfcProxy.udfcUvMutex); + // if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) { + // SClientUvConn *conn = session->udfUvPipe->data; + // conn->session = NULL; + // } + // uv_mutex_unlock(&gUdfcProxy.udfcUvMutex); _exit: if (code != 0) { fnError("failed to teardown udf. udf name: %s, err: %d, line: %d", session->udfName, code, lino); } - taosMemoryFree(session); + freeTaskSession(task); taosMemoryFree(task); return code; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 8569a3cb6d..bda8e1aa43 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -562,7 +562,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/taosd_audit.py ,,n,system-test,python3 ./test.py -f 0-others/taosdlog.py ,,n,system-test,python3 ./test.py -f 0-others/taosdShell.py -N 5 -M 3 -Q 3 -,,n,system-test,python3 ./test.py -f 0-others/udfTest.py +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/udfTest.py ,,n,system-test,python3 ./test.py -f 0-others/udf_create.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/udf_restart_taosd.py ,,n,system-test,python3 ./test.py -f 0-others/udf_cfg1.py diff --git a/tests/system-test/0-others/udfTest.py b/tests/system-test/0-others/udfTest.py index 7d953f2977..dc5d5fde8d 100644 --- a/tests/system-test/0-others/udfTest.py +++ b/tests/system-test/0-others/udfTest.py @@ -631,11 +631,16 @@ class TDTestCase: tdSql.query("select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") tdSql.checkData(0,0,169.661427555) tdSql.checkData(0,1,169.661427555) - # stop taosudf cmds + + clean_env = os.environ.copy() + clean_env.pop('ASAN_OPTIONS', None) + clean_env.pop('LD_PRELOAD', None) get_processID = "ps -ef | grep -w taosudf | grep -v grep| grep -v defunct | awk '{print $2}'" - processID = subprocess.check_output(get_processID, shell=True).decode("utf-8") + processID = subprocess.check_output(get_processID, shell=True, env=clean_env).decode("utf-8") + tdLog.info("taosudf process ID: %s" % processID) stop_udfd = " kill -9 %s" % processID os.system(stop_udfd) + time.sleep(2)