param valid: udf
This commit is contained in:
parent
d36b6fd214
commit
1c01bd801b
|
@ -61,6 +61,28 @@ extern "C" {
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define TAOS_UDF_CHECK_PTR_RCODE(...) \
|
||||
do { \
|
||||
const void *ptrs[] = {__VA_ARGS__}; \
|
||||
for (int i = 0; i < sizeof(ptrs) / sizeof(ptrs[0]); ++i) { \
|
||||
if (ptrs[i] == NULL) { \
|
||||
fnError("udfd %dth parameter invalid, NULL PTR.line:%d", i, __LINE__); \
|
||||
return TSDB_CODE_INVALID_PARA; \
|
||||
} \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define TAOS_UDF_CHECK_PTR_RVOID(...) \
|
||||
do { \
|
||||
const void *ptrs[] = {__VA_ARGS__}; \
|
||||
for (int i = 0; i < sizeof(ptrs) / sizeof(ptrs[0]); ++i) { \
|
||||
if (ptrs[i] == NULL) { \
|
||||
fnError("udfd %dth parameter invalid, NULL PTR.line:%d", i, __LINE__); \
|
||||
return; \
|
||||
} \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
||||
// low level APIs
|
||||
/**
|
||||
|
|
|
@ -62,6 +62,7 @@ static void udfUdfdStopAsyncCb(uv_async_t *async);
|
|||
static void udfWatchUdfd(void *args);
|
||||
|
||||
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(process);
|
||||
fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
|
||||
SUdfdData *pData = process->data;
|
||||
if(pData == NULL) {
|
||||
|
@ -81,6 +82,7 @@ void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal)
|
|||
|
||||
static int32_t udfSpawnUdfd(SUdfdData *pData) {
|
||||
fnInfo("start to init udfd");
|
||||
TAOS_UDF_CHECK_PTR_RCODE(pData);
|
||||
|
||||
int32_t err = 0;
|
||||
uv_process_options_t options = {0};
|
||||
|
@ -271,17 +273,20 @@ _OVER:
|
|||
}
|
||||
|
||||
static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(handle);
|
||||
if (!uv_is_closing(handle)) {
|
||||
uv_close(handle, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
static void udfUdfdStopAsyncCb(uv_async_t *async) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(async);
|
||||
SUdfdData *pData = async->data;
|
||||
uv_stop(&pData->loop);
|
||||
}
|
||||
|
||||
static void udfWatchUdfd(void *args) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(args);
|
||||
SUdfdData *pData = args;
|
||||
TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop));
|
||||
TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb));
|
||||
|
@ -877,6 +882,7 @@ void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) {
|
|||
}
|
||||
|
||||
void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(data, meta);
|
||||
if (IS_VAR_DATA_TYPE(meta->type)) {
|
||||
taosMemoryFree(data->varLenCol.varOffsets);
|
||||
data->varLenCol.varOffsets = NULL;
|
||||
|
@ -890,9 +896,13 @@ void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) {
|
|||
}
|
||||
}
|
||||
|
||||
void freeUdfColumn(SUdfColumn *col) { freeUdfColumnData(&col->colData, &col->colMeta); }
|
||||
void freeUdfColumn(SUdfColumn *col) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(col);
|
||||
freeUdfColumnData(&col->colData, &col->colMeta);
|
||||
}
|
||||
|
||||
void freeUdfDataDataBlock(SUdfDataBlock *block) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(block);
|
||||
for (int32_t i = 0; i < block->numOfCols; ++i) {
|
||||
freeUdfColumn(block->udfCols[i]);
|
||||
taosMemoryFree(block->udfCols[i]);
|
||||
|
@ -903,11 +913,17 @@ void freeUdfDataDataBlock(SUdfDataBlock *block) {
|
|||
}
|
||||
|
||||
void freeUdfInterBuf(SUdfInterBuf *buf) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(buf);
|
||||
taosMemoryFree(buf->buf);
|
||||
buf->buf = NULL;
|
||||
}
|
||||
|
||||
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(block, udfBlock);
|
||||
int32_t code = blockDataCheck(block);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
udfBlock->numOfRows = block->info.rows;
|
||||
udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock);
|
||||
udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *));
|
||||
|
@ -977,6 +993,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
|
|||
}
|
||||
|
||||
int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(udfCol, block);
|
||||
int32_t code = 0, lino = 0;
|
||||
SUdfColumnMeta *meta = &udfCol->colMeta;
|
||||
|
||||
|
@ -1010,6 +1027,7 @@ _exit:
|
|||
}
|
||||
|
||||
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(input, output);
|
||||
int32_t code = 0, lino = 0;
|
||||
int32_t numOfRows = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
|
@ -1057,6 +1075,7 @@ _exit:
|
|||
}
|
||||
|
||||
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(input, output);
|
||||
if (taosArrayGetSize(input->pDataBlock) != 1) {
|
||||
fnError("scalar function only support one column");
|
||||
return 0;
|
||||
|
@ -1135,6 +1154,7 @@ int32_t compareUdfcFuncSub(const void *elem1, const void *elem2) {
|
|||
}
|
||||
|
||||
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(udfName, pHandle);
|
||||
int32_t code = 0, line = 0;
|
||||
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
|
||||
SUdfcFuncStub key = {0};
|
||||
|
@ -1193,6 +1213,7 @@ _exit:
|
|||
}
|
||||
|
||||
void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(udfName);
|
||||
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
|
||||
SUdfcFuncStub key = {0};
|
||||
tstrncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
|
||||
|
@ -1295,6 +1316,7 @@ int32_t cleanUpUdfs() {
|
|||
}
|
||||
|
||||
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(udfName, input, output);
|
||||
UdfcFuncHandle handle = NULL;
|
||||
int32_t code = acquireUdfFuncHandle(udfName, &handle);
|
||||
if (code != 0) {
|
||||
|
@ -1324,6 +1346,10 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
|
|||
}
|
||||
|
||||
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) {
|
||||
if (pFunc == NULL || pEnv == NULL) {
|
||||
fnError("udfAggGetEnv: invalid input lint: %d", __LINE__);
|
||||
return false;
|
||||
}
|
||||
if (fmIsScalarFunc(pFunc->funcId)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1332,6 +1358,7 @@ bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) {
|
|||
}
|
||||
|
||||
int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(pCtx, pResultCellInfo);
|
||||
if (pResultCellInfo->initialized) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1373,6 +1400,7 @@ int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes
|
|||
}
|
||||
|
||||
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(pCtx);
|
||||
int32_t udfCode = 0;
|
||||
UdfcFuncHandle handle = 0;
|
||||
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
|
||||
|
@ -1444,6 +1472,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(pCtx, pBlock);
|
||||
int32_t udfCode = 0;
|
||||
UdfcFuncHandle handle = 0;
|
||||
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
|
||||
|
|
|
@ -55,6 +55,7 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; }
|
|||
int32_t udfdCPluginClose() { return 0; }
|
||||
|
||||
int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName);
|
||||
char initFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
|
||||
char *initSuffix = "_init";
|
||||
snprintf(initFuncName, sizeof(initFuncName), "%s%s", udfName, initSuffix);
|
||||
|
@ -68,6 +69,7 @@ int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const cha
|
|||
}
|
||||
|
||||
int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName);
|
||||
char processFuncName[TSDB_FUNC_NAME_LEN] = {0};
|
||||
snprintf(processFuncName, sizeof(processFuncName), "%s", udfName);
|
||||
TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)));
|
||||
|
@ -93,6 +95,7 @@ int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfNa
|
|||
}
|
||||
|
||||
int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(udf, pUdfCtx);
|
||||
int32_t err = 0;
|
||||
SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx));
|
||||
if (NULL == udfCtx) {
|
||||
|
@ -146,6 +149,7 @@ _exit:
|
|||
}
|
||||
|
||||
int32_t udfdCPluginUdfDestroy(void *udfCtx) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(udfCtx);
|
||||
SUdfCPluginCtx *ctx = udfCtx;
|
||||
int32_t code = 0;
|
||||
if (ctx->destroyFunc) {
|
||||
|
@ -157,6 +161,7 @@ int32_t udfdCPluginUdfDestroy(void *udfCtx) {
|
|||
}
|
||||
|
||||
int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(block, resultCol, udfCtx);
|
||||
SUdfCPluginCtx *ctx = udfCtx;
|
||||
if (ctx->scalarProcFunc) {
|
||||
return ctx->scalarProcFunc(block, resultCol);
|
||||
|
@ -167,6 +172,7 @@ int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, vo
|
|||
}
|
||||
|
||||
int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(buf, udfCtx);
|
||||
SUdfCPluginCtx *ctx = udfCtx;
|
||||
if (ctx->aggStartFunc) {
|
||||
return ctx->aggStartFunc(buf);
|
||||
|
@ -178,6 +184,7 @@ int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) {
|
|||
}
|
||||
|
||||
int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(block, interBuf, newInterBuf, udfCtx);
|
||||
SUdfCPluginCtx *ctx = udfCtx;
|
||||
if (ctx->aggProcFunc) {
|
||||
return ctx->aggProcFunc(block, interBuf, newInterBuf);
|
||||
|
@ -189,6 +196,7 @@ int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdf
|
|||
|
||||
int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf,
|
||||
void *udfCtx) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx);
|
||||
SUdfCPluginCtx *ctx = udfCtx;
|
||||
if (ctx->aggMergeFunc) {
|
||||
return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf);
|
||||
|
@ -199,6 +207,7 @@ int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2,
|
|||
}
|
||||
|
||||
int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(buf, resultData, udfCtx);
|
||||
SUdfCPluginCtx *ctx = udfCtx;
|
||||
if (ctx->aggFinishFunc) {
|
||||
return ctx->aggFinishFunc(buf, resultData);
|
||||
|
@ -360,6 +369,7 @@ int32_t udfdNewUdf(SUdf **pUdf, const char *udfName);
|
|||
void udfdGetFuncBodyPath(const SUdf *udf, char *path);
|
||||
|
||||
int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(plugin);
|
||||
plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB;
|
||||
plugin->openFunc = udfdCPluginOpen;
|
||||
plugin->closeFunc = udfdCPluginClose;
|
||||
|
@ -378,6 +388,7 @@ int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) {
|
|||
}
|
||||
|
||||
int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(libPath, pLib, funcName, func);
|
||||
int err = uv_dlopen(libPath, pLib);
|
||||
if (err != 0) {
|
||||
fnError("can not load library %s. error: %s", libPath, uv_strerror(err));
|
||||
|
@ -394,6 +405,7 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[],
|
|||
}
|
||||
|
||||
int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(plugin);
|
||||
plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON;
|
||||
// todo: windows support
|
||||
snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.so");
|
||||
|
@ -439,6 +451,10 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) {
|
|||
}
|
||||
|
||||
void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
|
||||
if (plugin == NULL) {
|
||||
fnError("udf script c plugin is NULL");
|
||||
return;
|
||||
}
|
||||
if (plugin->closeFunc) {
|
||||
if (plugin->closeFunc() != 0) {
|
||||
fnError("udf script c plugin close func failed.line:%d", __LINE__);
|
||||
|
@ -457,6 +473,10 @@ void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) {
|
|||
}
|
||||
|
||||
void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) {
|
||||
if (plugin == NULL) {
|
||||
fnError("udf script c plugin is NULL");
|
||||
return;
|
||||
}
|
||||
if (plugin->closeFunc) {
|
||||
if (plugin->closeFunc() != 0) {
|
||||
fnError("udf script python plugin close func failed.line:%d", __LINE__);
|
||||
|
@ -528,7 +548,15 @@ void udfdDeinitScriptPlugins() {
|
|||
}
|
||||
|
||||
void udfdProcessRequest(uv_work_t *req) {
|
||||
if (req == NULL) {
|
||||
fnError("udf request is NULL");
|
||||
return;
|
||||
}
|
||||
SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data);
|
||||
if (uvUdf == NULL) {
|
||||
fnError("udf work is NULL");
|
||||
return;
|
||||
}
|
||||
SUdfRequest request = {0};
|
||||
if(decodeUdfRequest(uvUdf->input.base, &request) == NULL)
|
||||
{
|
||||
|
@ -557,7 +585,7 @@ void udfdProcessRequest(uv_work_t *req) {
|
|||
}
|
||||
}
|
||||
|
||||
void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
|
||||
static void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
|
||||
udfInfo->bufSize = udf->bufSize;
|
||||
if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) {
|
||||
udfInfo->funcType = UDF_FUNC_TYPE_AGG;
|
||||
|
@ -573,7 +601,8 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) {
|
|||
udfInfo->scriptType = udf->scriptType;
|
||||
}
|
||||
|
||||
int32_t udfdInitUdf(char *udfName, SUdf *udf) {
|
||||
static int32_t udfdInitUdf(char *udfName, SUdf *udf) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(udfName, udf);
|
||||
int32_t err = 0;
|
||||
err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf);
|
||||
if (err != 0) {
|
||||
|
@ -611,6 +640,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) {
|
|||
}
|
||||
|
||||
int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(pUdf, udfName);
|
||||
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
|
||||
if (NULL == udfNew) {
|
||||
return terrno;
|
||||
|
@ -654,6 +684,7 @@ void udfdFreeUdf(void *pData) {
|
|||
}
|
||||
|
||||
int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(ppUdf, udfName);
|
||||
uv_mutex_lock(&global.udfsMutex);
|
||||
SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName));
|
||||
int64_t currTime = taosGetTimestampMs();
|
||||
|
@ -693,6 +724,7 @@ int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) {
|
|||
}
|
||||
|
||||
void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
|
||||
// TODO: tracable id from client. connect, setup, call, teardown
|
||||
fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName);
|
||||
|
||||
|
@ -764,7 +796,30 @@ _send:
|
|||
return;
|
||||
}
|
||||
|
||||
static int32_t checkUDFScalaResult(SSDataBlock *block, SUdfColumn *output) {
|
||||
if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (output->colData.numOfRows != block->info.rows) {
|
||||
fnError("udf scala result num of rows %d not equal to input rows %" PRId64, output->colData.numOfRows, block->info.rows);
|
||||
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t checkUDFAggResult(SSDataBlock *block, SUdfInterBuf *output) {
|
||||
if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (output->numOfResult != 1 && output->numOfResult != 0) {
|
||||
fnError("udf agg result num of rows %d not equal to 1", output->numOfResult);
|
||||
return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
|
||||
SUdfCallRequest *call = &request->call;
|
||||
fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle,
|
||||
request->seqNum);
|
||||
|
@ -787,6 +842,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
|||
code = convertDataBlockToUdfDataBlock(&call->block, &input);
|
||||
if (code == TSDB_CODE_SUCCESS) code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);
|
||||
freeUdfDataDataBlock(&input);
|
||||
if (code == TSDB_CODE_SUCCESS) code = checkUDFScalaResult(&call->block, &output);
|
||||
if (code == TSDB_CODE_SUCCESS) code = convertUdfColumnToDataBlock(&output, &response.callRsp.resultData);
|
||||
}
|
||||
freeUdfColumn(&output);
|
||||
|
@ -809,6 +865,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
|||
if (outBuf.buf != NULL) {
|
||||
code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx);
|
||||
freeUdfInterBuf(&call->interBuf);
|
||||
if (code == TSDB_CODE_SUCCESS) code = checkUDFAggResult(&call->block, &outBuf);
|
||||
subRsp->resultBuf = outBuf;
|
||||
} else {
|
||||
code = terrno;
|
||||
|
@ -905,6 +962,7 @@ _exit:
|
|||
}
|
||||
|
||||
void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request);
|
||||
SUdfTeardownRequest *teardown = &request->teardown;
|
||||
fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle);
|
||||
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
|
||||
|
@ -964,6 +1022,7 @@ _send:
|
|||
}
|
||||
|
||||
void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(udf, path);
|
||||
if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
|
||||
#ifdef WINDOWS
|
||||
snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime);
|
||||
|
@ -987,6 +1046,7 @@ void udfdGetFuncBodyPath(const SUdf *udf, char *path) {
|
|||
}
|
||||
|
||||
int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(pFuncInfo, udf);
|
||||
if (!osDataSpaceAvailable()) {
|
||||
terrno = TSDB_CODE_NO_DISKSPACE;
|
||||
fnError("udfd create shared library failed since %s", terrstr());
|
||||
|
@ -1022,6 +1082,7 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) {
|
|||
}
|
||||
|
||||
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(parent, pMsg);
|
||||
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle;
|
||||
|
||||
if (pEpSet) {
|
||||
|
@ -1093,6 +1154,7 @@ _return:
|
|||
}
|
||||
|
||||
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
|
||||
TAOS_UDF_CHECK_PTR_RCODE(clientRpc, udfName, udf);
|
||||
SRetrieveFuncReq retrieveReq = {0};
|
||||
retrieveReq.numOfFuncs = 1;
|
||||
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
|
||||
|
@ -1233,6 +1295,7 @@ void udfdCloseClientRpc() {
|
|||
}
|
||||
|
||||
void udfdOnWrite(uv_write_t *req, int status) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(req);
|
||||
SUvUdfWork *work = (SUvUdfWork *)req->data;
|
||||
if (status < 0) {
|
||||
fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status));
|
||||
|
@ -1254,6 +1317,7 @@ void udfdOnWrite(uv_write_t *req, int status) {
|
|||
}
|
||||
|
||||
void udfdSendResponse(uv_work_t *work, int status) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(work);
|
||||
SUvUdfWork *udfWork = (SUvUdfWork *)(work->data);
|
||||
|
||||
if (udfWork->conn != NULL) {
|
||||
|
@ -1274,6 +1338,7 @@ void udfdSendResponse(uv_work_t *work, int status) {
|
|||
}
|
||||
|
||||
void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(handle, buf);
|
||||
SUdfdUvConn *ctx = handle->data;
|
||||
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
|
||||
if (ctx->inputCap == 0) {
|
||||
|
@ -1307,6 +1372,10 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
|
|||
}
|
||||
|
||||
bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
|
||||
if (pipe == NULL) {
|
||||
fnError("udfd pipe is NULL, LINE:%d", __LINE__);
|
||||
return false;
|
||||
}
|
||||
if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) {
|
||||
pipe->inputTotal = *(int32_t *)(pipe->inputBuf);
|
||||
}
|
||||
|
@ -1318,6 +1387,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
|
|||
}
|
||||
|
||||
void udfdHandleRequest(SUdfdUvConn *conn) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(conn);
|
||||
char *inputBuf = conn->inputBuf;
|
||||
int32_t inputLen = conn->inputLen;
|
||||
|
||||
|
@ -1350,6 +1420,7 @@ void udfdHandleRequest(SUdfdUvConn *conn) {
|
|||
}
|
||||
|
||||
void udfdPipeCloseCb(uv_handle_t *pipe) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(pipe);
|
||||
SUdfdUvConn *conn = pipe->data;
|
||||
SUvUdfWork *pWork = conn->pWorkList;
|
||||
while (pWork != NULL) {
|
||||
|
@ -1363,6 +1434,7 @@ void udfdPipeCloseCb(uv_handle_t *pipe) {
|
|||
}
|
||||
|
||||
void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(client, buf);
|
||||
fnDebug("udfd read %zd bytes from client", nread);
|
||||
if (nread == 0) return;
|
||||
|
||||
|
@ -1389,6 +1461,7 @@ void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
|||
}
|
||||
|
||||
void udfdOnNewConnection(uv_stream_t *server, int status) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(server);
|
||||
if (status < 0) {
|
||||
fnError("udfd new connection error. code: %s", uv_strerror(status));
|
||||
return;
|
||||
|
@ -1434,6 +1507,7 @@ _exit:
|
|||
}
|
||||
|
||||
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(handle);
|
||||
fnInfo("udfd signal received: %d\n", signum);
|
||||
uv_fs_t req;
|
||||
int32_t code = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
|
||||
|
@ -1482,6 +1556,7 @@ static int32_t udfdInitLog() {
|
|||
}
|
||||
|
||||
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(buf);
|
||||
buf->base = taosMemoryMalloc(suggested_size);
|
||||
if (buf->base == NULL) {
|
||||
fnError("udfd ctrl pipe alloc buffer failed");
|
||||
|
@ -1491,6 +1566,7 @@ void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *bu
|
|||
}
|
||||
|
||||
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
|
||||
TAOS_UDF_CHECK_PTR_RVOID(q, buf);
|
||||
if (nread < 0) {
|
||||
fnError("udfd ctrl pipe read error. %s", uv_err_name(nread));
|
||||
taosMemoryFree(buf->base);
|
||||
|
|
Loading…
Reference in New Issue