diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 79abbc4e68..04b92a897a 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -33,6 +33,15 @@ extern "C" { #else #define FORCE_INLINE #endif + +#define TAOS_UDF_CHECK_RETURN(CMD) \ + do { \ + int32_t code = (CMD); \ + if (code != TSDB_CODE_SUCCESS) { \ + return (CMD); \ + } \ + } while (0) + typedef struct SUdfColumnMeta { int16_t type; int32_t bytes; @@ -192,25 +201,28 @@ static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn *pColumn, int32_t ne return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void udfColDataSetNull(SUdfColumn *pColumn, int32_t row) { - udfColEnsureCapacity(pColumn, row + 1); +static FORCE_INLINE int32_t udfColDataSetNull(SUdfColumn *pColumn, int32_t row) { + int32_t code = udfColEnsureCapacity(pColumn, row + 1); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (IS_VAR_DATA_TYPE(pColumn->colMeta.type)) { udfColDataSetNull_var(pColumn, row); } else { udfColDataSetNull_f(pColumn, row); } pColumn->hasNull = true; - pColumn->colData.numOfRows = - ((int32_t)(row + 1) > pColumn->colData.numOfRows) ? (int32_t)(row + 1) : pColumn->colData.numOfRows; + pColumn->colData.numOfRows = ((int32_t)(row + 1) > pColumn->colData.numOfRows) ? (int32_t)(row + 1) : pColumn->colData.numOfRows; + return 0; } static FORCE_INLINE int32_t udfColDataSet(SUdfColumn *pColumn, uint32_t currentRow, const char *pData, bool isNull) { SUdfColumnMeta *meta = &pColumn->colMeta; SUdfColumnData *data = &pColumn->colData; - udfColEnsureCapacity(pColumn, currentRow + 1); + TAOS_UDF_CHECK_RETURN(udfColEnsureCapacity(pColumn, currentRow + 1)); bool isVarCol = IS_VAR_DATA_TYPE(meta->type); if (isNull) { - udfColDataSetNull(pColumn, currentRow); + TAOS_UDF_CHECK_RETURN(udfColDataSetNull(pColumn, currentRow)); } else { if (!isVarCol) { udfColDataSetNotNull_f(pColumn, currentRow); diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 7a8927ca90..acdbc09be6 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -43,6 +43,25 @@ extern "C" { #endif #define UDF_DNODE_ID_ENV_NAME "DNODE_ID" +#define TAOS_UV_LIB_ERROR_RET(ret) \ + do { \ + if (0 != ret) { \ + terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE; \ + return TSDB_CODE_UDF_UV_EXEC_FAILURE; \ + } \ + } while(0) + + +#define TAOS_UV_CHECK_ERRNO(CODE) \ + do { \ + if (0 != CODE) { \ + terrln = __LINE__; \ + terrno = (CODE); \ + goto _exit; \ + } \ + } while (0) + + // low level APIs /** * setup udf @@ -109,13 +128,13 @@ int32_t udfStartUdfd(int32_t startDnodeId); * stop udfd * @return */ -int32_t udfStopUdfd(); +void udfStopUdfd(); /** * get udfd pid * */ - int32_t udfGetUdfdPid(int32_t* pUdfdPid); +// int32_t udfGetUdfdPid(int32_t* pUdfdPid); #ifdef __cplusplus } diff --git a/include/os/osDir.h b/include/os/osDir.h index 533ac8e4a4..e660ac5853 100644 --- a/include/os/osDir.h +++ b/include/os/osDir.h @@ -79,6 +79,8 @@ extern "C" { typedef struct TdDir *TdDirPtr; typedef struct TdDirEntry *TdDirEntryPtr; +#define TAOS_DIRNAME(name) ((void)taosDirName(name)) + void taosRemoveDir(const char *dirname); bool taosDirExist(const char *dirname); int32_t taosMkDir(const char *dirname); diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 7a1df2b81c..5a76be1d1e 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -39,7 +39,7 @@ int64_t taosGetOsUptime(); int32_t taosGetEmail(char *email, int32_t maxLen); int32_t taosGetOsReleaseName(char *releaseName, char* sName, char* ver, int32_t maxLen); int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores); -int32_t taosGetCpuCores(float *numOfCores, bool physical); +void taosGetCpuCores(float *numOfCores, bool physical); void taosGetCpuUsage(double *cpu_system, double *cpu_engine); int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, char* avx512); int32_t taosGetTotalMemory(int64_t *totalKB); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 9b49c1908d..6ad643f8d5 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -877,6 +877,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_UDF_INVALID_OUTPUT_TYPE TAOS_DEF_ERROR_CODE(0, 0x2908) #define TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED TAOS_DEF_ERROR_CODE(0, 0x2909) #define TSDB_CODE_UDF_FUNC_EXEC_FAILURE TAOS_DEF_ERROR_CODE(0, 0x290A) +#define TSDB_CODE_UDF_UV_EXEC_FAILURE TAOS_DEF_ERROR_CODE(0, 0x290B) // sml #define TSDB_CODE_SML_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x3000) diff --git a/include/util/tutil.h b/include/util/tutil.h index 72c4f90fd5..ca75461108 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -16,12 +16,10 @@ #ifndef _TD_UTIL_UTIL_H_ #define _TD_UTIL_UTIL_H_ -#include "os.h" #include "tcrc32c.h" #include "tdef.h" #include "thash.h" #include "tmd5.h" -#include "tutil.h" #ifdef __cplusplus extern "C" { diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 27d3b7930f..3dfe8dcab8 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -109,7 +109,7 @@ void freeUdfDataDataBlock(SUdfDataBlock *block); int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock); int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); -int32_t getUdfdPipeName(char *pipeName, int32_t size); +void getUdfdPipeName(char *pipeName, int32_t size); #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index c9b8a1e08b..ebb93695fd 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -24,6 +24,7 @@ #include "tdatablock.h" #include "tglobal.h" #include "tudf.h" +#include #include "tudfInt.h" #ifdef _TD_DARWIN_64 @@ -51,11 +52,10 @@ typedef struct SUdfdData { SUdfdData udfdGlobal = {0}; int32_t udfStartUdfd(int32_t startDnodeId); -int32_t udfStopUdfd(); +void udfStopUdfd(); static int32_t udfSpawnUdfd(SUdfdData *pData); void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal); -static int32_t udfSpawnUdfd(SUdfdData *pData); static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg); static void udfUdfdStopAsyncCb(uv_async_t *async); static void udfWatchUdfd(void *args); @@ -67,7 +67,10 @@ void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop"); } else { fnInfo("udfd process restart"); - udfSpawnUdfd(pData); + int32_t code = udfSpawnUdfd(pData); + if(code != 0) { + fnError("udfd process restart failed with code:%d", code); + } } } @@ -80,26 +83,26 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { path[0] = '.'; #ifdef WINDOWS GetModuleFileName(NULL, path, PATH_MAX); - taosDirName(path); + TAOS_DIRNAME(path); #elif defined(_TD_DARWIN_64) uint32_t pathSize = sizeof(path); _NSGetExecutablePath(path, &pathSize); - taosDirName(path); + TAOS_DIRNAME(path); #endif } else { - strncpy(path, tsProcPath, PATH_MAX); - taosDirName(path); + TAOS_STRNCPY(path, tsProcPath, PATH_MAX); + TAOS_DIRNAME(path); } #ifdef WINDOWS if (strlen(path) == 0) { - strcat(path, "C:\\TDengine"); + TAOS_STRCAT(path, "C:\\TDengine"); } - strcat(path, "\\udfd.exe"); + TAOS_STRCAT(path, "\\udfd.exe"); #else if (strlen(path) == 0) { - strcat(path, "/usr/bin"); + TAOS_STRCAT(path, "/usr/bin"); } - strcat(path, "/udfd"); + TAOS_STRCAT(path, "/udfd"); #endif char *argsUdfd[] = {path, "-c", configDir, NULL}; options.args = argsUdfd; @@ -107,7 +110,7 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { options.exit_cb = udfUdfdExit; - uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); + TAOS_UV_LIB_ERROR_RET(uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1)); uv_stdio_container_t child_stdio[3]; child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; @@ -156,7 +159,7 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { taosFqdnEnvItem = taosMemoryMalloc(strlen("TAOS_FQDN=") + strlen(taosFqdn) + 1); if (taosFqdnEnvItem != NULL) { strcpy(taosFqdnEnvItem, "TAOS_FQDN="); - strcat(taosFqdnEnvItem, taosFqdn); + TAOS_STRCAT(taosFqdnEnvItem, taosFqdn); fnInfo("[UDFD]Succsess to set TAOS_FQDN:%s", taosFqdn); } else { fnError("[UDFD]Failed to allocate memory for TAOS_FQDN"); @@ -212,22 +215,37 @@ static void udfUdfdStopAsyncCb(uv_async_t *async) { static void udfWatchUdfd(void *args) { SUdfdData *pData = args; - uv_loop_init(&pData->loop); - uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb); + TAOS_UV_CHECK_ERRNO(uv_loop_init(&pData->loop)); + TAOS_UV_CHECK_ERRNO(uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb)); pData->stopAsync.data = pData; - int32_t err = udfSpawnUdfd(pData); - atomic_store_32(&pData->spawnErr, err); - uv_barrier_wait(&pData->barrier); - uv_run(&pData->loop, UV_RUN_DEFAULT); - uv_loop_close(&pData->loop); + TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData)); + atomic_store_32(&pData->spawnErr, 0); + (void)uv_barrier_wait(&pData->barrier); + int num = uv_run(&pData->loop, UV_RUN_DEFAULT); + fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__); uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL); - uv_run(&pData->loop, UV_RUN_DEFAULT); - uv_loop_close(&pData->loop); + num = uv_run(&pData->loop, UV_RUN_DEFAULT); + fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__); + if(uv_loop_close(&pData->loop) != 0) { + fnError("udfd loop close failed, lino:%d", __LINE__); + } + +_exit: + if (terrno != 0) { + (void)uv_barrier_wait(&pData->barrier); + atomic_store_32(&pData->spawnErr, terrno); + if(uv_loop_close(&pData->loop) != 0) { + fnError("udfd loop close failed, lino:%d", __LINE__); + } + fnError("udfd thread exit with code:%d lino:%d", terrno, terrln); + terrno = TSDB_CODE_UDF_UV_EXEC_FAILURE; + } return; } int32_t udfStartUdfd(int32_t startDnodeId) { + int32_t code = 0, lino = 0; if (!tsStartUdfd) { fnInfo("start udfd is disabled.") return 0; } @@ -239,43 +257,58 @@ int32_t udfStartUdfd(int32_t startDnodeId) { pData->startCalled = true; char dnodeId[8] = {0}; snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId); - uv_os_setenv("DNODE_ID", dnodeId); + TAOS_CHECK_GOTO(uv_os_setenv("DNODE_ID", dnodeId), &lino, _exit); pData->dnodeId = startDnodeId; - uv_barrier_init(&pData->barrier, 2); - uv_thread_create(&pData->thread, udfWatchUdfd, pData); - uv_barrier_wait(&pData->barrier); + TAOS_CHECK_GOTO(uv_barrier_init(&pData->barrier, 2), &lino, _exit); + TAOS_CHECK_GOTO(uv_thread_create(&pData->thread, udfWatchUdfd, pData), &lino, _exit); + (void)uv_barrier_wait(&pData->barrier); int32_t err = atomic_load_32(&pData->spawnErr); if (err != 0) { uv_barrier_destroy(&pData->barrier); - uv_async_send(&pData->stopAsync); - uv_thread_join(&pData->thread); + if(uv_async_send(&pData->stopAsync) != 0) { + fnError("start udfd: failed to send stop async"); + } + if(uv_thread_join(&pData->thread)!= 0) { + fnError("start udfd: failed to join udfd thread"); + } pData->needCleanUp = false; fnInfo("udfd is cleaned up after spawn err"); + TAOS_CHECK_GOTO(err, &lino, _exit); } else { pData->needCleanUp = true; } - return err; +_exit: + if (code != 0) { + fnError("udfd start failed with code:%d, lino:%d", code, lino); + } + return code; } -int32_t udfStopUdfd() { +void udfStopUdfd() { SUdfdData *pData = &udfdGlobal; fnInfo("udfd start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr); if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) { - return 0; + return; } atomic_store_32(&pData->stopCalled, 1); pData->needCleanUp = false; uv_barrier_destroy(&pData->barrier); - uv_async_send(&pData->stopAsync); - uv_thread_join(&pData->thread); + if(uv_async_send(&pData->stopAsync) != 0) { + fnError("stop udfd: failed to send stop async"); + } + if(uv_thread_join(&pData->thread) != 0) { + fnError("stop udfd: failed to join udfd thread"); + } + #ifdef WINDOWS if (pData->jobHandle != NULL) CloseHandle(pData->jobHandle); #endif fnInfo("udfd is cleaned up"); - return 0; + return; } +/* int32_t udfGetUdfdPid(int32_t* pUdfdPid) { SUdfdData *pData = &udfdGlobal; if (pData->spawnErr) { @@ -287,6 +320,7 @@ int32_t udfGetUdfdPid(int32_t* pUdfdPid) { } return TSDB_CODE_SUCCESS; } +*/ //============================================================================================== /* Copyright (c) 2013, Ben Noordhuis @@ -439,8 +473,6 @@ typedef struct SClientUdfTask { SUdfcUvSession *session; - int32_t errCode; - union { struct { SUdfSetupRequest req; @@ -479,7 +511,7 @@ enum { UDFC_STATE_STOPPING, // stopping after udfcClose }; -int32_t getUdfdPipeName(char *pipeName, int32_t size); +void getUdfdPipeName(char *pipeName, int32_t size); int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup); void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request); int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state); @@ -507,7 +539,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output); int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output); -int32_t getUdfdPipeName(char *pipeName, int32_t size) { +void getUdfdPipeName(char *pipeName, int32_t size) { char dnodeId[8] = {0}; size_t dnodeIdSize = sizeof(dnodeId); int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize); @@ -522,7 +554,6 @@ int32_t getUdfdPipeName(char *pipeName, int32_t size) { snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); #endif fnInfo("get dnodeId:%s from env, pipe path:%s", dnodeId, pipeName); - return 0; } int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { @@ -712,16 +743,14 @@ void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownR int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) { int32_t len = 0; - if (buf == NULL) { - len += sizeof(rsp->msgLen); - } else { + len += sizeof(rsp->msgLen); + if (buf != NULL) { *(int32_t *)(*buf) = rsp->msgLen; *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen)); } - if (buf == NULL) { - len += sizeof(rsp->seqNum); - } else { + len += sizeof(rsp->seqNum); + if (buf != NULL) { *(int64_t *)(*buf) = rsp->seqNum; *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum)); } @@ -810,6 +839,9 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *)); for (int32_t i = 0; i < udfBlock->numOfCols; ++i) { udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn)); + if(udfBlock->udfCols[i] == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i); SUdfColumn *udfCol = udfBlock->udfCols[i]; udfCol->colMeta.type = col->info.type; @@ -821,9 +853,15 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) { udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen); + if(udfCol->colData.varLenCol.varOffsets == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen); udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows); udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen); + if(udfCol->colData.varLenCol.payload == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } if (col->reassigned) { for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) { char* pColData = col->pData + col->varmeta.offset[row]; @@ -843,6 +881,9 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows); int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen; udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen); + if(udfCol->colData.fixLenCol.nullBitmap == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } char *bitmap = udfCol->colData.fixLenCol.nullBitmap; memcpy(bitmap, col->nullbitmap, bitmapLen); udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows); @@ -852,15 +893,18 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo memcpy(data, col->pData, dataLen); } } - return 0; + return TSDB_CODE_SUCCESS; } int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { + int32_t code = 0, lino = 0; SUdfColumnMeta* meta = &udfCol->colMeta; SColumnInfoData colInfoData = createColumnInfoData(meta->type, meta->bytes, 1); - blockDataAppendColInfo(block, &colInfoData); - blockDataEnsureCapacity(block, udfCol->colData.numOfRows); + code = blockDataAppendColInfo(block, &colInfoData); + TAOS_CHECK_GOTO(code, &lino, _exit); + code = blockDataEnsureCapacity(block, udfCol->colData.numOfRows); + TAOS_CHECK_GOTO(code, &lino, _exit); SColumnInfoData *col = bdGetColumnInfoData(block, 0); for (int i = 0; i < udfCol->colData.numOfRows; ++i) { @@ -868,43 +912,20 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { colDataSetNULL(col, i); } else { char* data = udfColDataGetData(udfCol, i); - colDataSetVal(col, i, data, false); + code = colDataSetVal(col, i, data, false); + TAOS_CHECK_GOTO(code, &lino, _exit); } } block->info.rows = udfCol->colData.numOfRows; - return 0; -} - -int32_t convertUdfColumnToDataBlock2(SUdfColumn *udfCol, SSDataBlock *block) { - block->info.rows = udfCol->colData.numOfRows; - block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type); - - block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); - taosArrayPush(block->pDataBlock, &(SColumnInfoData){0}); - SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0); - SUdfColumnMeta *meta = &udfCol->colMeta; - col->info.precision = meta->precision; - col->info.bytes = meta->bytes; - col->info.scale = meta->scale; - col->info.type = meta->type; - col->hasNull = udfCol->hasNull; - SUdfColumnData *data = &udfCol->colData; - - if (!IS_VAR_DATA_TYPE(meta->type)) { - col->nullbitmap = taosMemoryMalloc(data->fixLenCol.nullBitmapLen); - memcpy(col->nullbitmap, data->fixLenCol.nullBitmap, data->fixLenCol.nullBitmapLen); - col->pData = taosMemoryMalloc(data->fixLenCol.dataLen); - memcpy(col->pData, data->fixLenCol.data, data->fixLenCol.dataLen); - } else { - col->varmeta.offset = taosMemoryMalloc(data->varLenCol.varOffsetsLen); - memcpy(col->varmeta.offset, data->varLenCol.varOffsets, data->varLenCol.varOffsetsLen); - col->pData = taosMemoryMalloc(data->varLenCol.payloadLen); - memcpy(col->pData, data->varLenCol.payload, data->varLenCol.payloadLen); +_exit: + if (code != 0) { + fnError("failed to convert udf column to data block, code:%d, line:%d", code, lino); } - return 0; + return TSDB_CODE_SUCCESS; } int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) { + int32_t code = 0, lino = 0; int32_t numOfRows = 0; for (int32_t i = 0; i < numOfCols; ++i) { numOfRows = (input[i].numOfRows > numOfRows) ? input[i].numOfRows : numOfRows; @@ -916,16 +937,16 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS SColumnInfoData d = {0}; d.info = pInfo->info; - blockDataAppendColInfo(output, &d); + TAOS_CHECK_GOTO(blockDataAppendColInfo(output, &d), &lino, _exit); } - blockDataEnsureCapacity(output, numOfRows); + TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, numOfRows), &lino, _exit); for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDest = taosArrayGet(output->pDataBlock, i); SColumnInfoData* pColInfoData = input[i].columnData; - colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info); + TAOS_CHECK_GOTO(colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info), &lino, _exit); if (input[i].numOfRows < numOfRows) { int32_t startRow = input[i].numOfRows; @@ -936,26 +957,31 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS } else { char* src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1); for (int j = 0; j < expandRows; ++j) { - colDataSetVal(pDest, startRow+j, src, false); + TAOS_CHECK_GOTO(colDataSetVal(pDest, startRow+j, src, false), &lino, _exit); } - //colDataSetNItems(pColInfoData, startRow, data, expandRows); } } } output->info.rows = numOfRows; - - return 0; +_exit: + if (code != 0) { + fnError("failed to convert scalar param to data block, code:%d, line:%d", code, lino); + } + return code; } int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { if (taosArrayGetSize(input->pDataBlock) != 1) { fnError("scalar function only support one column"); - return -1; + return 0; } output->numOfRows = input->info.rows; output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData)); + if(output->columnData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData)); output->colAlloced = true; @@ -1024,7 +1050,7 @@ int compareUdfcFuncSub(const void *elem1, const void *elem2) { } int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { - int32_t code = 0; + int32_t code = 0, line = 0; uv_mutex_lock(&gUdfcProxy.udfStubsMutex); SUdfcFuncStub key = {0}; strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); @@ -1048,7 +1074,10 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { } else { fnInfo("udf handle expired for %s, will setup udf. move it to expired list", udfName); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex); - taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub); + if(taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub) == NULL) { + fnError("acquireUdfFuncHandle: failed to push udf stub to array"); + goto _exit; + } taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub); } } @@ -1060,12 +1089,16 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { stub.handle = *pHandle; ++stub.refCount; stub.createTime = taosGetTimestampUs(); - taosArrayPush(gUdfcProxy.udfStubs, &stub); + if(taosArrayPush(gUdfcProxy.udfStubs, &stub) == NULL) { + fnError("acquireUdfFuncHandle: failed to push udf stub to array"); + goto _exit; + } taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub); } else { *pHandle = NULL; } +_exit: uv_mutex_unlock(&gUdfcProxy.udfStubsMutex); return code; } @@ -1092,17 +1125,23 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) { void cleanupExpiredUdfs() { int32_t i = 0; SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + if(expiredUdfStubs == NULL) { + fnError("cleanupExpiredUdfs: failed to init array"); + return; + } while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); if (stub->refCount == 0) { fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); - doTeardownUdf(stub->handle); + (void)doTeardownUdf(stub->handle); } else { fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, stub->refCount, stub->createTime, stub->handle); UdfcFuncHandle handle = stub->handle; if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { - taosArrayPush(expiredUdfStubs, stub); + if(taosArrayPush(expiredUdfStubs, stub) == NULL) { + fnError("cleanupExpiredUdfs: failed to push udf stub to array"); + } } else { fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName, stub->refCount, stub->createTime); @@ -1121,16 +1160,18 @@ void cleanupNotExpiredUdfs() { SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i); if (stub->refCount == 0) { fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); - doTeardownUdf(stub->handle); + (void)doTeardownUdf(stub->handle); } else { fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, stub->refCount, stub->createTime, stub->handle); UdfcFuncHandle handle = stub->handle; if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { - taosArrayPush(udfStubs, stub); + if (taosArrayPush(udfStubs, stub) == NULL) { + fnError("cleanupNotExpiredUdfs: failed to push udf stub to array"); + } } else { - fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", - stub->udfName, stub->refCount, stub->createTime); + fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", stub->udfName, + stub->refCount, stub->createTime); } } ++i; @@ -1259,7 +1300,11 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { pTempBlock->info.rows = pInput->totalRows; pTempBlock->info.id.uid = pInput->uid; for (int32_t i = 0; i < numOfCols; ++i) { - blockDataAppendColInfo(pTempBlock, pInput->pData[i]); + if ((udfCode = blockDataAppendColInfo(pTempBlock, pInput->pData[i])) != 0) { + fnError("udfAggProcess error. step blockDataAppendColInfo. udf code: %d", udfCode); + blockDataDestroy(pTempBlock); + return udfCode; + } } SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows); @@ -1358,12 +1403,16 @@ void onUdfcPipeClose(uv_handle_t *handle) { } int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) { + int32_t code = 0; fnDebug("udfc get uv task result. task: %p, uvTask: %p", task, uvTask); if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->rspBuf.base != NULL) { SUdfResponse rsp = {0}; void *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp); - task->errCode = rsp.code; + code = rsp.code; + if(code != 0) { + fnError("udfc get udf task result failure. code: %d", code); + } switch (task->type) { case UDF_TASK_SETUP: { @@ -1386,14 +1435,23 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode * // TODO: the call buffer is setup and freed by udf invocation taosMemoryFree(uvTask->rspBuf.base); } else { - task->errCode = uvTask->errCode; + code = uvTask->errCode; + if(code != 0) { + fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__); + } } } else if (uvTask->type == UV_TASK_CONNECT) { - task->errCode = uvTask->errCode; + code = uvTask->errCode; + if(code != 0) { + fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__); + } } else if (uvTask->type == UV_TASK_DISCONNECT) { - task->errCode = uvTask->errCode; + code = uvTask->errCode; + if(code != 0) { + fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__); + } } - return 0; + return code; } void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { @@ -1542,7 +1600,11 @@ void onUdfcPipeConnect(uv_connect_t *connect, int status) { } uvTask->errCode = status; - uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead); + int32_t code = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead); + if(code != 0) { + fnError("udfc client connection %p read start failed. code: %d(%s)", uvTask->pipe, code, uv_strerror(code)); + uvTask->errCode = code; + } taosMemoryFree(connect); QUEUE_REMOVE(&uvTask->procTaskQueue); uv_sem_post(&uvTask->taskSem); @@ -1572,16 +1634,37 @@ int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvT fnError("udfc create uv task, invalid task type : %d", task->type); } int32_t bufLen = encodeUdfRequest(NULL, &request); + if (bufLen <= 0) { + fnError("udfc create uv task, encode request failed. size: %d", bufLen); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } request.msgLen = bufLen; void *bufBegin = taosMemoryMalloc(bufLen); + if(bufBegin == NULL) { + fnError("udfc create uv task, malloc buffer failed. size: %d", bufLen); + return TSDB_CODE_OUT_OF_MEMORY; + } void *buf = bufBegin; - encodeUdfRequest(&buf, &request); + if(encodeUdfRequest(&buf, &request) <= 0) + { + fnError("udfc create uv task, encode request failed. size: %d", bufLen); + taosMemoryFree(bufBegin); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } + uvTask->reqBuf = uv_buf_init(bufBegin, bufLen); uvTask->seqNum = request.seqNum; } else if (uvTaskType == UV_TASK_DISCONNECT) { uvTask->pipe = task->session->udfUvPipe; } - uv_sem_init(&uvTask->taskSem, 0); + if (uv_sem_init(&uvTask->taskSem, 0) != 0) + { + if (uvTaskType == UV_TASK_REQ_RSP) { + taosMemoryFree(uvTask->reqBuf.base); + } + fnError("udfc create uv task, init semaphore failed."); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } return 0; } @@ -1592,7 +1675,11 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) { uv_mutex_lock(&udfc->taskQueueMutex); QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue); uv_mutex_unlock(&udfc->taskQueueMutex); - uv_async_send(&udfc->loopTaskAync); + int32_t code = uv_async_send(&udfc->loopTaskAync); + if (code != 0) { + fnError("udfc queue uv task to event loop failed. code: %s", uv_strerror(code)); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } uv_sem_wait(&uvTask->taskSem); fnInfo("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask); @@ -1608,10 +1695,23 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { switch (uvTask->type) { case UV_TASK_CONNECT: { uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); - uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0); + if(pipe == NULL) { + fnError("udfc event loop start connect task malloc pipe failed."); + return TSDB_CODE_OUT_OF_MEMORY; + } + if (uv_pipe_init(&uvTask->udfc->uvLoop, pipe, 0) != 0) { + fnError("udfc event loop start connect task uv_pipe_init failed."); + taosMemoryFree(pipe); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } uvTask->pipe = pipe; SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn)); + if(conn == NULL) { + fnError("udfc event loop start connect task malloc conn failed."); + taosMemoryFree(pipe); + return TSDB_CODE_OUT_OF_MEMORY; + } conn->pipe = pipe; conn->readBuf.len = 0; conn->readBuf.cap = 0; @@ -1622,6 +1722,12 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { pipe->data = conn; uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); + if(connReq == NULL) { + fnError("udfc event loop start connect task malloc connReq failed."); + taosMemoryFree(pipe); + taosMemoryFree(conn); + return TSDB_CODE_OUT_OF_MEMORY; + } connReq->data = uvTask; uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect); code = 0; @@ -1633,6 +1739,10 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { code = TSDB_CODE_UDF_PIPE_NOT_EXIST; } else { uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); + if(write == NULL) { + fnError("udfc event loop start req_rsp task malloc write failed."); + return TSDB_CODE_OUT_OF_MEMORY; + } write->data = pipe->data; QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue; QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue); @@ -1726,27 +1836,41 @@ void udfStopAsyncCb(uv_async_t *async) { } void constructUdfService(void *argsThread) { + int32_t code = 0, lino = 0; SUdfcProxy *udfc = (SUdfcProxy *)argsThread; - uv_loop_init(&udfc->uvLoop); + code = uv_loop_init(&udfc->uvLoop); + TAOS_CHECK_GOTO(code, &lino, _exit); - uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb); + code = uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb); + TAOS_CHECK_GOTO(code, &lino, _exit); udfc->loopTaskAync.data = udfc; - uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb); + code = uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb); + TAOS_CHECK_GOTO(code, &lino, _exit); udfc->loopStopAsync.data = udfc; - uv_mutex_init(&udfc->taskQueueMutex); + code = uv_mutex_init(&udfc->taskQueueMutex); + TAOS_CHECK_GOTO(code, &lino, _exit); QUEUE_INIT(&udfc->taskQueue); QUEUE_INIT(&udfc->uvProcTaskQueue); - uv_barrier_wait(&udfc->initBarrier); + (void)uv_barrier_wait(&udfc->initBarrier); // TODO return value of uv_run - uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); - uv_loop_close(&udfc->uvLoop); + int num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); + fnInfo("udfc uv loop exit. active handle num: %d", num); + (void)uv_loop_close(&udfc->uvLoop); uv_walk(&udfc->uvLoop, udfUdfdCloseWalkCb, NULL); - uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); - uv_loop_close(&udfc->uvLoop); + num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); + fnInfo("udfc uv loop exit. active handle num: %d", num); + + (void)uv_loop_close(&udfc->uvLoop); +_exit: + if (code != 0) { + fnError("udfc construct error. code: %d, line: %d", code, lino); + } + fnInfo("udfc construct finished"); } int32_t udfcOpen() { + int32_t code = 0, lino = 0; int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1); if (old == 1) { return 0; @@ -1754,16 +1878,36 @@ int32_t udfcOpen() { SUdfcProxy *proxy = &gUdfcProxy; getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName)); proxy->udfcState = UDFC_STATE_STARTNG; - uv_barrier_init(&proxy->initBarrier, 2); - uv_thread_create(&proxy->loopThread, constructUdfService, proxy); + code = uv_barrier_init(&proxy->initBarrier, 2); + TAOS_CHECK_GOTO(code, &lino, _exit); + code = uv_thread_create(&proxy->loopThread, constructUdfService, proxy); + TAOS_CHECK_GOTO(code, &lino, _exit); atomic_store_8(&proxy->udfcState, UDFC_STATE_READY); proxy->udfcState = UDFC_STATE_READY; - uv_barrier_wait(&proxy->initBarrier); - uv_mutex_init(&proxy->udfStubsMutex); + (void)uv_barrier_wait(&proxy->initBarrier); + TAOS_CHECK_GOTO(code, &lino, _exit); + code = uv_mutex_init(&proxy->udfStubsMutex); + TAOS_CHECK_GOTO(code, &lino, _exit); proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); + if(proxy->udfStubs == NULL) { + fnError("udfc init failed. udfStubs: %p", proxy->udfStubs); + return -1; + } proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); - uv_mutex_init(&proxy->udfcUvMutex); - fnInfo("udfc initialized") return 0; + if(proxy->expiredUdfStubs == NULL) { + taosArrayDestroy(proxy->udfStubs); + fnError("udfc init failed. expiredUdfStubs: %p", proxy->expiredUdfStubs); + return -1; + } + code = uv_mutex_init(&proxy->udfcUvMutex); + TAOS_CHECK_GOTO(code, &lino, _exit); +_exit: + if (code != 0) { + fnError("udfc open error. code: %d, line: %d", code, lino); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } + fnInfo("udfc initialized"); + return 0; } int32_t udfcClose() { @@ -1774,8 +1918,12 @@ int32_t udfcClose() { SUdfcProxy *udfc = &gUdfcProxy; udfc->udfcState = UDFC_STATE_STOPPING; - uv_async_send(&udfc->loopStopAsync); - uv_thread_join(&udfc->loopThread); + if(uv_async_send(&udfc->loopStopAsync) != 0) { + fnError("udfc close error to send stop async"); + } + if(uv_thread_join(&udfc->loopThread) != 0 ) { + fnError("udfc close errir to join loop thread"); + } uv_mutex_destroy(&udfc->taskQueueMutex); uv_barrier_destroy(&udfc->initBarrier); taosArrayDestroy(udfc->expiredUdfStubs); @@ -1788,45 +1936,61 @@ int32_t udfcClose() { } int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { + int32_t code = 0, lino = 0; SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); + if(uvTask == NULL) { + fnError("udfc client task: %p failed to allocate memory for uvTask", task); + return TSDB_CODE_OUT_OF_MEMORY; + } fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe); - udfcInitializeUvTask(task, uvTaskType, uvTask); - udfcQueueUvTask(uvTask); - udfcGetUdfTaskResultFromUvTask(task, uvTask); + code = udfcInitializeUvTask(task, uvTaskType, uvTask); + TAOS_CHECK_GOTO(code, &lino, _exit); + code = udfcQueueUvTask(uvTask); + TAOS_CHECK_GOTO(code, &lino, _exit); + code = udfcGetUdfTaskResultFromUvTask(task, uvTask); + TAOS_CHECK_GOTO(code, &lino, _exit); if (uvTaskType == UV_TASK_CONNECT) { task->session->udfUvPipe = uvTask->pipe; SClientUvConn *conn = uvTask->pipe->data; conn->session = task->session; } + +_exit: + if (code != 0) { + fnError("udfc run udf uv task failure. task: %p, uvTask: %p, err: %d, line: %d", task, uvTask, code, lino); + } taosMemoryFree(uvTask->reqBuf.base); uvTask->reqBuf.base = NULL; taosMemoryFree(uvTask); - fnDebug("udfc freed uvTask: %p", task); - uvTask = NULL; - return task->errCode; + return code; } int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { + int32_t code = TSDB_CODE_SUCCESS, lino = 0; SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); - task->errCode = 0; + if(task == NULL) { + fnError("doSetupUdf, failed to allocate memory for task"); + return TSDB_CODE_OUT_OF_MEMORY; + } task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession)); + if(task->session == NULL) { + fnError("doSetupUdf, failed to allocate memory for session"); + taosMemoryFree(task); + return TSDB_CODE_OUT_OF_MEMORY; + } task->session->udfc = &gUdfcProxy; task->type = UDF_TASK_SETUP; SUdfSetupRequest *req = &task->_setup.req; strncpy(req->udfName, udfName, TSDB_FUNC_NAME_LEN); - int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT); - if (errCode != 0) { - fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfcProxy)->udfdPipeName); - taosMemoryFree(task->session); - taosMemoryFree(task); - return TSDB_CODE_UDF_PIPE_CONNECT_ERR; - } + code = udfcRunUdfUvTask(task, UV_TASK_CONNECT); + TAOS_CHECK_GOTO(code, &lino, _exit); - udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); + code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); + TAOS_CHECK_GOTO(code, &lino, _exit); SUdfSetupResponse *rsp = &task->_setup.rsp; task->session->severHandle = rsp->udfHandle; @@ -1834,15 +1998,18 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { task->session->bytes = rsp->bytes; task->session->bufSize = rsp->bufSize; strncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN); - if (task->errCode != 0) { - fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode) - } else { - fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session); - *funcHandle = task->session; - } - int32_t err = task->errCode; + fnInfo("successfully setup udf func handle. udfName: %s, handle: %p", udfName, task->session); + *funcHandle = task->session; taosMemoryFree(task); - return err; + return 0; + +_exit: + if (code != 0) { + fnError("failed to setup udf. udfname: %s, err: %d line:%d", udfName, code, lino); + } + taosMemoryFree(task->session); + taosMemoryFree(task); + return code; } int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, @@ -1854,7 +2021,10 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf return TSDB_CODE_UDF_PIPE_NOT_EXIST; } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); - task->errCode = 0; + if(task == NULL) { + fnError("udfc call udf. failed to allocate memory for task"); + return TSDB_CODE_OUT_OF_MEMORY; + } task->session = (SUdfcUvSession *)handle; task->type = UDF_TASK_CALL; @@ -1887,10 +2057,9 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf } } - udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); - - if (task->errCode != 0) { - fnError("call udf failure. err: %d", task->errCode); + int32_t code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); + if (code != 0) { + fnError("call udf failure. udfcRunUdfUvTask err: %d", code); } else { SUdfCallResponse *rsp = &task->_call.rsp; switch (callType) { @@ -1916,9 +2085,8 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf } } }; - int err = task->errCode; taosMemoryFree(task); - return err; + return code; } int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { @@ -1957,11 +2125,15 @@ int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdf int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { int8_t callType = TSDB_UDF_CALL_SCALA_PROC; SSDataBlock inputBlock = {0}; - convertScalarParamToDataBlock(input, numOfCols, &inputBlock); + int32_t code = convertScalarParamToDataBlock(input, numOfCols, &inputBlock); + if(code != 0) { + fnError("doCallUdfScalarFunc, convertScalarParamToDataBlock failed. code: %d", code); + return code; + } SSDataBlock resultBlock = {0}; int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL); if (err == 0) { - convertDataBlockToScalarParm(&resultBlock, output); + err = convertDataBlockToScalarParm(&resultBlock, output); taosArrayDestroy(resultBlock.pDataBlock); } @@ -1970,6 +2142,7 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t } int32_t doTeardownUdf(UdfcFuncHandle handle) { + int32_t code = TSDB_CODE_SUCCESS, lino = 0;; SUdfcUvSession *session = (SUdfcUvSession *)handle; if (session->udfUvPipe == NULL) { @@ -1979,18 +2152,22 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); - task->errCode = 0; + if(task == NULL) { + fnError("doTeardownUdf, failed to allocate memory for task"); + taosMemoryFree(session); + return TSDB_CODE_OUT_OF_MEMORY; + } task->session = session; task->type = UDF_TASK_TEARDOWN; SUdfTeardownRequest *req = &task->_teardown.req; req->udfHandle = task->session->severHandle; - udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); + code = udfcRunUdfUvTask(task, UV_TASK_REQ_RSP); + TAOS_CHECK_GOTO(code, &lino, _exit); - int32_t err = task->errCode; - - udfcRunUdfUvTask(task, UV_TASK_DISCONNECT); + code = udfcRunUdfUvTask(task, UV_TASK_DISCONNECT); + TAOS_CHECK_GOTO(code, &lino, _exit); fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle); // TODO: synchronization refactor between libuv event loop and request thread @@ -2000,8 +2177,13 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { conn->session = NULL; } uv_mutex_unlock(&gUdfcProxy.udfcUvMutex); + +_exit: + if (code != 0) { + fnError("failed to teardown udf. udf name: %s, err: %d, line: %d", session->udfName, code, lino); + } taosMemoryFree(session); taosMemoryFree(task); - return err; + return code; } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 75bed73bb3..7339f115a3 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -54,38 +54,39 @@ int32_t udfdCPluginOpen(SScriptUdfEnvItem *items, int numItems) { return 0; } int32_t udfdCPluginClose() { return 0; } -const char *udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { +int32_t udfdCPluginUdfInitLoadInitDestoryFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { char initFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; char *initSuffix = "_init"; snprintf(initFuncName, sizeof(initFuncName), "%s%s", udfName, initSuffix); - uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc)); + TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, initFuncName, (void **)(&udfCtx->initFunc))); char destroyFuncName[TSDB_FUNC_NAME_LEN + 9] = {0}; char *destroySuffix = "_destroy"; snprintf(destroyFuncName, sizeof(destroyFuncName), "%s%s", udfName, destroySuffix); - uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc)); - return udfName; + TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, destroyFuncName, (void **)(&udfCtx->destroyFunc))); + return 0; } -void udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { +int32_t udfdCPluginUdfInitLoadAggFuncs(SUdfCPluginCtx *udfCtx, const char *udfName) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; - snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); - uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc)); + snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); + TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->aggProcFunc))); char startFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; char *startSuffix = "_start"; snprintf(startFuncName, sizeof(startFuncName), "%s%s", processFuncName, startSuffix); - uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc)); + TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, startFuncName, (void **)(&udfCtx->aggStartFunc))); char finishFuncName[TSDB_FUNC_NAME_LEN + 8] = {0}; char *finishSuffix = "_finish"; snprintf(finishFuncName, sizeof(finishFuncName), "%s%s", processFuncName, finishSuffix); - uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc)); + TAOS_CHECK_RETURN(uv_dlsym(&udfCtx->lib, finishFuncName, (void **)(&udfCtx->aggFinishFunc))); char mergeFuncName[TSDB_FUNC_NAME_LEN + 7] = {0}; char *mergeSuffix = "_merge"; snprintf(mergeFuncName, sizeof(mergeFuncName), "%s%s", processFuncName, mergeSuffix); - uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc)); + (void)(uv_dlsym(&udfCtx->lib, mergeFuncName, (void **)(&udfCtx->aggMergeFunc))); + return 0; } int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { @@ -99,27 +100,43 @@ int32_t udfdCPluginUdfInit(SScriptUdfInfo *udf, void **pUdfCtx) { } const char *udfName = udf->name; - udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName); + err = udfdCPluginUdfInitLoadInitDestoryFuncs(udfCtx, udfName); + if (err != 0) { + fnError("can not load init/destroy functions. error: %d", err); + err = TSDB_CODE_UDF_LOAD_UDF_FAILURE; + goto _exit; + } if (udf->funcType == UDF_FUNC_TYPE_SCALAR) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; snprintf(processFuncName, sizeof(processFuncName), "%s", udfName); - uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)); + if (uv_dlsym(&udfCtx->lib, processFuncName, (void **)(&udfCtx->scalarProcFunc)) != 0) { + fnError("can not load library function %s. error: %s", processFuncName, uv_strerror(err)); + err = TSDB_CODE_UDF_LOAD_UDF_FAILURE; + goto _exit; + } } else if (udf->funcType == UDF_FUNC_TYPE_AGG) { - udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName); + err = udfdCPluginUdfInitLoadAggFuncs(udfCtx, udfName); + if (err != 0) { + fnError("can not load aggregation functions. error: %d", err); + err = TSDB_CODE_UDF_LOAD_UDF_FAILURE; + goto _exit; + } } - int32_t code = 0; if (udfCtx->initFunc) { - code = (udfCtx->initFunc)(); - if (code != 0) { - uv_dlclose(&udfCtx->lib); - taosMemoryFree(udfCtx); - return code; + err = (udfCtx->initFunc)(); + if (err != 0) { + fnError("udf init function failed. error: %d", err); + goto _exit; } } *pUdfCtx = udfCtx; return 0; +_exit: + uv_dlclose(&udfCtx->lib); + taosMemoryFree(udfCtx); + return err; } int32_t udfdCPluginUdfDestroy(void *udfCtx) { @@ -303,7 +320,7 @@ static int32_t udfdConnectToMnode(); static bool udfdRpcRfp(int32_t code, tmsg_t msgType); static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t udfdOpenClientRpc(); -static int32_t udfdCloseClientRpc(); +static void udfdCloseClientRpc(); static void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request); static void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request); @@ -320,7 +337,7 @@ static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf static void udfdOnNewConnection(uv_stream_t *server, int status); static void udfdIntrSignalHandler(uv_signal_t *handle, int signum); -static int32_t removeListeningPipe(); +static void removeListeningPipe(); static void udfdPrintVersion(); static int32_t udfdParseArgs(int32_t argc, char *argv[]); @@ -330,13 +347,13 @@ static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf); static int32_t udfdUvInit(); static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); -static int32_t udfdRun(); +static void udfdRun(); static void udfdConnectMnodeThreadFunc(void *args); -SUdf *udfdNewUdf(const char *udfName); +int32_t udfdNewUdf(SUdf **pUdf, const char *udfName); void udfdGetFuncBodyPath(const SUdf *udf, char *path); -void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { +int32_t udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->scriptType = TSDB_FUNC_SCRIPT_BIN_LIB; plugin->openFunc = udfdCPluginOpen; plugin->closeFunc = udfdCPluginClose; @@ -349,8 +366,9 @@ void udfdInitializeCPlugin(SUdfScriptPlugin *plugin) { plugin->udfAggFinishFunc = udfdCPluginUdfAggFinish; SScriptUdfEnvItem items[1] = {{"LD_LIBRARY_PATH", tsUdfdLdLibPath}}; - plugin->openFunc(items, 1); - return; + int32_t err = plugin->openFunc(items, 1); + if (err != 0) return err; + return 0; } int32_t udfdLoadSharedLib(char *libPath, uv_lib_t *pLib, const char *funcName[], void **func[], int numOfFuncs) { @@ -412,7 +430,9 @@ int32_t udfdInitializePythonPlugin(SUdfScriptPlugin *plugin) { void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { if (plugin->closeFunc) { - plugin->closeFunc(); + if (plugin->closeFunc() != 0) { + fnError("udf script c plugin close func failed.line:%d", __LINE__); + } } plugin->openFunc = NULL; plugin->closeFunc = NULL; @@ -428,7 +448,9 @@ void udfdDeinitCPlugin(SUdfScriptPlugin *plugin) { void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { if (plugin->closeFunc) { - plugin->closeFunc(); + if(plugin->closeFunc() != 0) { + fnError("udf script python plugin close func failed.line:%d", __LINE__); + } } uv_dlclose(&plugin->lib); if (plugin->libLoaded) { @@ -447,14 +469,23 @@ void udfdDeinitPythonPlugin(SUdfScriptPlugin *plugin) { int32_t udfdInitScriptPlugin(int8_t scriptType) { SUdfScriptPlugin *plugin = taosMemoryCalloc(1, sizeof(SUdfScriptPlugin)); - + if (plugin == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t err = 0; switch (scriptType) { case TSDB_FUNC_SCRIPT_BIN_LIB: - udfdInitializeCPlugin(plugin); + err = udfdInitializeCPlugin(plugin); + if (err != 0) { + fnError("udf script c plugin init failed. error: %d", err); + taosMemoryFree(plugin); + return err; + } break; case TSDB_FUNC_SCRIPT_PYTHON: { - int32_t err = udfdInitializePythonPlugin(plugin); + err = udfdInitializePythonPlugin(plugin); if (err != 0) { + fnError("udf script python plugin init failed. error: %d", err); taosMemoryFree(plugin); return err; } @@ -489,7 +520,7 @@ void udfdDeinitScriptPlugins() { void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); SUdfRequest request = {0}; - decodeUdfRequest(uvUdf->input.base, &request); + if(decodeUdfRequest(uvUdf->input.base, &request) == NULL) return; switch (request.type) { case UDF_TASK_SETUP: { @@ -544,6 +575,7 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { if (scriptPlugin == NULL) { err = udfdInitScriptPlugin(udf->scriptType); if (err != 0) { + fnError("udf name %s init script plugin failed. error %d", udfName, err); uv_mutex_unlock(&global.scriptPluginsMutex); return err; } @@ -563,15 +595,15 @@ int32_t udfdInitUdf(char *udfName, SUdf *udf) { return 0; } -SUdf *udfdNewUdf(const char *udfName) { +int32_t udfdNewUdf(SUdf **pUdf, const char *udfName) { SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); udfNew->refCount = 1; udfNew->lastFetchTime = taosGetTimestampMs(); strncpy(udfNew->name, udfName, TSDB_FUNC_NAME_LEN); udfNew->state = UDF_STATE_INIT; - uv_mutex_init(&udfNew->lock); - uv_cond_init(&udfNew->condReady); + if (uv_mutex_init(&udfNew->lock) != 0) return TSDB_CODE_UDF_UV_EXEC_FAILURE; + if (uv_cond_init(&udfNew->condReady) != 0) return TSDB_CODE_UDF_UV_EXEC_FAILURE; udfNew->resident = false; udfNew->expired = false; @@ -582,10 +614,28 @@ SUdf *udfdNewUdf(const char *udfName) { break; } } - return udfNew; + *pUdf = udfNew; + return 0; } -SUdf *udfdGetOrCreateUdf(const char *udfName) { +void udfdFreeUdf(void *pData) { + SUdf *pSudf = (SUdf *)pData; + if (pSudf == NULL) { + return; + } + + if (pSudf->scriptPlugin != NULL) { + if(pSudf->scriptPlugin->udfDestroyFunc(pSudf->scriptUdfCtx) != 0) { + fnError("udfdFreeUdf: udfd destroy udf %s failed", pSudf->name); + } + } + + uv_mutex_destroy(&pSudf->lock); + uv_cond_destroy(&pSudf->condReady); + taosMemoryFree(pSudf); +} + +int32_t udfdGetOrCreateUdf(SUdf **ppUdf, const char *udfName) { uv_mutex_lock(&global.udfsMutex); SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName)); int64_t currTime = taosGetTimestampMs(); @@ -594,26 +644,34 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) { expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s if (!expired) { ++(*pUdfHash)->refCount; - SUdf *udf = *pUdfHash; + *ppUdf = *pUdfHash; uv_mutex_unlock(&global.udfsMutex); - fnInfo("udfd reuse existing udf. udf %s udf version %d, udf created time %" PRIx64, udf->name, udf->version, - udf->createdTime); - return udf; + fnInfo("udfd reuse existing udf. udf %s udf version %d, udf created time %" PRIx64, (*ppUdf)->name, (*ppUdf)->version, + (*ppUdf)->createdTime); + return 0; } else { (*pUdfHash)->expired = true; fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64, (*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime); - taosHashRemove(global.udfsHash, udfName, strlen(udfName)); + if(taosHashRemove(global.udfsHash, udfName, strlen(udfName)) != 0) { + fnError("udfdGetOrCreateUdf: udfd remove udf %s failed", udfName); + } } } - SUdf *udf = udfdNewUdf(udfName); + int32_t code = udfdNewUdf(ppUdf, udfName); + if(code != 0) { + uv_mutex_unlock(&global.udfsMutex); + return code; + } - SUdf **pUdf = &udf; - taosHashPut(global.udfsHash, udfName, strlen(udfName), pUdf, POINTER_BYTES); + if ((code = taosHashPut(global.udfsHash, udfName, strlen(udfName), ppUdf, POINTER_BYTES)) != 0) { + uv_mutex_unlock(&global.udfsMutex); + return code; + } uv_mutex_unlock(&global.udfsMutex); - return udf; + return 0; } void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { @@ -622,10 +680,13 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfSetupRequest *setup = &request->setup; int32_t code = TSDB_CODE_SUCCESS; - SUdf *udf = NULL; - - udf = udfdGetOrCreateUdf(setup->udfName); + SUdf *udf = NULL; + code = udfdGetOrCreateUdf(&udf, setup->udfName); + if(code != 0) { + fnError("udfdGetOrCreateUdf failed. udf name %s", setup->udfName); + goto _send; + } uv_mutex_lock(&udf->lock); if (udf->state == UDF_STATE_INIT) { udf->state = UDF_STATE_LOADING; @@ -646,6 +707,8 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle)); handle->udf = udf; +_send: + ; SUdfResponse rsp; rsp.seqNum = request->seqNum; rsp.type = request->type; @@ -656,11 +719,23 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { rsp.setupRsp.bufSize = udf->bufSize; int32_t len = encodeUdfResponse(NULL, &rsp); + if(len < 0) { + fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len); + return; + } rsp.msgLen = len; void *bufBegin = taosMemoryMalloc(len); + if(bufBegin == NULL) { + fnError("udfdProcessSetupRequest: malloc failed. len %d", len); + return; + } void *buf = bufBegin; - encodeUdfResponse(&buf, &rsp); - + if(encodeUdfResponse(&buf, &rsp) < 0) { + fnError("udfdProcessSetupRequest: encode udf response failed. len %d", len); + taosMemoryFree(bufBegin); + return; + } + uvUdf->output = uv_buf_init(bufBegin, len); taosMemoryFree(uvUdf->input.base); @@ -685,30 +760,35 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { output.colMeta.type = udf->outputType; output.colMeta.precision = 0; output.colMeta.scale = 0; - udfColEnsureCapacity(&output, call->block.info.rows); - - SUdfDataBlock input = {0}; - convertDataBlockToUdfDataBlock(&call->block, &input); - code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); - freeUdfDataDataBlock(&input); - if(code == 0) convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); + if (udfColEnsureCapacity(&output, call->block.info.rows) == TSDB_CODE_SUCCESS) { + SUdfDataBlock input = {0}; + code = convertDataBlockToUdfDataBlock(&call->block, &input); + if (code == TSDB_CODE_SUCCESS) code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx); + freeUdfDataDataBlock(&input); + if (code == TSDB_CODE_SUCCESS) code = convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); + } freeUdfColumn(&output); break; } case TSDB_UDF_CALL_AGG_INIT: { SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx); + if (outBuf.buf != NULL) { + code = udf->scriptPlugin->udfAggStartFunc(&outBuf, udf->scriptUdfCtx); + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } subRsp->resultBuf = outBuf; break; } case TSDB_UDF_CALL_AGG_PROC: { SUdfDataBlock input = {0}; - convertDataBlockToUdfDataBlock(&call->block, &input); - SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; - code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx); - freeUdfInterBuf(&call->interBuf); + if (convertDataBlockToUdfDataBlock(&call->block, &input) == TSDB_CODE_SUCCESS) { + SUdfInterBuf outBuf = {.buf = taosMemoryMalloc(udf->bufSize), .bufLen = udf->bufSize, .numOfResult = 0}; + code = udf->scriptPlugin->udfAggProcFunc(&input, &call->interBuf, &outBuf, udf->scriptUdfCtx); + freeUdfInterBuf(&call->interBuf); + subRsp->resultBuf = outBuf; + } freeUdfDataDataBlock(&input); - subRsp->resultBuf = outBuf; break; } @@ -738,10 +818,19 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { subRsp->callType = call->callType; int32_t len = encodeUdfResponse(NULL, rsp); + if(len < 0) { + fnError("udfdProcessCallRequest: encode udf response failed. len %d", len); + return; + } rsp->msgLen = len; void *bufBegin = taosMemoryMalloc(len); void *buf = bufBegin; - encodeUdfResponse(&buf, rsp); + if(encodeUdfResponse(&buf, rsp) < 0) { + fnError("udfdProcessCallRequest: encode udf response failed. len %d", len); + taosMemoryFree(bufBegin); + return; + } + uvUdf->output = uv_buf_init(bufBegin, len); switch (call->callType) { @@ -787,7 +876,11 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { udf->refCount--; if (udf->refCount == 0 && (!udf->resident || udf->expired)) { unloadUdf = true; - taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); + code = taosHashRemove(global.udfsHash, udf->name, strlen(udf->name)); + if (code != 0) { + fnError("udf name %s remove from hash failed", udf->name); + goto _send; + } } uv_mutex_unlock(&global.udfsMutex); if (unloadUdf) { @@ -798,18 +891,27 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { fnDebug("udfd destroy function returns %d", code); taosMemoryFree(udf); } - taosMemoryFree(handle); +_send: + taosMemoryFree(handle); SUdfResponse response = {0}; SUdfResponse *rsp = &response; rsp->seqNum = request->seqNum; rsp->type = request->type; rsp->code = code; int32_t len = encodeUdfResponse(NULL, rsp); + if (len < 0) { + fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len); + return; + } rsp->msgLen = len; void *bufBegin = taosMemoryMalloc(len); void *buf = bufBegin; - encodeUdfResponse(&buf, rsp); + if (encodeUdfResponse(&buf, rsp) < 0) { + fnError("udfdProcessTeardownRequest: encode udf response failed. len %d", len); + taosMemoryFree(bufBegin); + return; + } uvUdf->output = uv_buf_init(bufBegin, len); taosMemoryFree(uvUdf->input.base); @@ -865,7 +967,10 @@ int32_t udfdSaveFuncBodyToFile(SFuncInfo *pFuncInfo, SUdf *udf) { fnError("udfd write udf shared library failed"); return TSDB_CODE_FILE_CORRUPTED; } - taosCloseFile(&file); + if(taosCloseFile(&file) != 0) { + fnError("udfdSaveFuncBodyToFile, udfd close file failed"); + return TSDB_CODE_FILE_CORRUPTED; + } strncpy(udf->path, path, PATH_MAX); return TSDB_CODE_SUCCESS; @@ -888,7 +993,10 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) { SConnectRsp connectRsp = {0}; - tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); + if(tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp) < 0){ + fnError("udfd deserialize connect response failed"); + goto _return; + } int32_t now = taosGetTimestampSec(); int32_t delta = abs(now - connectRsp.svrTimestamp); @@ -908,7 +1016,10 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { msgInfo->code = 0; } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { SRetrieveFuncRsp retrieveRsp = {0}; - tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); + if(tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp) < 0){ + fnError("udfd deserialize retrieve func response failed"); + goto _return; + } SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); SUdf *udf = msgInfo->param; @@ -940,28 +1051,43 @@ int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); - taosArrayPush(retrieveReq.pFuncNames, udfName); + if(taosArrayPush(retrieveReq.pFuncNames, udfName) == NULL) { + taosArrayDestroy(retrieveReq.pFuncNames); + return terrno; + } int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq); + if(contLen < 0) { + taosArrayDestroy(retrieveReq.pFuncNames); + return terrno; + } void *pReq = rpcMallocCont(contLen); - tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq); + if(tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq) < 0) { + taosArrayDestroy(retrieveReq.pFuncNames); + rpcFreeCont(pReq); + return terrno; + } taosArrayDestroy(retrieveReq.pFuncNames); SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo)); msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC; msgInfo->param = udf; - uv_sem_init(&msgInfo->resultSem, 0); + if(uv_sem_init(&msgInfo->resultSem, 0) != 0) { + taosMemoryFree(msgInfo); + return TSDB_CODE_UDF_UV_EXEC_FAILURE; + } SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pReq; rpcMsg.contLen = contLen; rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC; rpcMsg.info.ahandle = msgInfo; - rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); - - uv_sem_wait(&msgInfo->resultSem); - uv_sem_destroy(&msgInfo->resultSem); - int32_t code = msgInfo->code; + int32_t code = rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL); + if (code == 0) { + uv_sem_wait(&msgInfo->resultSem); + uv_sem_destroy(&msgInfo->resultSem); + code = msgInfo->code; + } taosMemoryFree(msgInfo); return code; } @@ -1009,8 +1135,12 @@ int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSe return -1; } - taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]); - mgmtEpSet->numOfEps++; + int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]); + if (code != TSDB_CODE_SUCCESS) { + fnError("invalid ep %s", secondEp); + } else { + mgmtEpSet->numOfEps++; + } } if (mgmtEpSet->numOfEps == 0) { @@ -1039,7 +1169,7 @@ int32_t udfdOpenClientRpc() { connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; - taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + TAOS_CHECK_RETURN(taosVersionStrToInt(version, &(rpcInit.compatibilityVer))); global.clientRpc = rpcOpen(&rpcInit); if (global.clientRpc == NULL) { fnError("failed to init dnode rpc client"); @@ -1048,11 +1178,10 @@ int32_t udfdOpenClientRpc() { return 0; } -int32_t udfdCloseClientRpc() { +void udfdCloseClientRpc() { fnInfo("udfd begin closing rpc"); rpcClose(global.clientRpc); fnInfo("udfd finish closing rpc"); - return 0; } void udfdOnWrite(uv_write_t *req, int status) { @@ -1082,7 +1211,11 @@ void udfdSendResponse(uv_work_t *work, int status) { if (udfWork->conn != NULL) { uv_write_t *write_req = taosMemoryMalloc(sizeof(uv_write_t)); write_req->data = udfWork; - uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite); + int32_t code = uv_write(write_req, udfWork->conn->client, &udfWork->output, 1, udfdOnWrite); + if (code != 0) { + fnError("udfd send response error %s", uv_strerror(code)); + taosMemoryFree(write_req); + } } taosMemoryFree(work); } @@ -1146,7 +1279,12 @@ void udfdHandleRequest(SUdfdUvConn *conn) { conn->inputCap = 0; conn->inputTotal = -1; work->data = udfWork; - uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse); + if(uv_queue_work(global.loop, work, udfdProcessRequest, udfdSendResponse) != 0) + { + fnError("udfd queue work failed"); + taosMemoryFree(work); + taosMemoryFree(udfWork); + } } void udfdPipeCloseCb(uv_handle_t *pipe) { @@ -1193,9 +1331,15 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { fnError("udfd new connection error. code: %s", uv_strerror(status)); return; } + int32_t code = 0; uv_pipe_t *client = (uv_pipe_t *)taosMemoryMalloc(sizeof(uv_pipe_t)); - uv_pipe_init(global.loop, client, 0); + code = uv_pipe_init(global.loop, client, 0); + if (code) { + fnError("udfd pipe init error %s", uv_strerror(code)); + taosMemoryFree(client); + return; + } if (uv_accept(server, (uv_stream_t *)client) == 0) { SUdfdUvConn *ctx = taosMemoryMalloc(sizeof(SUdfdUvConn)); ctx->pWorkList = NULL; @@ -1205,7 +1349,13 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { ctx->inputCap = 0; client->data = ctx; ctx->client = (uv_stream_t *)client; - uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead); + code = uv_read_start((uv_stream_t *)client, udfdAllocBuffer, udfdPipeRead); + if (code) { + fnError("udfd read start error %s", uv_strerror(code)); + udfdUvHandleError(ctx); + taosMemoryFree(ctx); + taosMemoryFree(client); + } } else { uv_close((uv_handle_t *)client, NULL); } @@ -1214,8 +1364,14 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { fnInfo("udfd signal received: %d\n", signum); uv_fs_t req; - uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); - uv_signal_stop(handle); + int32_t code = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + if(code) { + fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(code), __LINE__); + } + code = uv_signal_stop(handle); + if(code) { + fnError("stop signal handler failed, reason:%s", uv_strerror(code)); + } uv_stop(global.loop); } @@ -1224,12 +1380,12 @@ static int32_t udfdParseArgs(int32_t argc, char *argv[]) { if (strcmp(argv[i], "-c") == 0) { if (i < argc - 1) { if (strlen(argv[++i]) >= PATH_MAX) { - printf("config file path overflow"); + (void)printf("config file path overflow"); return -1; } tstrncpy(configDir, argv[i], PATH_MAX); } else { - printf("'-c' requires a parameter, default is %s\n", configDir); + (void)printf("'-c' requires a parameter, default is %s\n", configDir); return -1; } } else if (strcmp(argv[i], "-V") == 0) { @@ -1242,9 +1398,9 @@ static int32_t udfdParseArgs(int32_t argc, char *argv[]) { } static void udfdPrintVersion() { - printf("udfd version: %s compatible_version: %s\n", version, compatible_version); - printf("git: %s\n", gitinfo); - printf("build: %s\n", buildinfo); + (void)printf("udfd version: %s compatible_version: %s\n", version, compatible_version); + (void)printf("git: %s\n", gitinfo); + (void)printf("build: %s\n", buildinfo); } static int32_t udfdInitLog() { @@ -1270,35 +1426,31 @@ void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { taosMemoryFree(buf->base); } -static int32_t removeListeningPipe() { +static void removeListeningPipe() { uv_fs_t req; int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); uv_fs_req_cleanup(&req); - return err; + if(err) { + fnError("remove listening pipe %s failed, reason:%s, lino:%d", global.listenPipeName, uv_strerror(err), __LINE__); + } } static int32_t udfdUvInit() { - uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t)); - if (loop) { - uv_loop_init(loop); - } else { - return -1; - } - global.loop = loop; + TAOS_CHECK_RETURN(uv_loop_init(global.loop)); if (tsStartUdfd) { // udfd is started by taosd, which shall exit when taosd exit - uv_pipe_init(global.loop, &global.ctrlPipe, 1); - uv_pipe_open(&global.ctrlPipe, 0); - uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); + TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.ctrlPipe, 1)); + TAOS_CHECK_RETURN(uv_pipe_open(&global.ctrlPipe, 0)); + TAOS_CHECK_RETURN(uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb)); } getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName)); removeListeningPipe(); - uv_pipe_init(global.loop, &global.listeningPipe, 0); + TAOS_CHECK_RETURN(uv_pipe_init(global.loop, &global.listeningPipe, 0)); - uv_signal_init(global.loop, &global.intrSignal); - uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT); + TAOS_CHECK_RETURN(uv_signal_init(global.loop, &global.intrSignal)); + TAOS_CHECK_RETURN(uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT)); int r; fnInfo("bind to pipe %s", global.listenPipeName); @@ -1321,25 +1473,59 @@ static void udfdCloseWalkCb(uv_handle_t *handle, void *arg) { } } -static int32_t udfdRun() { - uv_mutex_init(&global.scriptPluginsMutex); +static int32_t udfdGlobalDataInit() { + uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t)); + if (loop == NULL) { + fnError("udfd init uv loop failed, mem overflow"); + return -1; + } + global.loop = loop; + + if (uv_mutex_init(&global.scriptPluginsMutex) != 0) { + fnError("udfd init script plugins mutex failed"); + return -1; + } global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - uv_mutex_init(&global.udfsMutex); + if (global.udfsHash == NULL) { + return terrno; + } + // taosHashSetFreeFp(global.udfsHash, udfdFreeUdf); - fnInfo("start udfd event loop"); - uv_run(global.loop, UV_RUN_DEFAULT); - fnInfo("udfd event loop stopped."); - - uv_loop_close(global.loop); - - uv_walk(global.loop, udfdCloseWalkCb, NULL); - uv_run(global.loop, UV_RUN_DEFAULT); - uv_loop_close(global.loop); + if (uv_mutex_init(&global.udfsMutex) != 0) { + fnError("udfd init udfs mutex failed"); + return -2; + } return 0; } +static void udfdGlobalDataDeinit() { + taosHashCleanup(global.udfsHash); + uv_mutex_destroy(&global.udfsMutex); + uv_mutex_destroy(&global.scriptPluginsMutex); + taosMemoryFree(global.loop); + fnInfo("udfd global data deinit"); +} + +static void udfdRun() { + fnInfo("start udfd event loop"); + int32_t code = uv_run(global.loop, UV_RUN_DEFAULT); + if(code != 0) { + fnError("udfd event loop still has active handles or requests."); + } + fnInfo("udfd event loop stopped."); + + (void)uv_loop_close(global.loop); + + uv_walk(global.loop, udfdCloseWalkCb, NULL); + code = uv_run(global.loop, UV_RUN_DEFAULT); + if(code != 0) { + fnError("udfd event loop still has active handles or requests."); + } + (void)uv_loop_close(global.loop); +} + int32_t udfdInitResidentFuncs() { if (strlen(tsUdfdResFuncs) == 0) { return TSDB_CODE_SUCCESS; @@ -1352,13 +1538,17 @@ int32_t udfdInitResidentFuncs() { char func[TSDB_FUNC_NAME_LEN + 1] = {0}; strncpy(func, token, TSDB_FUNC_NAME_LEN); fnInfo("udfd add resident function %s", func); - taosArrayPush(global.residentFuncs, func); + if(taosArrayPush(global.residentFuncs, func) == NULL) + { + taosArrayDestroy(global.residentFuncs); + return TSDB_CODE_OUT_OF_MEMORY; + } } return TSDB_CODE_SUCCESS; } -int32_t udfdDeinitResidentFuncs() { +void udfdDeinitResidentFuncs() { for (int32_t i = 0; i < taosArrayGetSize(global.residentFuncs); ++i) { char *funcName = taosArrayGet(global.residentFuncs, i); SUdf **udfInHash = taosHashGet(global.udfsHash, funcName, strlen(funcName)); @@ -1366,18 +1556,15 @@ int32_t udfdDeinitResidentFuncs() { SUdf *udf = *udfInHash; int32_t code = udf->scriptPlugin->udfDestroyFunc(udf->scriptUdfCtx); fnDebug("udfd destroy function returns %d", code); - taosHashRemove(global.udfsHash, funcName, strlen(funcName)); + if(taosHashRemove(global.udfsHash, funcName, strlen(funcName)) != 0) + { + fnError("udfd remove resident function %s failed", funcName); + } taosMemoryFree(udf); } } taosArrayDestroy(global.residentFuncs); - return TSDB_CODE_SUCCESS; -} - -int32_t udfdCleanup() { - uv_mutex_destroy(&global.udfsMutex); - taosHashCleanup(global.udfsHash); - return 0; + fnInfo("udfd resident functions are deinit"); } int32_t udfdCreateUdfSourceDir() { @@ -1392,20 +1579,27 @@ int32_t udfdCreateUdfSourceDir() { return code; } -int32_t udfdDestroyUdfSourceDir() { +void udfdDestroyUdfSourceDir() { fnInfo("destory udf source directory %s", global.udfDataDir); taosRemoveDir(global.udfDataDir); - return 0; } int main(int argc, char *argv[]) { + int code = 0; + bool logInitialized = false; + bool cfgInitialized = false; + bool openClientRpcFinished = false; + bool residentFuncsInited = false; + bool udfSourceDirInited = false; + bool globalDataInited = false; + if (!taosCheckSystemIsLittleEnd()) { - printf("failed to start since on non-little-end machines\n"); + (void)printf("failed to start since on non-little-end machines\n"); return -1; } if (udfdParseArgs(argc, argv) != 0) { - printf("failed to start since parse args error\n"); + (void)printf("failed to start since parse args error\n"); return -1; } @@ -1416,47 +1610,89 @@ int main(int argc, char *argv[]) { if (udfdInitLog() != 0) { // ignore create log failed, because this error no matter - printf("failed to start since init log error\n"); + (void)printf("failed to init udfd log."); + } else { + logInitialized = true; // log is initialized } if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { fnError("failed to start since read config error"); - taosCloseLog(); - return -2; + code = -2; + goto _exit; } + cfgInitialized = true; // cfg is initialized + fnInfo("udfd start with config file %s", configDir); - initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp); + if (initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp) != 0) { + fnError("init ep set from cfg failed"); + code = -3; + goto _exit; + } + fnInfo("udfd start with mnode ep %s", global.mgmtEp.epSet.eps[0].fqdn); if (udfdOpenClientRpc() != 0) { fnError("open rpc connection to mnode failed"); - taosCloseLog(); - return -3; + code = -4; + goto _exit; } + fnInfo("udfd rpc client is opened"); + openClientRpcFinished = true; // rpc is opened if (udfdCreateUdfSourceDir() != 0) { fnError("create udf source directory failed"); - taosCloseLog(); - return -4; + code = -5; + goto _exit; } + udfSourceDirInited = true; // udf source dir is created + fnInfo("udfd udf source directory is created"); + + if (udfdGlobalDataInit() != 0) { + fnError("init global data failed"); + code = -6; + goto _exit; + } + globalDataInited = true; // global data is inited + fnInfo("udfd global data is inited"); if (udfdUvInit() != 0) { fnError("uv init failure"); - taosCloseLog(); - return -5; + code = -7; + goto _exit; } + fnInfo("udfd uv is inited"); - udfdInitResidentFuncs(); + if (udfdInitResidentFuncs() != 0) { + fnError("init resident functions failed"); + code = -8; + goto _exit; + } + residentFuncsInited = true; // resident functions are inited + fnInfo("udfd resident functions are inited"); udfdRun(); + fnInfo("udfd exit normally"); removeListeningPipe(); - udfdDestroyUdfSourceDir(); - udfdCloseClientRpc(); - - udfdDeinitResidentFuncs(); - udfdDeinitScriptPlugins(); - taosCloseLog(); - udfdCleanup(); - return 0; +_exit: + if (globalDataInited) { + udfdGlobalDataDeinit(); + } + if (residentFuncsInited) { + udfdDeinitResidentFuncs(); + } + if (udfSourceDirInited) { + udfdDestroyUdfSourceDir(); + } + if (openClientRpcFinished) { + udfdCloseClientRpc(); + } + if (cfgInitialized) { + taosCleanupCfg(); + } + if (logInitialized) { + taosCloseLog(); + } + + return code; } diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index aa8b88b738..346cb468b4 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -9,17 +9,20 @@ #include "tglobal.h" #include "tudf.h" +#define TAOSFPRINTF(stream, format, ...) ((void)fprintf(stream, format, ##__VA_ARGS__)) +#define TAOSPRINTF(format, ...) ((void)printf(format, ##__VA_ARGS__)) + static int32_t parseArgs(int32_t argc, char *argv[]) { for (int32_t i = 1; i < argc; ++i) { if (strcmp(argv[i], "-c") == 0) { if (i < argc - 1) { if (strlen(argv[++i]) >= PATH_MAX) { - printf("config file path overflow"); + TAOSPRINTF("config file path overflow"); return -1; } tstrncpy(configDir, argv[i], PATH_MAX); } else { - printf("'-c' requires a parameter, default is %s\n", configDir); + TAOSPRINTF("'-c' requires a parameter, default is %s\n", configDir); return -1; } } @@ -35,6 +38,7 @@ static int32_t initLog() { } int scalarFuncTest() { + int32_t ret = 0; UdfcFuncHandle handle; if (doSetupUdf("udf1", &handle) != 0) { @@ -47,10 +51,18 @@ int scalarFuncTest() { SSDataBlock *pBlock = █ for (int32_t i = 0; i < 1; ++i) { SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); - blockDataAppendColInfo(pBlock, &colInfo); + ret = blockDataAppendColInfo(pBlock, &colInfo); + if (ret != 0) { + fnError("failed to append column info"); + return -1; + } } - blockDataEnsureCapacity(pBlock, 1024); + ret = blockDataEnsureCapacity(pBlock, 1024); + if (ret != 0) { + fnError("failed to ensure capacity"); + return -1; + } pBlock->info.rows = 1024; SColumnInfoData *pCol = taosArrayGet(pBlock->pDataBlock, 0); @@ -63,38 +75,56 @@ int scalarFuncTest() { input.columnData = taosArrayGet(pBlock->pDataBlock, 0); SScalarParam output = {0}; - doCallUdfScalarFunc(handle, &input, 1, &output); + ret = doCallUdfScalarFunc(handle, &input, 1, &output); + if (ret != 0) { + fnError("failed to call udf scalar function"); + return -1; + } taosArrayDestroy(pBlock->pDataBlock); SColumnInfoData *col = output.columnData; for (int32_t i = 0; i < output.numOfRows; ++i) { - if (i % 100 == 0) fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); + if (i % 100 == 0) TAOSFPRINTF(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); } colDataDestroy(output.columnData); taosMemoryFree(output.columnData); } int64_t end = taosGetTimestampUs(); - fprintf(stderr, "time: %f\n", (end - beg) / 1000.0); - doTeardownUdf(handle); + TAOSFPRINTF(stderr, "time: %f\n", (end - beg) / 1000.0); + ret = doTeardownUdf(handle); + if (ret != 0) { + fnError("failed to teardown udf"); + return -1; + } return 0; } int aggregateFuncTest() { + int32_t ret = 0; UdfcFuncHandle handle; - if (doSetupUdf("udf2", &handle) != 0) { - fnError("setup udf failure"); + ret = doSetupUdf("udf2", &handle); + if (ret != 0) { + fnError("setup udf failure, code:%d", ret); return -1; } SSDataBlock *pBlock = createDataBlock(); for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); - blockDataAppendColInfo(pBlock, &colInfo); + ret = blockDataAppendColInfo(pBlock, &colInfo); + if(ret != 0) { + fnError( "failed to append column info. code:%d", ret); + return -1; + } } - blockDataEnsureCapacity(pBlock, 1024); + ret = blockDataEnsureCapacity(pBlock, 1024); + if (ret != 0) { + fnError( "failed to ensure capacity. code:%d", ret); + return -1; + } pBlock->info.rows = 1024; SColumnInfoData *pColInfo = bdGetColumnInfoData(pBlock, 0); @@ -105,37 +135,77 @@ int aggregateFuncTest() { SUdfInterBuf buf = {0}; SUdfInterBuf newBuf = {0}; SUdfInterBuf resultBuf = {0}; - doCallUdfAggInit(handle, &buf); - doCallUdfAggProcess(handle, pBlock, &buf, &newBuf); + ret = doCallUdfAggInit(handle, &buf); + if (ret != 0) { + fnError("failed to init udf. code:%d", ret); + return -1; + } + ret = doCallUdfAggProcess(handle, pBlock, &buf, &newBuf); + if (ret != 0) { + fnError("failed to process udf. code:%d", ret); + return -1; + } taosArrayDestroy(pBlock->pDataBlock); - doCallUdfAggFinalize(handle, &newBuf, &resultBuf); + ret = doCallUdfAggFinalize(handle, &newBuf, &resultBuf); + if (ret != 0) { + TAOSFPRINTF(stderr,"failed to finalize udf. code:%d", ret); + return -1; + } if (resultBuf.buf != NULL) { - fprintf(stderr, "agg result: %f\n", *(double *)resultBuf.buf); + TAOSFPRINTF(stderr, "agg result: %f\n", *(double *)resultBuf.buf); } else { - fprintf(stderr, "result buffer is null"); + fnError("result buffer is null"); } freeUdfInterBuf(&buf); freeUdfInterBuf(&newBuf); freeUdfInterBuf(&resultBuf); - doTeardownUdf(handle); + ret = doTeardownUdf(handle); + if (ret != 0) { + fnError("failed to teardown udf. code:%d", ret); + return -1; + } blockDataDestroy(pBlock); return 0; } int main(int argc, char *argv[]) { - parseArgs(argc, argv); - initLog(); + int32_t ret = 0; + ret = parseArgs(argc, argv); + if (ret != 0) { + fnError("failed to parse args"); + return -1; + } + ret = initLog(); + if (ret != 0) { + fnError("failed to init log"); + return -1; + } if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { fnError("failed to start since read config error"); return -1; } - udfcOpen(); + if (udfcOpen() != 0) { + fnError("failed to open udfc"); + return -1; + } uv_sleep(1000); - scalarFuncTest(); - aggregateFuncTest(); - udfcClose(); + ret = scalarFuncTest(); + if (ret != 0) { + fnError("failed to run scalar function test"); + return -1; + } + ret = aggregateFuncTest(); + if (ret != 0) { + fnError("failed to run aggregate function test"); + return -1; + } + ret = udfcClose(); + if (ret != 0) { + fnError("failed to close udfc"); + return -1; + } } diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index 5b95087996..da30ede8bf 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -14,18 +14,25 @@ DLL_EXPORT int32_t udf1_init() { return 0; } DLL_EXPORT int32_t udf1_destroy() { return 0; } DLL_EXPORT int32_t udf1(SUdfDataBlock *block, SUdfColumn *resultCol) { + int32_t code = 0; SUdfColumnData *resultData = &resultCol->colData; for (int32_t i = 0; i < block->numOfRows; ++i) { int j = 0; for (; j < block->numOfCols; ++j) { if (udfColDataIsNull(block->udfCols[j], i)) { - udfColDataSetNull(resultCol, i); + code = udfColDataSetNull(resultCol, i); + if (code != 0) { + return code; + } break; } } if (j == block->numOfCols) { int32_t luckyNum = 1; - udfColDataSet(resultCol, i, (char *)&luckyNum, false); + code = udfColDataSet(resultCol, i, (char *)&luckyNum, false); + if (code != 0) { + return code; + } } } // to simulate actual processing delay by udf diff --git a/source/libs/function/test/udf1_dup.c b/source/libs/function/test/udf1_dup.c index c251192da3..8e0af947b9 100644 --- a/source/libs/function/test/udf1_dup.c +++ b/source/libs/function/test/udf1_dup.c @@ -15,18 +15,25 @@ DLL_EXPORT int32_t udf1_dup_init() { return 0; } DLL_EXPORT int32_t udf1_dup_destroy() { return 0; } DLL_EXPORT int32_t udf1_dup(SUdfDataBlock *block, SUdfColumn *resultCol) { + int32_t code = 0; SUdfColumnData *resultData = &resultCol->colData; for (int32_t i = 0; i < block->numOfRows; ++i) { int j = 0; for (; j < block->numOfCols; ++j) { if (udfColDataIsNull(block->udfCols[j], i)) { - udfColDataSetNull(resultCol, i); + code = udfColDataSetNull(resultCol, i); + if (code != 0) { + return code; + } break; } } if (j == block->numOfCols) { int32_t luckyNum = 2; - udfColDataSet(resultCol, i, (char *)&luckyNum, false); + code = udfColDataSet(resultCol, i, (char *)&luckyNum, false); + if (code != 0) { + return code; + } } } // to simulate actual processing delay by udf diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 9bd60be2b6..b7880ba0cf 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -552,22 +552,22 @@ _end: #endif } -int32_t taosGetCpuCores(float *numOfCores, bool physical) { +void taosGetCpuCores(float *numOfCores, bool physical) { #ifdef WINDOWS SYSTEM_INFO info; GetSystemInfo(&info); *numOfCores = info.dwNumberOfProcessors; - return 0; + return; #elif defined(_TD_DARWIN_64) *numOfCores = sysconf(_SC_NPROCESSORS_ONLN); - return 0; + return; #else if (physical) { *numOfCores = sysconf(_SC_NPROCESSORS_ONLN); } else { taosCntrGetCpuCores(numOfCores); } - return 0; + return; #endif } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 3801ae9ffd..3f3bee4e35 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -723,6 +723,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_BUFSIZE, "udf invalid bufsize TAOS_DEFINE_ERROR(TSDB_CODE_UDF_INVALID_OUTPUT_TYPE, "udf invalid output type") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_SCRIPT_NOT_SUPPORTED, "udf program language not supported") TAOS_DEFINE_ERROR(TSDB_CODE_UDF_FUNC_EXEC_FAILURE, "udf function execution failure") +TAOS_DEFINE_ERROR(TSDB_CODE_UDF_UV_EXEC_FAILURE, "udf uvlib function execution failure") //schemaless TAOS_DEFINE_ERROR(TSDB_CODE_SML_INVALID_PROTOCOL_TYPE, "Invalid line protocol type")