From 1c01bd801bc830cd92b064c3b4e5a4bea26ddcbd Mon Sep 17 00:00:00 2001 From: xsren <285808407@qq.com> Date: Mon, 11 Nov 2024 09:23:02 +0800 Subject: [PATCH] param valid: udf --- include/libs/function/tudf.h | 22 +++++++++ source/libs/function/src/tudf.c | 31 ++++++++++++- source/libs/function/src/udfd.c | 82 +++++++++++++++++++++++++++++++-- 3 files changed, 131 insertions(+), 4 deletions(-) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index acdbc09be6..4a82a935d8 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -61,6 +61,28 @@ extern "C" { } \ } while (0) +#define TAOS_UDF_CHECK_PTR_RCODE(...) \ + do { \ + const void *ptrs[] = {__VA_ARGS__}; \ + for (int i = 0; i < sizeof(ptrs) / sizeof(ptrs[0]); ++i) { \ + if (ptrs[i] == NULL) { \ + fnError("udfd %dth parameter invalid, NULL PTR.line:%d", i, __LINE__); \ + return TSDB_CODE_INVALID_PARA; \ + } \ + } \ + } while (0) + +#define TAOS_UDF_CHECK_PTR_RVOID(...) \ + do { \ + const void *ptrs[] = {__VA_ARGS__}; \ + for (int i = 0; i < sizeof(ptrs) / sizeof(ptrs[0]); ++i) { \ + if (ptrs[i] == NULL) { \ + fnError("udfd %dth parameter invalid, NULL PTR.line:%d", i, __LINE__); \ + return; \ + } \ + } \ + } while (0) + // low level APIs /** diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 0e5b3ddbdb..67f9bcae0c 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -62,6 +62,7 @@ static void udfUdfdStopAsyncCb(uv_async_t *async); static void udfWatchUdfd(void *args); void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) { + TAOS_UDF_CHECK_PTR_RVOID(process); fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); SUdfdData *pData = process->data; if(pData == NULL) { @@ -81,6 +82,7 @@ void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) static int32_t udfSpawnUdfd(SUdfdData *pData) { fnInfo("start to init udfd"); + TAOS_UDF_CHECK_PTR_RCODE(pData); int32_t err = 0; uv_process_options_t options = {0}; @@ -271,17 +273,20 @@ _OVER: } static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg) { + TAOS_UDF_CHECK_PTR_RVOID(handle); if (!uv_is_closing(handle)) { uv_close(handle, NULL); } } static void udfUdfdStopAsyncCb(uv_async_t *async) { + TAOS_UDF_CHECK_PTR_RVOID(async); SUdfdData *pData = async->data; uv_stop(&pData->loop); } static void udfWatchUdfd(void *args) { + TAOS_UDF_CHECK_PTR_RVOID(args); SUdfdData *pData = args; TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop)); TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb)); @@ -877,6 +882,7 @@ void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) { } void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) { + TAOS_UDF_CHECK_PTR_RVOID(data, meta); if (IS_VAR_DATA_TYPE(meta->type)) { taosMemoryFree(data->varLenCol.varOffsets); data->varLenCol.varOffsets = NULL; @@ -890,9 +896,13 @@ void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) { } } -void freeUdfColumn(SUdfColumn *col) { freeUdfColumnData(&col->colData, &col->colMeta); } +void freeUdfColumn(SUdfColumn *col) { + TAOS_UDF_CHECK_PTR_RVOID(col); + freeUdfColumnData(&col->colData, &col->colMeta); +} void freeUdfDataDataBlock(SUdfDataBlock *block) { + TAOS_UDF_CHECK_PTR_RVOID(block); for (int32_t i = 0; i < block->numOfCols; ++i) { freeUdfColumn(block->udfCols[i]); taosMemoryFree(block->udfCols[i]); @@ -903,11 +913,17 @@ void freeUdfDataDataBlock(SUdfDataBlock *block) { } void freeUdfInterBuf(SUdfInterBuf *buf) { + TAOS_UDF_CHECK_PTR_RVOID(buf); taosMemoryFree(buf->buf); buf->buf = NULL; } int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) { + TAOS_UDF_CHECK_PTR_RCODE(block, udfBlock); + int32_t code = blockDataCheck(block); + if (code != TSDB_CODE_SUCCESS) { + return code; + } udfBlock->numOfRows = block->info.rows; udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock); udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *)); @@ -977,6 +993,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo } int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { + TAOS_UDF_CHECK_PTR_RCODE(udfCol, block); int32_t code = 0, lino = 0; SUdfColumnMeta *meta = &udfCol->colMeta; @@ -1010,6 +1027,7 @@ _exit: } int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) { + TAOS_UDF_CHECK_PTR_RCODE(input, output); int32_t code = 0, lino = 0; int32_t numOfRows = 0; for (int32_t i = 0; i < numOfCols; ++i) { @@ -1057,6 +1075,7 @@ _exit: } int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { + TAOS_UDF_CHECK_PTR_RCODE(input, output); if (taosArrayGetSize(input->pDataBlock) != 1) { fnError("scalar function only support one column"); return 0; @@ -1135,6 +1154,7 @@ int32_t compareUdfcFuncSub(const void *elem1, const void *elem2) { } int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { + TAOS_UDF_CHECK_PTR_RCODE(udfName, pHandle); int32_t code = 0, line = 0; uv_mutex_lock(&gUdfcProxy.udfStubsMutex); SUdfcFuncStub key = {0}; @@ -1193,6 +1213,7 @@ _exit: } void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) { + TAOS_UDF_CHECK_PTR_RVOID(udfName); uv_mutex_lock(&gUdfcProxy.udfStubsMutex); SUdfcFuncStub key = {0}; tstrncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); @@ -1295,6 +1316,7 @@ int32_t cleanUpUdfs() { } int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { + TAOS_UDF_CHECK_PTR_RCODE(udfName, input, output); UdfcFuncHandle handle = NULL; int32_t code = acquireUdfFuncHandle(udfName, &handle); if (code != 0) { @@ -1324,6 +1346,10 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, } bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) { + if (pFunc == NULL || pEnv == NULL) { + fnError("udfAggGetEnv: invalid input lint: %d", __LINE__); + return false; + } if (fmIsScalarFunc(pFunc->funcId)) { return false; } @@ -1332,6 +1358,7 @@ bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) { } int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) { + TAOS_UDF_CHECK_PTR_RCODE(pCtx, pResultCellInfo); if (pResultCellInfo->initialized) { return TSDB_CODE_SUCCESS; } @@ -1373,6 +1400,7 @@ int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes } int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { + TAOS_UDF_CHECK_PTR_RCODE(pCtx); int32_t udfCode = 0; UdfcFuncHandle handle = 0; if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { @@ -1444,6 +1472,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { } int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { + TAOS_UDF_CHECK_PTR_RCODE(pCtx, pBlock); int32_t udfCode = 0; UdfcFuncHandle handle = 0; if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index e3d533186d..86573b4fba 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -55,6 +55,7 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; } int32_t udfdCPluginClose() { return 0; } int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { + TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName); char initFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char *initSuffix = "_init"; snprintf(initFuncName, sizeof(initFuncName), "%s%s", udfName, initSuffix); @@ -68,6 +69,7 @@ int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const cha } int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { + TAOS_UDF_CHECK_PTR_RCODE(udfCtx, udfName); char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc))); @@ -93,6 +95,7 @@ int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfNa } int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { + TAOS_UDF_CHECK_PTR_RCODE(udf, pUdfCtx); int32_t err = 0; SUdfCPluginCtx *udfCtx = taosMemoryCalloc(1, sizeof(SUdfCPluginCtx)); if (NULL == udfCtx) { @@ -146,6 +149,7 @@ _exit: } int32_t udfdCPluginUdfDestroy(void *udfCtx) { + TAOS_UDF_CHECK_PTR_RCODE(udfCtx); SUdfCPluginCtx *ctx = udfCtx; int32_t code = 0; if (ctx->destroyFunc) { @@ -157,6 +161,7 @@ int32_t udfdCPluginUdfDestroy(void *udfCtx) { } int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, void *udfCtx) { + TAOS_UDF_CHECK_PTR_RCODE(block, resultCol, udfCtx); SUdfCPluginCtx *ctx = udfCtx; if (ctx->scalarProcFunc) { return ctx->scalarProcFunc(block, resultCol); @@ -167,6 +172,7 @@ int32_t udfdCPluginUdfScalarProc(SUdfDataBlock *block, SUdfColumn *resultCol, vo } int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) { + TAOS_UDF_CHECK_PTR_RCODE(buf, udfCtx); SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggStartFunc) { return ctx->aggStartFunc(buf); @@ -178,6 +184,7 @@ int32_t udfdCPluginUdfAggStart(SUdfInterBuf *buf, void *udfCtx) { } int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf, void *udfCtx) { + TAOS_UDF_CHECK_PTR_RCODE(block, interBuf, newInterBuf, udfCtx); SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggProcFunc) { return ctx->aggProcFunc(block, interBuf, newInterBuf); @@ -189,6 +196,7 @@ int32_t udfdCPluginUdfAggProc(SUdfDataBlock *block, SUdfInterBuf *interBuf, SUdf int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, SUdfInterBuf *outputBuf, void *udfCtx) { + TAOS_UDF_CHECK_PTR_RCODE(inputBuf1, inputBuf2, outputBuf, udfCtx); SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggMergeFunc) { return ctx->aggMergeFunc(inputBuf1, inputBuf2, outputBuf); @@ -199,6 +207,7 @@ int32_t udfdCPluginUdfAggMerge(SUdfInterBuf *inputBuf1, SUdfInterBuf *inputBuf2, } int32_t udfdCPluginUdfAggFinish(SUdfInterBuf *buf, SUdfInterBuf *resultData, void *udfCtx) { + TAOS_UDF_CHECK_PTR_RCODE(buf, resultData, udfCtx); SUdfCPluginCtx *ctx = udfCtx; if (ctx->aggFinishFunc) { return ctx->aggFinishFunc(buf, resultData); @@ -360,6 +369,7 @@ int32_t udfdNewUdf(SUdf **pUdf, const char *udfName); void udfdGetFuncBodyPath(const SUdf *udf, char *path); int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { + TAOS_UDF_CHECK_PTR_RCODE(plugin); plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; @@ -378,6 +388,7 @@ int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { } int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) { + TAOS_UDF_CHECK_PTR_RCODE(libPath, pLib, funcName, func); int err = uv_dlopen(libPath, pLib); if (err != 0) { fnError("can not load library %s. error: %s", libPath, uv_strerror(err)); @@ -394,6 +405,7 @@ int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], } int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { + TAOS_UDF_CHECK_PTR_RCODE(plugin); plugin->scriptType = TSDB_FUNC_SCRIPT_PYTHON; // todo: windows support snprintf(plugin->libPath, PATH_MAX, "%s", "libtaospyudf.so"); @@ -439,6 +451,10 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { } void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { + if (plugin == NULL) { + fnError("udf script c plugin is NULL"); + return; + } if (plugin->closeFunc) { if (plugin->closeFunc() != 0) { fnError("udf script c plugin close func failed.line:%d", __LINE__); @@ -457,8 +473,12 @@ void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { } void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { + if (plugin == NULL) { + fnError("udf script c plugin is NULL"); + return; + } if (plugin->closeFunc) { - if(plugin->closeFunc() != 0) { + if (plugin->closeFunc() != 0) { fnError("udf script python plugin close func failed.line:%d", __LINE__); } } @@ -528,7 +548,15 @@ void udfdDeinitScriptPlugins() { } void udfdProcessRequest(uv_work_t *req) { + if (req == NULL) { + fnError("udf request is NULL"); + return; + } SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); + if (uvUdf == NULL) { + fnError("udf work is NULL"); + return; + } SUdfRequest request = {0}; if(decodeUdfRequest(uvUdf->input.base, &request) == NULL) { @@ -557,7 +585,7 @@ void udfdProcessRequest(uv_work_t *req) { } } -void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { +static void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { udfInfo->bufSize = udf->bufSize; if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { udfInfo->funcType = UDF_FUNC_TYPE_AGG; @@ -573,7 +601,8 @@ void convertUdf2UdfInfo(SUdf *udf, SScriptUdfInfo *udfInfo) { udfInfo->scriptType = udf->scriptType; } -int32_t udfdInitUdf(char *udfName, SUdf *udf) { +static int32_t udfdInitUdf(char *udfName, SUdf *udf) { + TAOS_UDF_CHECK_PTR_RCODE(udfName, udf); int32_t err = 0; err = udfdFillUdfInfoFromMNode(global.clientRpc, udfName, udf); if (err != 0) { @@ -611,6 +640,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { } int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) { + TAOS_UDF_CHECK_PTR_RCODE(pUdf, udfName); SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); if (NULL == udfNew) { return terrno; @@ -654,6 +684,7 @@ void udfdFreeUdf(void *pData) { } int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) { + TAOS_UDF_CHECK_PTR_RCODE(ppUdf, udfName); uv_mutex_lock(&global.udfsMutex); SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); int64_t currTime = taosGetTimestampMs(); @@ -693,6 +724,7 @@ int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) { } void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { + TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request); // TODO: tracable id from client. connect, setup, call, teardown fnInfo("setup request. seq num: %" PRId64 ", udf name: %s", request->seqNum, request->setup.udfName); @@ -764,7 +796,30 @@ _send: return; } +static int32_t checkUDFScalaResult(SSDataBlock *block, SUdfColumn *output) { + if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) { + return TSDB_CODE_SUCCESS; + } + if (output->colData.numOfRows != block->info.rows) { + fnError("udf scala result num of rows %d not equal to input rows %" PRId64, output->colData.numOfRows, block->info.rows); + return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t checkUDFAggResult(SSDataBlock *block, SUdfInterBuf *output) { + if (tsSafetyCheckLevel == TSDB_SAFETY_CHECK_LEVELL_NEVER) { + return TSDB_CODE_SUCCESS; + } + if (output->numOfResult != 1 && output->numOfResult != 0) { + fnError("udf agg result num of rows %d not equal to 1", output->numOfResult); + return TSDB_CODE_UDF_FUNC_EXEC_FAILURE; + } + return TSDB_CODE_SUCCESS; +} + void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { + TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request); SUdfCallRequest *call = &request->call; fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64, call->callType, call->udfHandle, request->seqNum); @@ -787,6 +842,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { code = convertDataBlockToUdfDataBlock(&call->block, &input); if (code == TSDB_CODE_SUCCESS) code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); freeUdfDataDataBlock(&input); + if (code == TSDB_CODE_SUCCESS) code = checkUDFScalaResult(&call->block, &output); if (code == TSDB_CODE_SUCCESS) code = convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); } freeUdfColumn(&output); @@ -809,6 +865,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { if (outBuf.buf != NULL) { code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx); freeUdfInterBuf(&call->interBuf); + if (code == TSDB_CODE_SUCCESS) code = checkUDFAggResult(&call->block, &outBuf); subRsp->resultBuf = outBuf; } else { code = terrno; @@ -905,6 +962,7 @@ _exit: } void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { + TAOS_UDF_CHECK_PTR_RVOID(uvUdf, request); SUdfTeardownRequest *teardown = &request->teardown; fnInfo("teardown. seq number: %" PRId64 ", handle:%" PRIx64, request->seqNum, teardown->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); @@ -964,6 +1022,7 @@ _send: } void udfdGetFuncBodyPath(const SUdf *udf, char *path) { + TAOS_UDF_CHECK_PTR_RVOID(udf, path); if (udf->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) { #ifdef WINDOWS snprintf(path, PATH_MAX, "%s%s_%d_%" PRIx64 ".dll", global.udfDataDir, udf->name, udf->version, udf->createdTime); @@ -987,6 +1046,7 @@ void udfdGetFuncBodyPath(const SUdf *udf, char *path) { } int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { + TAOS_UDF_CHECK_PTR_RCODE(pFuncInfo, udf); if (!osDataSpaceAvailable()) { terrno = TSDB_CODE_NO_DISKSPACE; fnError("udfd create shared library failed since %s", terrstr()); @@ -1022,6 +1082,7 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { } void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { + TAOS_UDF_CHECK_PTR_RVOID(parent, pMsg); SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->info.ahandle; if (pEpSet) { @@ -1093,6 +1154,7 @@ _return: } int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { + TAOS_UDF_CHECK_PTR_RCODE(clientRpc, udfName, udf); SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); @@ -1233,6 +1295,7 @@ void udfdCloseClientRpc() { } void udfdOnWrite(uv_write_t *req, int status) { + TAOS_UDF_CHECK_PTR_RVOID(req); SUvUdfWork *work = (SUvUdfWork *)req->data; if (status < 0) { fnError("udfd send response error, length: %zu code: %s", work->output.len, uv_err_name(status)); @@ -1254,6 +1317,7 @@ void udfdOnWrite(uv_write_t *req, int status) { } void udfdSendResponse(uv_work_t *work, int status) { + TAOS_UDF_CHECK_PTR_RVOID(work); SUvUdfWork *udfWork = (SUvUdfWork *)(work->data); if (udfWork->conn != NULL) { @@ -1274,6 +1338,7 @@ void udfdSendResponse(uv_work_t *work, int status) { } void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { + TAOS_UDF_CHECK_PTR_RVOID(handle, buf); SUdfdUvConn *ctx = handle->data; int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t); if (ctx->inputCap == 0) { @@ -1307,6 +1372,10 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { } bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { + if (pipe == NULL) { + fnError("udfd pipe is NULL, LINE:%d", __LINE__); + return false; + } if (pipe->inputTotal == -1 && pipe->inputLen >= sizeof(int32_t)) { pipe->inputTotal = *(int32_t *)(pipe->inputBuf); } @@ -1318,6 +1387,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { } void udfdHandleRequest(SUdfdUvConn *conn) { + TAOS_UDF_CHECK_PTR_RVOID(conn); char *inputBuf = conn->inputBuf; int32_t inputLen = conn->inputLen; @@ -1350,6 +1420,7 @@ void udfdHandleRequest(SUdfdUvConn *conn) { } void udfdPipeCloseCb(uv_handle_t *pipe) { + TAOS_UDF_CHECK_PTR_RVOID(pipe); SUdfdUvConn *conn = pipe->data; SUvUdfWork *pWork = conn->pWorkList; while (pWork != NULL) { @@ -1363,6 +1434,7 @@ void udfdPipeCloseCb(uv_handle_t *pipe) { } void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { + TAOS_UDF_CHECK_PTR_RVOID(client, buf); fnDebug("udfd read %zd bytes from client", nread); if (nread == 0) return; @@ -1389,6 +1461,7 @@ void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { } void udfdOnNewConnection(uv_stream_t *server, int status) { + TAOS_UDF_CHECK_PTR_RVOID(server); if (status < 0) { fnError("udfd new connection error. code: %s", uv_strerror(status)); return; @@ -1434,6 +1507,7 @@ _exit: } void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { + TAOS_UDF_CHECK_PTR_RVOID(handle); fnInfo("udfd signal received: %d\n", signum); uv_fs_t req; int32_t code = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); @@ -1482,6 +1556,7 @@ static int32_t udfdInitLog() { } void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + TAOS_UDF_CHECK_PTR_RVOID(buf); buf->base = taosMemoryMalloc(suggested_size); if (buf->base == NULL) { fnError("udfd ctrl pipe alloc buffer failed"); @@ -1491,6 +1566,7 @@ void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *bu } void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { + TAOS_UDF_CHECK_PTR_RVOID(q, buf); if (nread < 0) { fnError("udfd ctrl pipe read error. %s", uv_err_name(nread)); taosMemoryFree(buf->base);