fix: core dump when pass mulitiple columns from taosd to udfd

This commit is contained in:
slzhou 2022-05-06 17:41:49 +08:00
parent 886295b292
commit c9ee1b0dee
4 changed files with 53 additions and 8 deletions

View File

@ -29,7 +29,11 @@ extern "C" {
#endif #endif
#define UDF_LISTEN_PIPE_NAME_LEN 32 #define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock." #ifdef _WIN32
#define UDF_LISTEN_PIPE_NAME_PREFIX "\\\\?\\pipe\\udfd.sock"
#else
#define UDF_LISTEN_PIPE_NAME_PREFIX ".udfd.sock."
#endif
#define UDF_DNODE_ID_ENV_NAME "DNODE_ID" #define UDF_DNODE_ID_ENV_NAME "DNODE_ID"
//====================================================================================== //======================================================================================
@ -129,8 +133,8 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock);
// begin API to UDF writer. // begin API to UDF writer.
// dynamic lib init and destroy // dynamic lib init and destroy
typedef int32_t (*TUdfSetupFunc)(); typedef int32_t (*TUdfInitFunc)();
typedef int32_t (*TUdfTeardownFunc)(); typedef int32_t (*TUdfDestroyFunc)();
//TODO: add API to check function arguments type, number etc. //TODO: add API to check function arguments type, number etc.
@ -242,7 +246,6 @@ static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRo
return 0; return 0;
} }
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf); typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);

View File

@ -594,7 +594,9 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS
//TODO: free the array output->pDataBlock //TODO: free the array output->pDataBlock
output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
taosArrayPush(output->pDataBlock, input->columnData); for (int32_t i = 0; i < numOfCols; ++i) {
taosArrayPush(output->pDataBlock, (input + i)->columnData);
}
return 0; return 0;
} }

View File

@ -81,6 +81,9 @@ typedef struct SUdf {
TUdfAggStartFunc aggStartFunc; TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc; TUdfAggProcessFunc aggProcFunc;
TUdfAggFinishFunc aggFinishFunc; TUdfAggFinishFunc aggFinishFunc;
TUdfInitFunc initFunc;
TUdfDestroyFunc destroyFunc;
} SUdf; } SUdf;
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix // TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
@ -101,7 +104,19 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
return UDFC_CODE_LOAD_UDF_FAILURE; return UDFC_CODE_LOAD_UDF_FAILURE;
} }
//TODO: init and destroy function
char initFuncName[TSDB_FUNC_NAME_LEN+5] = {0};
char *initSuffix = "_init";
strcpy(initFuncName, udfName);
strncat(initFuncName, initSuffix, strlen(initSuffix));
uv_dlsym(&udf->lib, initFuncName, (void**)(&udf->initFunc));
char destroyFuncName[TSDB_FUNC_NAME_LEN+5] = {0};
char *destroySuffix = "_destroy";
strcpy(destroyFuncName, udfName);
strncat(destroyFuncName, destroySuffix, strlen(destroySuffix));
uv_dlsym(&udf->lib, destroyFuncName, (void**)(&udf->destroyFunc));
if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) { if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName); strcpy(processFuncName, udfName);
@ -159,6 +174,9 @@ void udfdProcessRequest(uv_work_t *req) {
if (udf->state == UDF_STATE_INIT) { if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING; udf->state = UDF_STATE_LOADING;
udfdLoadUdf(setup->udfName, udf); udfdLoadUdf(setup->udfName, udf);
if (udf->initFunc) {
udf->initFunc();
}
udf->state = UDF_STATE_READY; udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady); uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock); uv_mutex_unlock(&udf->lock);
@ -170,7 +188,6 @@ void udfdProcessRequest(uv_work_t *req) {
} }
SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
handle->udf = udf; handle->udf = udf;
// TODO: allocate private structure and call init function and set it to handle
SUdfResponse rsp; SUdfResponse rsp;
rsp.seqNum = request.seqNum; rsp.seqNum = request.seqNum;
rsp.type = request.type; rsp.type = request.type;
@ -275,10 +292,12 @@ void udfdProcessRequest(uv_work_t *req) {
if (unloadUdf) { if (unloadUdf) {
uv_cond_destroy(&udf->condReady); uv_cond_destroy(&udf->condReady);
uv_mutex_destroy(&udf->lock); uv_mutex_destroy(&udf->lock);
if (udf->destroyFunc) {
(udf->destroyFunc)();
}
uv_dlclose(&udf->lib); uv_dlclose(&udf->lib);
taosMemoryFree(udf); taosMemoryFree(udf);
} }
// TODO: call destroy and free udf private
taosMemoryFree(handle); taosMemoryFree(handle);
SUdfResponse response; SUdfResponse response;

View File

@ -43,6 +43,27 @@ if $data00 != 2.236067977 then
return -1 return -1
endi endi
sql create table t2 (ts timestamp, f1 int, f2 int);
sql insert into t2 values(now, 0, 0)(now+1s, 1, 1);
sql select udf1(f1, f2) from t2;
if $rows != 2 then
return -1
endi
if $data00 != 88 then
return -1
endi
if $data10 != 88 then
return -1
endi
sql select udf2(f1, f2) from t2;
if $rows != 1 then
return -1
endi
if $data00 != 1.414213562 then
return -1
endi
#sql drop function udf1; #sql drop function udf1;
#sql drop function udf2; #sql drop function udf2;
system sh/exec.sh -n dnode1 -s stop -x SIGKILL system sh/exec.sh -n dnode1 -s stop -x SIGKILL