Merge pull request #12165 from taosdata/feature/udf

fix: core dump when pass mulitiple columns from taosd to udfd
This commit is contained in:
shenglian-zhou 2022-05-06 18:05:53 +08:00 committed by GitHub
commit b405ee49e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 53 additions and 8 deletions

View File

@ -29,7 +29,11 @@ extern "C" {
#endif
#define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock."
#ifdef _WIN32
#define UDF_LISTEN_PIPE_NAME_PREFIX "\\\\?\\pipe\\udfd.sock"
#else
#define UDF_LISTEN_PIPE_NAME_PREFIX ".udfd.sock."
#endif
#define UDF_DNODE_ID_ENV_NAME "DNODE_ID"
//======================================================================================
@ -129,8 +133,8 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
// begin API to UDF writer.
// dynamic lib init and destroy
typedef int32_t (*TUdfSetupFunc)();
typedef int32_t (*TUdfTeardownFunc)();
typedef int32_t (*TUdfInitFunc)();
typedef int32_t (*TUdfDestroyFunc)();
//TODO: add API to check function arguments type, number etc.
@ -242,7 +246,6 @@ static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRo
return 0;
}
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);

View File

@ -594,7 +594,9 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
//TODO: free the array output->pDataBlock
output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
taosArrayPush(output->pDataBlock, input->columnData);
for (int32_t i = 0; i < numOfCols; ++i) {
taosArrayPush(output->pDataBlock, (input + i)->columnData);
}
return 0;
}

View File

@ -81,6 +81,9 @@ typedef struct SUdf {
TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc;
TUdfAggFinishFunc aggFinishFunc;
TUdfInitFunc initFunc;
TUdfDestroyFunc destroyFunc;
} SUdf;
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
@ -101,7 +104,19 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
return UDFC_CODE_LOAD_UDF_FAILURE;
}
//TODO: init and destroy function
char initFuncName[TSDB_FUNC_NAME_LEN+5] = {0};
char *initSuffix = "_init";
strcpy(initFuncName, udfName);
strncat(initFuncName, initSuffix, strlen(initSuffix));
uv_dlsym(&udf->lib, initFuncName, (void**)(&udf->initFunc));
char destroyFuncName[TSDB_FUNC_NAME_LEN+5] = {0};
char *destroySuffix = "_destroy";
strcpy(destroyFuncName, udfName);
strncat(destroyFuncName, destroySuffix, strlen(destroySuffix));
uv_dlsym(&udf->lib, destroyFuncName, (void**)(&udf->destroyFunc));
if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName);
@ -159,6 +174,9 @@ void udfdProcessRequest(uv_work_t *req) {
if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING;
udfdLoadUdf(setup->udfName, udf);
if (udf->initFunc) {
udf->initFunc();
}
udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock);
@ -170,7 +188,6 @@ void udfdProcessRequest(uv_work_t *req) {
}
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
handle->udf = udf;
// TODO: allocate private structure and call init function and set it to handle
SUdfResponse rsp;
rsp.seqNum = request.seqNum;
rsp.type = request.type;
@ -275,10 +292,12 @@ void udfdProcessRequest(uv_work_t *req) {
if (unloadUdf) {
uv_cond_destroy(&udf->condReady);
uv_mutex_destroy(&udf->lock);
if (udf->destroyFunc) {
(udf->destroyFunc)();
}
uv_dlclose(&udf->lib);
taosMemoryFree(udf);
}
// TODO: call destroy and free udf private
taosMemoryFree(handle);
SUdfResponse response;

View File

@ -43,6 +43,27 @@ if $data00 != 2.236067977 then
return -1
endi
sql create table t2 (ts timestamp, f1 int, f2 int);
sql insert into t2 values(now, 0, 0)(now+1s, 1, 1);
sql select udf1(f1, f2) from t2;
if $rows != 2 then
return -1
endi
if $data00 != 88 then
return -1
endi
if $data10 != 88 then
return -1
endi
sql select udf2(f1, f2) from t2;
if $rows != 1 then
return -1
endi
if $data00 != 1.414213562 then
return -1
endi
#sql drop function udf1;
#sql drop function udf2;
system sh/exec.sh -n dnode1 -s stop -x SIGKILL