Merge pull request #29226 from taosdata/enh/TD-33253/coverage

udf: code coverage
This commit is contained in:
Shengliang Guan 2024-12-20 14:22:32 +08:00 committed by GitHub
commit 9a64317881
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 88 additions and 70 deletions

View File

@ -109,8 +109,9 @@ int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInter
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
// input: interbuf1, interbuf2 // input: interbuf1, interbuf2
// output: resultBuf // output: resultBuf
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, // udf todo: aggmerge
SUdfInterBuf *resultBuf); // int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
// SUdfInterBuf *resultBuf);
// input: block // input: block
// output: resultData // output: resultData
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);

View File

@ -668,8 +668,8 @@ int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) {
len += tEncodeDataBlock(buf, &call->block); len += tEncodeDataBlock(buf, &call->block);
len += encodeUdfInterBuf(buf, &call->interBuf); len += encodeUdfInterBuf(buf, &call->interBuf);
} else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) { } else if (call->callType == TSDB_UDF_CALL_AGG_MERGE) {
len += encodeUdfInterBuf(buf, &call->interBuf); // len += encodeUdfInterBuf(buf, &call->interBuf);
len += encodeUdfInterBuf(buf, &call->interBuf2); // len += encodeUdfInterBuf(buf, &call->interBuf2);
} else if (call->callType == TSDB_UDF_CALL_AGG_FIN) { } else if (call->callType == TSDB_UDF_CALL_AGG_FIN) {
len += encodeUdfInterBuf(buf, &call->interBuf); len += encodeUdfInterBuf(buf, &call->interBuf);
} }
@ -690,10 +690,10 @@ void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) {
buf = tDecodeDataBlock(buf, &call->block); buf = tDecodeDataBlock(buf, &call->block);
buf = decodeUdfInterBuf(buf, &call->interBuf); buf = decodeUdfInterBuf(buf, &call->interBuf);
break; break;
case TSDB_UDF_CALL_AGG_MERGE: // case TSDB_UDF_CALL_AGG_MERGE:
buf = decodeUdfInterBuf(buf, &call->interBuf); // buf = decodeUdfInterBuf(buf, &call->interBuf);
buf = decodeUdfInterBuf(buf, &call->interBuf2); // buf = decodeUdfInterBuf(buf, &call->interBuf2);
break; // break;
case TSDB_UDF_CALL_AGG_FIN: case TSDB_UDF_CALL_AGG_FIN:
buf = decodeUdfInterBuf(buf, &call->interBuf); buf = decodeUdfInterBuf(buf, &call->interBuf);
break; break;
@ -779,9 +779,9 @@ int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
case TSDB_UDF_CALL_AGG_PROC: case TSDB_UDF_CALL_AGG_PROC:
len += encodeUdfInterBuf(buf, &callRsp->resultBuf); len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break; break;
case TSDB_UDF_CALL_AGG_MERGE: // case TSDB_UDF_CALL_AGG_MERGE:
len += encodeUdfInterBuf(buf, &callRsp->resultBuf); // len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break; // break;
case TSDB_UDF_CALL_AGG_FIN: case TSDB_UDF_CALL_AGG_FIN:
len += encodeUdfInterBuf(buf, &callRsp->resultBuf); len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break; break;
@ -801,9 +801,9 @@ void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) {
case TSDB_UDF_CALL_AGG_PROC: case TSDB_UDF_CALL_AGG_PROC:
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break; break;
case TSDB_UDF_CALL_AGG_MERGE: // case TSDB_UDF_CALL_AGG_MERGE:
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); // buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break; // break;
case TSDB_UDF_CALL_AGG_FIN: case TSDB_UDF_CALL_AGG_FIN:
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break; break;
@ -1129,8 +1129,9 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
SSDataBlock *output, SUdfInterBuf *newState); SSDataBlock *output, SUdfInterBuf *newState);
int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, // udf todo: aggmerge
SUdfInterBuf *resultBuf); // int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
// SUdfInterBuf *resultBuf);
int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output); int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
@ -2176,11 +2177,11 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
req->interBuf = *state; req->interBuf = *state;
break; break;
} }
case TSDB_UDF_CALL_AGG_MERGE: { // case TSDB_UDF_CALL_AGG_MERGE: {
req->interBuf = *state; // req->interBuf = *state;
req->interBuf2 = *state2; // req->interBuf2 = *state2;
break; // break;
} // }
case TSDB_UDF_CALL_AGG_FIN: { case TSDB_UDF_CALL_AGG_FIN: {
req->interBuf = *state; req->interBuf = *state;
break; break;
@ -2205,10 +2206,10 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
*newState = rsp->resultBuf; *newState = rsp->resultBuf;
break; break;
} }
case TSDB_UDF_CALL_AGG_MERGE: { // case TSDB_UDF_CALL_AGG_MERGE: {
*newState = rsp->resultBuf; // *newState = rsp->resultBuf;
break; // break;
} // }
case TSDB_UDF_CALL_AGG_FIN: { case TSDB_UDF_CALL_AGG_FIN: {
*newState = rsp->resultBuf; *newState = rsp->resultBuf;
break; break;
@ -2241,12 +2242,13 @@ int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInter
// input: interbuf1, interbuf2 // input: interbuf1, interbuf2
// output: resultBuf // output: resultBuf
int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, // udf todo: aggmerge
SUdfInterBuf *resultBuf) { // int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2,
int8_t callType = TSDB_UDF_CALL_AGG_MERGE; // SUdfInterBuf *resultBuf) {
int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); // int8_t callType = TSDB_UDF_CALL_AGG_MERGE;
return err; // int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
} // return err;
// }
// input: interBuf // input: interBuf
// output: resultData // output: resultData

View File

@ -194,17 +194,17 @@ int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdf
} }
} }
int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, // int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
void *udfCtx) { // void *udfCtx) {
TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx); // TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx);
SUdfCPluginCtx *ctx = udfCtx; // SUdfCPluginCtx *ctx = udfCtx;
if (ctx->aggMergeFunc) { // if (ctx->aggMergeFunc) {
return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); // return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf);
} else { // } else {
fnError("udfd c plugin aggregation merge not implemented"); // fnError("udfd c plugin aggregation merge not implemented");
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; // return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
} // }
} // }
int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) { int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) {
TAOS_UDF_CHECK_PTR_RCODE(buf, resultData, udfCtx); TAOS_UDF_CHECK_PTR_RCODE(buf, resultData, udfCtx);
@ -378,7 +378,7 @@ int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc; plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc;
plugin->udfAggStartFunc = udfdCPluginUdfAggStart; plugin->udfAggStartFunc = udfdCPluginUdfAggStart;
plugin->udfAggProcFunc = udfdCPluginUdfAggProc; plugin->udfAggProcFunc = udfdCPluginUdfAggProc;
plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge; // plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge;
plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish; plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish;
SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}}; SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}};
@ -889,19 +889,19 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
break; break;
} }
case TSDB_UDF_CALL_AGG_MERGE: { // case TSDB_UDF_CALL_AGG_MERGE: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; // SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
if (outBuf.buf != NULL) { // if (outBuf.buf != NULL) {
code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx); // code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx);
freeUdfInterBuf(&call->interBuf); // freeUdfInterBuf(&call->interBuf);
freeUdfInterBuf(&call->interBuf2); // freeUdfInterBuf(&call->interBuf2);
subRsp->resultBuf = outBuf; // subRsp->resultBuf = outBuf;
} else { // } else {
code = terrno; // code = terrno;
} // }
//
break; // break;
} // }
case TSDB_UDF_CALL_AGG_FIN: { case TSDB_UDF_CALL_AGG_FIN: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
if (outBuf.buf != NULL) { if (outBuf.buf != NULL) {
@ -959,10 +959,10 @@ _exit:
freeUdfInterBuf(&subRsp->resultBuf); freeUdfInterBuf(&subRsp->resultBuf);
break; break;
} }
case TSDB_UDF_CALL_AGG_MERGE: { // case TSDB_UDF_CALL_AGG_MERGE: {
freeUdfInterBuf(&subRsp->resultBuf); // freeUdfInterBuf(&subRsp->resultBuf);
break; // break;
} // }
case TSDB_UDF_CALL_AGG_FIN: { case TSDB_UDF_CALL_AGG_FIN: {
freeUdfInterBuf(&subRsp->resultBuf); freeUdfInterBuf(&subRsp->resultBuf);
break; break;
@ -1667,7 +1667,6 @@ static int32_t udfdGlobalDataInit() {
} }
static void udfdGlobalDataDeinit() { static void udfdGlobalDataDeinit() {
taosHashCleanup(global.udfsHash);
uv_mutex_destroy(&global.udfsMutex); uv_mutex_destroy(&global.udfsMutex);
uv_mutex_destroy(&global.scriptPluginsMutex); uv_mutex_destroy(&global.scriptPluginsMutex);
taosMemoryFreeClear(global.loop); taosMemoryFreeClear(global.loop);
@ -1720,8 +1719,11 @@ void udfdDeinitResidentFuncs() {
SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
if (udfInHash) { if (udfInHash) {
SUdf *udf = *udfInHash; SUdf *udf = *udfInHash;
int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); int32_t code = 0;
fnDebug("udfd destroy function returns %d", code); if (udf->scriptPlugin->udfDestroyFunc) {
code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
fnDebug("udfd %s destroy function returns %d", funcName, code);
}
if(taosHashRemove(global.udfsHash, funcName, strlen(funcName)) != 0) if(taosHashRemove(global.udfsHash, funcName, strlen(funcName)) != 0)
{ {
fnError("udfd remove resident function %s failed", funcName); fnError("udfd remove resident function %s failed", funcName);
@ -1729,6 +1731,7 @@ void udfdDeinitResidentFuncs() {
taosMemoryFree(udf); taosMemoryFree(udf);
} }
} }
taosHashCleanup(global.udfsHash);
taosArrayDestroy(global.residentFuncs); taosArrayDestroy(global.residentFuncs);
fnInfo("udfd resident functions are deinit"); fnInfo("udfd resident functions are deinit");
} }
@ -1838,15 +1841,15 @@ int main(int argc, char *argv[]) {
fnInfo("udfd exit normally"); fnInfo("udfd exit normally");
removeListeningPipe(); removeListeningPipe();
udfdDeinitScriptPlugins();
_exit: _exit:
if (globalDataInited) {
udfdGlobalDataDeinit();
}
if (residentFuncsInited) { if (residentFuncsInited) {
udfdDeinitResidentFuncs(); udfdDeinitResidentFuncs();
} }
udfdDeinitScriptPlugins();
if (globalDataInited) {
udfdGlobalDataDeinit();
}
if (udfSourceDirInited) { if (udfSourceDirInited) {
udfdDestroyUdfSourceDir(); udfdDestroyUdfSourceDir();
} }

View File

@ -4,6 +4,8 @@ import sys
import time import time
import os import os
import platform import platform
import random
import string
from util.log import * from util.log import *
from util.sql import * from util.sql import *
@ -12,7 +14,7 @@ from util.dnodes import *
import subprocess import subprocess
class TDTestCase: class TDTestCase:
updatecfgDict = {'udfdResFuncs': "udf1,udf2"}
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
@ -652,10 +654,20 @@ class TDTestCase:
tdDnodes.start(1) tdDnodes.start(1)
time.sleep(2) time.sleep(2)
def test_udfd_cmd(self):
tdLog.info(" test udfd -V ")
os.system("udfd -V")
tdLog.info(" test udfd -c ")
os.system("udfd -c")
letters = string.ascii_letters + string.digits + '\\'
path = ''.join(random.choice(letters) for i in range(5000))
os.system(f"udfd -c {path}")
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
print(" env is ok for all ") print(" env is ok for all ")
self.test_udfd_cmd()
self.prepare_udf_so() self.prepare_udf_so()
self.prepare_data() self.prepare_data()
self.create_udf_function() self.create_udf_function()

View File

@ -11,7 +11,7 @@ from util.dnodes import *
import subprocess import subprocess
class TDTestCase: class TDTestCase:
updatecfgDict = {'udfdResFuncs': "udf1,udf2"}
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")