Merge pull request #20265 from taosdata/szhou/python-udf

feature: python udf
This commit is contained in:
dapan1121 2023-03-14 09:11:37 +08:00 committed by GitHub
commit b27e39beab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 561 additions and 156 deletions

View File

@ -80,10 +80,6 @@ typedef struct SUdfInterBuf {
} SUdfInterBuf;
typedef void *UdfcFuncHandle;
// dynamic lib init and destroy
typedef int32_t (*TUdfInitFunc)();
typedef int32_t (*TUdfDestroyFunc)();
#define UDF_MEMORY_EXP_GROWTH 1.5
#define NBIT (3u)
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
@ -153,6 +149,8 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t ne
allocCapacity *= UDF_MEMORY_EXP_GROWTH;
}
int32_t existedRows = data->numOfRows;
if (IS_VAR_DATA_TYPE(meta->type)) {
char *tmp = (char *)realloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity);
if (tmp == NULL) {
@ -160,6 +158,7 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t ne
}
data->varLenCol.varOffsets = (int32_t *)tmp;
data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity;
memset(&data->varLenCol.varOffsets[existedRows], 0, sizeof(int32_t) * (allocCapacity - existedRows));
// for payload, add data in udfColDataAppend
} else {
char *tmp = (char *)realloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity));
@ -168,6 +167,9 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t ne
}
data->fixLenCol.nullBitmap = tmp;
data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity);
int32_t oldLen = BitmapLen(existedRows);
memset(&data->fixLenCol.nullBitmap[oldLen], 0, BitmapLen(allocCapacity) - oldLen);
if (meta->type == TSDB_DATA_TYPE_NULL) {
return TSDB_CODE_SUCCESS;
}
@ -194,6 +196,7 @@ static FORCE_INLINE void udfColDataSetNull(SUdfColumn *pColumn, int32_t row) {
udfColDataSetNull_f(pColumn, row);
}
pColumn->hasNull = true;
pColumn->colData.numOfRows = ((int32_t)(row + 1) > pColumn->colData.numOfRows) ? (int32_t)(row + 1) : pColumn->colData.numOfRows;
}
static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentRow, const char *pData, bool isNull) {
@ -252,6 +255,10 @@ static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentR
return 0;
}
// dynamic lib init and destroy for C UDF
typedef int32_t (*TUdfInitFunc)();
typedef int32_t (*TUdfDestroyFunc)();
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock *block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggStartFunc)(SUdfInterBuf *buf);
@ -259,6 +266,41 @@ typedef int32_t (*TUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interB
typedef int32_t (*TUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf);
typedef int32_t (*TUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
typedef struct SScriptUdfEnvItem {
const char *name;
const char *value;
} SScriptUdfEnvItem;
typedef enum EUdfFuncType { UDF_FUNC_TYPE_SCALAR = 1, UDF_FUNC_TYPE_AGG = 2 } EUdfFuncType;
typedef struct SScriptUdfInfo {
const char *name;
EUdfFuncType funcType;
int8_t scriptType;
int8_t outputType;
int32_t outputLen;
int32_t bufSize;
const char *path;
} SScriptUdfInfo;
typedef int32_t (*TScriptUdfScalarProcFunc)(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx);
typedef int32_t (*TScriptUdfAggStartFunc)(SUdfInterBuf *buf, void *udfCtx);
typedef int32_t (*TScriptUdfAggProcessFunc)(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf,
void *udfCtx);
typedef int32_t (*TScriptUdfAggMergeFunc)(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
void *udfCtx);
typedef int32_t (*TScriptUdfAggFinishFunc)(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx);
typedef int32_t (*TScriptUdfInitFunc)(SScriptUdfInfo *info, void **pUdfCtx);
typedef int32_t (*TScriptUdfDestoryFunc)(void *udfCtx);
// the following function is for open/close script plugin.
typedef int32_t (*TScriptOpenFunc)(SScriptUdfEnvItem *items, int numItems);
typedef int32_t (*TScriptCloseFunc)();
#ifdef __cplusplus
}
#endif

View File

@ -713,6 +713,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_UDF_NO_FUNC_HANDLE TAOS_DEF_ERROR_CODE(0, 0x2908)
#define TSDB_CODE_UDF_INVALID_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x2909)
#define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x290A)
#define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x290B)
#define TSDB_CODE_UDF_FUNC_EXEC_FAILURE TAOS_DEF_ERROR_CODE(0, 0x290C)
// sml
#define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000)

View File

@ -202,7 +202,7 @@ typedef enum ELogicConditionType {
#define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_COMMENT_LEN 1024 * 1024
#define TSDB_FUNC_CODE_LEN 10 * 1024 * 1024
#define TSDB_FUNC_BUF_SIZE 512
#define TSDB_FUNC_BUF_SIZE 4096 * 64
#define TSDB_FUNC_TYPE_SCALAR 1
#define TSDB_FUNC_TYPE_AGGREGATE 2
#define TSDB_FUNC_SCRIPT_BIN_LIB 0

View File

@ -41,7 +41,7 @@ typedef struct SUdfSetupRequest {
typedef struct SUdfSetupResponse {
int64_t udfHandle;
int8_t outputType;
int32_t outputLen;
int32_t bytes;
int32_t bufSize;
} SUdfSetupResponse;

View File

@ -376,7 +376,7 @@ typedef struct SUdfcUvSession {
uv_pipe_t *udfUvPipe;
int8_t outputType;
int32_t outputLen;
int32_t bytes;
int32_t bufSize;
char udfName[TSDB_FUNC_NAME_LEN + 1];
@ -614,7 +614,7 @@ int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
int32_t len = 0;
len += taosEncodeFixedI64(buf, setupRsp->udfHandle);
len += taosEncodeFixedI8(buf, setupRsp->outputType);
len += taosEncodeFixedI32(buf, setupRsp->outputLen);
len += taosEncodeFixedI32(buf, setupRsp->bytes);
len += taosEncodeFixedI32(buf, setupRsp->bufSize);
return len;
}
@ -622,7 +622,7 @@ int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) {
void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) {
buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle);
buf = taosDecodeFixedI8(buf, &setupRsp->outputType);
buf = taosDecodeFixedI32(buf, &setupRsp->outputLen);
buf = taosDecodeFixedI32(buf, &setupRsp->bytes);
buf = taosDecodeFixedI32(buf, &setupRsp->bufSize);
return (void *)buf;
}
@ -808,6 +808,26 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
}
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
SUdfColumnMeta* meta = &udfCol->colMeta;
SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1);
blockDataAppendColInfo(block, &colInfoData);
blockDataEnsureCapacity(block, udfCol->colData.numOfRows);
SColumnInfoData *col = bdGetColumnInfoData(block, 0);
for (int i = 0; i < udfCol->colData.numOfRows; ++i) {
if (udfColDataIsNull(udfCol, i)) {
colDataSetNULL(col, i);
} else {
char* data = udfColDataGetData(udfCol, i);
colDataSetVal(col, i, data, false);
}
}
block->info.rows = udfCol->colData.numOfRows;
return 0;
}
int32_t convertUdfColumnToDataBlock2(SUdfColumn *udfCol, SSDataBlock *block) {
block->info.rows = udfCol->colData.numOfRows;
block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
@ -899,9 +919,11 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
typedef struct SUdfAggRes {
int8_t finalResNum;
int8_t interResNum;
int32_t interResBufLen;
char *finalResBuf;
char *interResBuf;
} SUdfAggRes;
void onUdfcPipeClose(uv_handle_t *handle);
int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask);
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf);
@ -1054,9 +1076,9 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
fnError("udfc scalar function calculate error. no column data");
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
} else {
if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) {
if (session->outputType != output->columnData->info.type || session->bytes != output->columnData->info.bytes) {
fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)",
session->outputType, session->outputLen, output->columnData->info.type, output->columnData->info.bytes);
session->outputType, session->bytes, output->columnData->info.type, output->columnData->info.bytes);
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
}
@ -1084,11 +1106,11 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
}
SUdfcUvSession *session = (SUdfcUvSession *)handle;
SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo);
int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize;
int32_t envSize = sizeof(SUdfAggRes) + session->bytes + session->bufSize;
memset(udfRes, 0, envSize);
udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen;
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
SUdfInterBuf buf = {0};
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
@ -1096,9 +1118,10 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
udfRes->interResNum = buf.numOfResult;
if (buf.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
udfRes->interResBufLen = buf.bufLen;
udfRes->interResNum = buf.numOfResult;
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
releaseUdfFuncHandle(pCtx->udfName);
@ -1120,7 +1143,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SUdfcUvSession *session = handle;
SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen;
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
SInputColumnInfoData *pInput = &pCtx->input;
int32_t numOfCols = pInput->numOfInputCols;
@ -1136,7 +1159,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows);
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum};
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
SUdfInterBuf newState = {0};
udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState);
@ -1144,17 +1167,17 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
fnError("udfAggProcess error. code: %d", udfCode);
newState.numOfResult = 0;
} else {
udfRes->interResNum = newState.numOfResult;
if (newState.bufLen <= session->bufSize) {
memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
udfRes->interResBufLen = newState.bufLen;
udfRes->interResNum = newState.numOfResult;
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", newState.bufLen, session->bufSize);
udfCode = TSDB_CODE_UDF_INVALID_BUFSIZE;
}
}
if (newState.numOfResult == 1 || state.numOfResult == 1) {
GET_RES_INFO(pCtx)->numOfRes = 1;
}
GET_RES_INFO(pCtx)->numOfRes = udfRes->interResNum;
blockDataDestroy(inputBlock);
@ -1177,22 +1200,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
SUdfcUvSession *session = handle;
SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes);
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen;
udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes;
SUdfInterBuf resultBuf = {0};
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum};
SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum};
int32_t udfCallCode = 0;
udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf);
if (udfCallCode != 0) {
fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode);
GET_RES_INFO(pCtx)->numOfRes = 0;
} else {
if (resultBuf.bufLen <= session->outputLen) {
memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
if (resultBuf.bufLen <= session->bytes) {
memcpy(udfRes->finalResBuf, resultBuf.buf, resultBuf.bufLen);
udfRes->finalResNum = resultBuf.numOfResult;
GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
} else {
fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->outputLen);
fnError("udfc inter buf size %d is greater than function output size %d", resultBuf.bufLen, session->bytes);
GET_RES_INFO(pCtx)->numOfRes = 0;
udfCallCode = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
@ -1696,13 +1719,13 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
SUdfSetupResponse *rsp = &task->_setup.rsp;
task->session->severHandle = rsp->udfHandle;
task->session->outputType = rsp->outputType;
task->session->outputLen = rsp->outputLen;
task->session->bytes = rsp->bytes;
task->session->bufSize = rsp->bufSize;
strncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN);
if (task->errCode != 0) {
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
} else {
fnInfo("sucessfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session);
*funcHandle = task->session;
}
int32_t err = task->errCode;

View File

@ -30,6 +30,173 @@
#include "tmisce.h"
// clang-format on
#define UDFD_MAX_SCRIPT_PLUGINS 64
#define UDFD_MAX_SCRIPT_TYPE 1
#define UDFD_MAX_PLUGIN_FUNCS 9
typedef struct SUdfCPluginCtx {
uv_lib_t lib;
TUdfScalarProcFunc scalarProcFunc;
TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc;
TUdfAggFinishFunc aggFinishFunc;
TUdfAggMergeFunc aggMergeFunc;
TUdfInitFunc initFunc;
TUdfDestroyFunc destroyFunc;
} SUdfCPluginCtx;
int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; }
int32_t udfdCPluginClose() { return 0; }
int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
int32_t err = 0;
SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx));
err = uv_dlopen(udf->path, &udfCtx->lib);
if (err != 0) {
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
}
const char *udfName = udf->name;
char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *initSuffix = "_init";
strcpy(initFuncName, udfName);
strncat(initFuncName, initSuffix, strlen(initSuffix));
uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc));
char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *destroySuffix = "_destroy";
strcpy(destroyFuncName, udfName);
strncat(destroyFuncName, destroySuffix, strlen(destroySuffix));
uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc));
if (udf->funcType == UDF_FUNC_TYPE_SCALAR) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName);
uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc));
} else if (udf->funcType == UDF_FUNC_TYPE_AGG) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName);
uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc));
char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *startSuffix = "_start";
strncpy(startFuncName, processFuncName, sizeof(startFuncName));
strncat(startFuncName, startSuffix, strlen(startSuffix));
uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc));
char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
char *finishSuffix = "_finish";
strncpy(finishFuncName, processFuncName, sizeof(finishFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc));
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName));
strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc));
}
int32_t code = 0;
if (udfCtx->initFunc) {
// TODO: handle init call return error
code = (udfCtx->initFunc)();
if (code != 0) {
uv_dlclose(&udfCtx->lib);
taosMemoryFree(udfCtx);
return code;
}
}
*pUdfCtx = udfCtx;
return 0;
}
int32_t udfdCPluginUdfDestroy(void *udfCtx) {
SUdfCPluginCtx *ctx = udfCtx;
int32_t code = 0;
if (ctx->destroyFunc) {
code = (ctx->destroyFunc)();
}
uv_dlclose(&ctx->lib);
taosMemoryFree(ctx);
return code;
}
int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) {
SUdfCPluginCtx *ctx = udfCtx;
if (ctx->scalarProcFunc) {
return ctx->scalarProcFunc(block, resultCol);
} else {
fnError("udfd c plugin scalar proc not implemented");
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
}
}
int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) {
SUdfCPluginCtx *ctx = udfCtx;
if (ctx->aggStartFunc) {
return ctx->aggStartFunc(buf);
} else {
fnError("udfd c plugin aggregation start not implemented");
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
}
return 0;
}
int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) {
SUdfCPluginCtx *ctx = udfCtx;
if (ctx->aggProcFunc) {
return ctx->aggProcFunc(block, interBuf, newInterBuf);
} else {
fnError("udfd c plugin aggregation process not implemented");
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
}
}
int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
void *udfCtx) {
SUdfCPluginCtx *ctx = udfCtx;
if (ctx->aggMergeFunc) {
return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf);
} else {
fnError("udfd c plugin aggregation merge not implemented");
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
}
}
int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) {
SUdfCPluginCtx *ctx = udfCtx;
if (ctx->aggFinishFunc) {
return ctx->aggFinishFunc(buf, resultData);
} else {
fnError("udfd c plugin aggregation finish not implemented");
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
}
return 0;
}
// for c, the function pointer are filled directly and libloaded = true;
// for others, dlopen/dlsym to find function pointers
typedef struct SUdfScriptPlugin {
int8_t scriptType;
char libPath[PATH_MAX];
bool libLoaded;
uv_lib_t lib;
TScriptUdfScalarProcFunc udfScalarProcFunc;
TScriptUdfAggStartFunc udfAggStartFunc;
TScriptUdfAggProcessFunc udfAggProcFunc;
TScriptUdfAggMergeFunc udfAggMergeFunc;
TScriptUdfAggFinishFunc udfAggFinishFunc;
TScriptUdfInitFunc udfInitFunc;
TScriptUdfDestoryFunc udfDestroyFunc;
TScriptOpenFunc openFunc;
TScriptCloseFunc closeFunc;
} SUdfScriptPlugin;
typedef struct SUdfdContext {
uv_loop_t *loop;
uv_pipe_t ctrlPipe;
@ -37,11 +204,15 @@ typedef struct SUdfdContext {
char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
uv_pipe_t listeningPipe;
void *clientRpc;
SCorEpSet mgmtEp;
void *clientRpc;
SCorEpSet mgmtEp;
uv_mutex_t udfsMutex;
SHashObj *udfsHash;
uv_mutex_t scriptPluginsMutex;
SUdfScriptPlugin *scriptPlugins[UDFD_MAX_SCRIPT_PLUGINS];
SArray *residentFuncs;
bool printVersion;
@ -70,16 +241,11 @@ typedef struct SUvUdfWork {
struct SUvUdfWork *pWorkNext;
} SUvUdfWork;
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState;
typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY} EUdfState;
typedef struct SUdf {
int32_t refCount;
EUdfState state;
uv_mutex_t lock;
uv_cond_t condReady;
bool resident;
char name[TSDB_FUNC_NAME_LEN + 1];
char name[TSDB_FUNC_NAME_LEN + 1];
int8_t funcType;
int8_t scriptType;
int8_t outputType;
@ -88,17 +254,14 @@ typedef struct SUdf {
char path[PATH_MAX];
uv_lib_t lib;
int32_t refCount;
EUdfState state;
uv_mutex_t lock;
uv_cond_t condReady;
bool resident;
TUdfScalarProcFunc scalarProcFunc;
TUdfAggStartFunc aggStartFunc;
TUdfAggProcessFunc aggProcFunc;
TUdfAggFinishFunc aggFinishFunc;
TUdfAggMergeFunc aggMergeFunc;
TUdfInitFunc initFunc;
TUdfDestroyFunc destroyFunc;
SUdfScriptPlugin *scriptPlugin;
void *scriptUdfCtx;
} SUdf;
// TODO: add private udf structure.
@ -121,7 +284,6 @@ typedef struct SUdfdRpcSendRecvInfo {
static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
static int32_t udfdConnectToMnode();
static int32_t udfdLoadUdf(char *udfName, SUdf *udf);
static bool udfdRpcRfp(int32_t code, tmsg_t msgType);
static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t udfdOpenClientRpc();
@ -155,6 +317,155 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg);
static int32_t udfdRun();
static void udfdConnectMnodeThreadFunc(void *args);
void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
plugin->openFunc = udfdCPluginOpen;
plugin->closeFunc = udfdCPluginClose;
plugin->udfInitFunc = udfdCPluginUdfInit;
plugin->udfDestroyFunc = udfdCPluginUdfDestroy;
plugin->udfScalarProcFunc = udfdCPluginUdfScalarProc;
plugin->udfAggStartFunc = udfdCPluginUdfAggStart;
plugin->udfAggProcFunc = udfdCPluginUdfAggProc;
plugin->udfAggMergeFunc = udfdCPluginUdfAggMerge;
plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish;
SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}};
plugin->openFunc(items, 1);
return;
}
int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) {
int err = uv_dlopen(libPath, pLib);
if (err != 0) {
fnError("can not load library %s. error: %s", libPath, uv_strerror(err));
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
}
for (int i = 0; i < numOfFuncs; ++i) {
err = uv_dlsym(pLib, funcName[i], func[i]);
if (err != 0) {
fnError("load library function failed. lib %s function %s", libPath, funcName[i]);
}
}
return 0;
}
int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
// todo: windows support
sprintf(plugin->libPath, "%s", "libtaospyudf.so");
plugin->libLoaded = false;
const char *funcName[UDFD_MAX_PLUGIN_FUNCS] = {"pyOpen", "pyClose", "pyUdfInit",
"pyUdfDestroy", "pyUdfScalarProc", "pyUdfAggStart",
"pyUdfAggFinish", "pyUdfAggProc", "pyUdfAggMerge"};
void **funcs[UDFD_MAX_PLUGIN_FUNCS] = {
(void **)&plugin->openFunc, (void **)&plugin->closeFunc, (void **)&plugin->udfInitFunc,
(void **)&plugin->udfDestroyFunc, (void **)&plugin->udfScalarProcFunc, (void **)&plugin->udfAggStartFunc,
(void **)&plugin->udfAggFinishFunc, (void **)&plugin->udfAggProcFunc, (void **)&plugin->udfAggMergeFunc};
int32_t err = udfdLoadSharedLib(plugin->libPath, &plugin->lib, funcName, funcs, UDFD_MAX_PLUGIN_FUNCS);
if (err != 0) {
fnError("can not load python plugin. lib path %s", plugin->libPath);
return err;
}
if (plugin->openFunc) {
int16_t lenPythonPath = strlen(tsUdfdLdLibPath) + strlen(tsTempDir) + 1 + 1; // tsTempDir:tsUdfdLdLibPath
char *pythonPath = taosMemoryMalloc(lenPythonPath);
#ifdef WINDOWS
snprintf(pythonPath, lenPythonPath, "%s;%s", tsTempDir, tsUdfdLdLibPath);
#else
snprintf(pythonPath, lenPythonPath, "%s:%s", tsTempDir, tsUdfdLdLibPath);
#endif
SScriptUdfEnvItem items[] = {{"PYTHONPATH", pythonPath}, {"LOGDIR", tsLogDir}};
err = plugin->openFunc(items, 2);
taosMemoryFree(pythonPath);
}
if (err != 0) {
fnError("udf script python plugin open func failed. error: %d", err);
uv_dlclose(&plugin->lib);
return err;
}
plugin->libLoaded = true;
return 0;
}
void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
if (plugin->closeFunc) {
plugin->closeFunc();
}
plugin->openFunc = NULL;
plugin->closeFunc = NULL;
plugin->udfInitFunc = NULL;
plugin->udfDestroyFunc = NULL;
plugin->udfScalarProcFunc = NULL;
plugin->udfAggStartFunc = NULL;
plugin->udfAggProcFunc = NULL;
plugin->udfAggMergeFunc = NULL;
plugin->udfAggFinishFunc = NULL;
return;
}
void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) {
if (plugin->closeFunc) {
plugin->closeFunc();
}
uv_dlclose(&plugin->lib);
if (plugin->libLoaded) {
plugin->libLoaded = false;
}
plugin->openFunc = NULL;
plugin->closeFunc = NULL;
plugin->udfInitFunc = NULL;
plugin->udfDestroyFunc = NULL;
plugin->udfScalarProcFunc = NULL;
plugin->udfAggStartFunc = NULL;
plugin->udfAggProcFunc = NULL;
plugin->udfAggMergeFunc = NULL;
plugin->udfAggFinishFunc = NULL;
}
int32_t udfdInitScriptPlugin(int8_t scriptType) {
SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin));
switch (scriptType) {
case TSDB_FUNC_SCRIPT_BIN_LIB:
udfdInitializeCPlugin(plugin);
break;
case TSDB_FUNC_SCRIPT_PYTHON: {
int32_t err = udfdInitializePythonPlugin(plugin);
if (err != 0) {
taosMemoryFree(plugin);
return err;
}
break;
}
default:
fnError("udf script type %d not supported", scriptType);
taosMemoryFree(plugin);
return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
}
global.scriptPlugins[scriptType] = plugin;
return TSDB_CODE_SUCCESS;
}
void udfdDeinitScriptPlugins() {
SUdfScriptPlugin *plugin = NULL;
plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_PYTHON];
if (plugin != NULL) {
udfdDeinitPythonPlugin(plugin);
taosMemoryFree(plugin);
}
plugin = global.scriptPlugins[TSDB_FUNC_SCRIPT_BIN_LIB];
if (plugin != NULL) {
udfdDeinitCPlugin(plugin);
taosMemoryFree(plugin);
}
return;
}
void udfdProcessRequest(uv_work_t *req) {
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
SUdfRequest request = {0};
@ -180,14 +491,76 @@ void udfdProcessRequest(uv_work_t *req) {
}
}
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
// TODO: tracable id from client. connect, setup, call, teardown
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
SUdfSetupRequest *setup = &request->setup;
int32_t code = TSDB_CODE_SUCCESS;
SUdf *udf = NULL;
void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
udfInfo->bufSize = udf->bufSize;
if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
udfInfo->funcType = UDF_FUNC_TYPE_AGG;
} else if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
udfInfo->funcType = UDF_FUNC_TYPE_SCALAR;
}
udfInfo->name = udf->name;
udfInfo->outputLen = udf->outputLen;
udfInfo->outputType = udf->outputType;
udfInfo->path = udf->path;
udfInfo->scriptType = udf->scriptType;
}
int32_t udfdRenameUdfFile(SUdf *udf) {
char newPath[PATH_MAX];
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
snprintf(newPath, PATH_MAX, "%s/lib%s.so", tsTempDir, udf->name);
taosRenameFile(udf->path, newPath);
sprintf(udf->path, "%s", newPath);
} else if (udf->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
snprintf(newPath, PATH_MAX, "%s/%s.py", tsTempDir, udf->name);
taosRenameFile(udf->path, newPath);
sprintf(udf->path, "%s", newPath);
} else {
return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
}
return 0;
}
int32_t udfdInitUdf(char *udfName, SUdf *udf) {
int32_t err = 0;
err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf);
if (err != 0) {
fnError("can not retrieve udf from mnode. udf name %s", udfName);
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
}
if (udf->scriptType > UDFD_MAX_SCRIPT_TYPE) {
fnError("udf name %s script type %d not supported", udfName, udf->scriptType);
return TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED;
}
uv_mutex_lock(&global.scriptPluginsMutex);
SUdfScriptPlugin *scriptPlugin = global.scriptPlugins[udf->scriptType];
if (scriptPlugin == NULL) {
err = udfdInitScriptPlugin(udf->scriptType);
if (err != 0) {
uv_mutex_unlock(&global.scriptPluginsMutex);
return err;
}
}
uv_mutex_unlock(&global.scriptPluginsMutex);
udf->scriptPlugin = global.scriptPlugins[udf->scriptType];
udfdRenameUdfFile(udf);
SScriptUdfInfo info = {0};
convertUdf2UdfInfo(udf, &info);
err = udf->scriptPlugin->udfInitFunc(&info, &udf->scriptUdfCtx);
if (err != 0) {
fnError("udf name %s init failed. error %d", udfName, err);
return err;
}
return 0;
}
SUdf *udfdGetOrCreateUdf(const char *udfName) {
SUdf *udf = NULL;
uv_mutex_lock(&global.udfsMutex);
SUdf **udfInHash = taosHashGet(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName));
SUdf **udfInHash = taosHashGet(global.udfsHash, udfName, strlen(udfName));
if (udfInHash) {
++(*udfInHash)->refCount;
udf = *udfInHash;
@ -195,36 +568,51 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
} else {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1;
strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN);
udfNew->state = UDF_STATE_INIT;
uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady);
udf = udfNew;
SUdf **pUdf = &udf;
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), pUdf, POINTER_BYTES);
uv_mutex_unlock(&global.udfsMutex);
}
uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING;
code = udfdLoadUdf(setup->udfName, udf);
if (udf->initFunc) {
udf->initFunc();
}
udf->resident = false;
for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) {
char *funcName = taosArrayGet(global.residentFuncs, i);
if (strcmp(setup->udfName, funcName) == 0) {
if (strcmp(udfName, funcName) == 0) {
udf->resident = true;
break;
}
}
udf->state = UDF_STATE_READY;
SUdf **pUdf = &udf;
taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES);
uv_mutex_unlock(&global.udfsMutex);
}
return udf;
}
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
// TODO: tracable id from client. connect, setup, call, teardown
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
SUdfSetupRequest *setup = &request->setup;
int32_t code = TSDB_CODE_SUCCESS;
SUdf *udf = NULL;
udf = udfdGetOrCreateUdf(setup->udfName);
uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING;
code = udfdInitUdf(setup->udfName, udf);
if (code == 0) {
udf->state = UDF_STATE_READY;
} else {
udf->state = UDF_STATE_INIT;
}
uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock);
} else {
while (udf->state != UDF_STATE_READY) {
while (udf->state == UDF_STATE_LOADING) {
uv_cond_wait(&udf->condReady, &udf->lock);
}
uv_mutex_unlock(&udf->lock);
@ -235,10 +623,10 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfResponse rsp;
rsp.seqNum = request->seqNum;
rsp.type = request->type;
rsp.code = code;
rsp.code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
rsp.setupRsp.udfHandle = (int64_t)(handle);
rsp.setupRsp.outputType = udf->outputType;
rsp.setupRsp.outputLen = udf->outputLen;
rsp.setupRsp.bytes = udf->outputLen;
rsp.setupRsp.bufSize = udf->bufSize;
int32_t len = encodeUdfResponse(NULL, &rsp);
@ -267,10 +655,13 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
switch (call->callType) {
case TSDB_UDF_CALL_SCALA_PROC: {
SUdfColumn output = {0};
output.colMeta.bytes = udf->outputLen;
output.colMeta.type = udf->outputType;
output.colMeta.precision = 0;
output.colMeta.scale = 0;
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
code = udf->scalarProcFunc(&input, &output);
code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);
freeUdfDataDataBlock(&input);
convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
freeUdfColumn(&output);
@ -278,7 +669,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
case TSDB_UDF_CALL_AGG_INIT: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
udf->aggStartFunc(&outBuf);
code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx);
subRsp->resultBuf = outBuf;
break;
}
@ -286,7 +677,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggProcFunc(&input, &call->interBuf, &outBuf);
code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx);
freeUdfInterBuf(&call->interBuf);
freeUdfDataDataBlock(&input);
subRsp->resultBuf = outBuf;
@ -295,7 +686,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
case TSDB_UDF_CALL_AGG_MERGE: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf);
code = udf->scriptPlugin->udfAggMergeFunc(&call->interBuf, &call->interBuf2, &outBuf, udf->scriptUdfCtx);
freeUdfInterBuf(&call->interBuf);
freeUdfInterBuf(&call->interBuf2);
subRsp->resultBuf = outBuf;
@ -304,7 +695,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
}
case TSDB_UDF_CALL_AGG_FIN: {
SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0};
code = udf->aggFinishFunc(&call->interBuf, &outBuf);
code = udf->scriptPlugin->udfAggFinishFunc(&call->interBuf, &outBuf, udf->scriptUdfCtx);
freeUdfInterBuf(&call->interBuf);
subRsp->resultBuf = outBuf;
break;
@ -315,7 +706,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
rsp->seqNum = request->seqNum;
rsp->type = request->type;
rsp->code = code;
rsp->code = (code != 0) ? TSDB_CODE_UDF_FUNC_EXEC_FAILURE : 0;
subRsp->callType = call->callType;
int32_t len = encodeUdfResponse(NULL, rsp);
@ -374,10 +765,8 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
if (unloadUdf) {
uv_cond_destroy(&udf->condReady);
uv_mutex_destroy(&udf->lock);
if (udf->destroyFunc) {
(udf->destroyFunc)();
}
uv_dlclose(&udf->lib);
code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
fnDebug("udfd destroy function returns %d", code);
taosMemoryFree(udf);
}
taosMemoryFree(handle);
@ -440,7 +829,8 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
goto _return;
}
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf *udf = msgInfo->param;
// SUdf *udf = msgInfo->param;
SUdf *udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->outputType;
@ -456,11 +846,10 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
char path[PATH_MAX] = {0};
#ifdef WINDOWS
snprintf(path, sizeof(path), "%s%s.dll", tsTempDir, pFuncInfo->name);
snprintf(path, sizeof(path), "%s%s", tsTempDir, pFuncInfo->name);
#else
snprintf(path, sizeof(path), "%s/lib%s.so", tsTempDir, pFuncInfo->name);
snprintf(path, sizeof(path), "%s/%s", tsTempDir, pFuncInfo->name);
#endif
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
if (file == NULL) {
fnError("udfd write udf shared library: %s failed, error: %d %s", path, errno, strerror(errno));
@ -550,60 +939,6 @@ int32_t udfdConnectToMnode() {
return code;
}
int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strncpy(udf->name, udfName, TSDB_FUNC_NAME_LEN);
int32_t err = 0;
err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
if (err != 0) {
fnError("can not retrieve udf from mnode. udf name %s", udfName);
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
}
err = uv_dlopen(udf->path, &udf->lib);
if (err != 0) {
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
return TSDB_CODE_UDF_LOAD_UDF_FAILURE;
}
char initFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *initSuffix = "_init";
strcpy(initFuncName, udfName);
strncat(initFuncName, initSuffix, strlen(initSuffix));
uv_dlsym(&udf->lib, initFuncName, (void **)(&udf->initFunc));
char destroyFuncName[TSDB_FUNC_NAME_LEN + 5] = {0};
char *destroySuffix = "_destroy";
strcpy(destroyFuncName, udfName);
strncat(destroyFuncName, destroySuffix, strlen(destroySuffix));
uv_dlsym(&udf->lib, destroyFuncName, (void **)(&udf->destroyFunc));
if (udf->funcType == TSDB_FUNC_TYPE_SCALAR) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName);
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc));
} else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
strcpy(processFuncName, udfName);
uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->aggProcFunc));
char startFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *startSuffix = "_start";
strncpy(startFuncName, processFuncName, sizeof(startFuncName));
strncat(startFuncName, startSuffix, strlen(startSuffix));
uv_dlsym(&udf->lib, startFuncName, (void **)(&udf->aggStartFunc));
char finishFuncName[TSDB_FUNC_NAME_LEN + 7] = {0};
char *finishSuffix = "_finish";
strncpy(finishFuncName, processFuncName, sizeof(finishFuncName));
strncat(finishFuncName, finishSuffix, strlen(finishSuffix));
uv_dlsym(&udf->lib, finishFuncName, (void **)(&udf->aggFinishFunc));
char mergeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *mergeSuffix = "_merge";
strncpy(mergeFuncName, processFuncName, sizeof(mergeFuncName));
strncat(mergeFuncName, mergeSuffix, strlen(mergeSuffix));
uv_dlsym(&udf->lib, mergeFuncName, (void **)(&udf->aggMergeFunc));
}
return 0;
}
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING ||
@ -965,6 +1300,8 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) {
}
static int32_t udfdRun() {
uv_mutex_init(&global.scriptPluginsMutex);
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
uv_mutex_init(&global.udfsMutex);
@ -1022,12 +1359,10 @@ int32_t udfdDeinitResidentFuncs() {
SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName));
if (udfInHash) {
SUdf *udf = *udfInHash;
if (udf->destroyFunc) {
(udf->destroyFunc)();
}
uv_dlclose(&udf->lib);
taosMemoryFree(udf);
int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx);
fnDebug("udfd destroy function returns %d", code);
taosHashRemove(global.udfsHash, funcName, strlen(funcName));
taosMemoryFree(udf);
}
}
taosArrayDestroy(global.residentFuncs);
@ -1088,6 +1423,9 @@ int main(int argc, char *argv[]) {
udfdCloseClientRpc();
udfdDeinitResidentFuncs();
udfdDeinitScriptPlugins();
udfdCleanup();
return 0;
}

View File

@ -14,15 +14,8 @@ DLL_EXPORT int32_t udf1_init() { return 0; }
DLL_EXPORT int32_t udf1_destroy() { return 0; }
DLL_EXPORT int32_t udf1(SUdfDataBlock *block, SUdfColumn *resultCol) {
SUdfColumnMeta *meta = &resultCol->colMeta;
meta->bytes = 4;
meta->type = TSDB_DATA_TYPE_INT;
meta->scale = 0;
meta->precision = 0;
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
for (int32_t i = 0; i < block->numOfRows; ++i) {
int j = 0;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
@ -42,5 +35,6 @@ DLL_EXPORT int32_t udf1(SUdfDataBlock *block, SUdfColumn *resultCol) {
#ifdef WINDOWS
Sleep(1);
#endif
resultData->numOfRows = block->numOfRows;
return 0;
}
}

View File

@ -12,12 +12,15 @@ DLL_EXPORT int32_t udf2_destroy() { return 0; }
DLL_EXPORT int32_t udf2_start(SUdfInterBuf* buf) {
*(int64_t*)(buf->buf) = 0;
buf->bufLen = sizeof(double);
buf->numOfResult = 0;
buf->numOfResult = 1;
return 0;
}
DLL_EXPORT int32_t udf2(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) {
double sumSquares = *(double*)interBuf->buf;
double sumSquares = 0;
if (interBuf->numOfResult == 1) {
sumSquares = *(double*)interBuf->buf;
}
int8_t numNotNull = 0;
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];

View File

@ -6399,6 +6399,7 @@ static int32_t translateCreateFunction(STranslateContext* pCxt, SCreateFunctionS
req.funcType = pStmt->isAgg ? TSDB_FUNC_TYPE_AGGREGATE : TSDB_FUNC_TYPE_SCALAR;
req.scriptType = pStmt->language;
req.outputType = pStmt->outputDt.type;
pStmt->outputDt.bytes = calcTypeBytes(pStmt->outputDt);
req.outputLen = pStmt->outputDt.bytes;
req.bufSize = pStmt->bufSize;
int32_t code = readFromFile(pStmt->libraryPath, &req.codeLen, &req.pCode);

View File

@ -584,6 +584,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_INPUT, "udf invalid functio
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_NO_FUNC_HANDLE, "udf no function handle")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_BUFSIZE, "udf invalid bufsize")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED, "udf program language not supported")
TAOS_DEFINE_ERROR(TSDB_CODE_UDF_FUNC_EXEC_FAILURE, "udf function execution failure")
//schemaless
TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")