diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 6bc9f84d6d..a8198a804d 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -40,7 +40,7 @@ typedef struct SUdfdData { #ifdef WINDOWS HANDLE jobHandle; #endif - int spawnErr; + int32_t spawnErr; uv_pipe_t ctrlPipe; uv_async_t stopAsync; int32_t stopCalled; @@ -51,15 +51,17 @@ typedef struct SUdfdData { SUdfdData udfdGlobal = {0}; int32_t udfStartUdfd(int32_t startDnodeId); -void udfStopUdfd(); +void udfStopUdfd(); + +extern char **environ; static int32_t udfSpawnUdfd(SUdfdData *pData); -void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal); +void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal); static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg); static void udfUdfdStopAsyncCb(uv_async_t *async); static void udfWatchUdfd(void *args); -void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { +void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int32_t termSignal) { fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); SUdfdData *pData = process->data; if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) { @@ -67,7 +69,7 @@ void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { } else { fnInfo("udfd process restart"); int32_t code = udfSpawnUdfd(pData); - if(code != 0) { + if (code != 0) { fnError("udfd process restart failed with code:%d", code); } } @@ -75,6 +77,8 @@ void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { static int32_t udfSpawnUdfd(SUdfdData *pData) { fnInfo("start to init udfd"); + + int32_t err = 0; uv_process_options_t options = {0}; char path[PATH_MAX] = {0}; @@ -126,17 +130,17 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { char thrdPoolSizeEnvItem[32] = {0}; snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId); - float numCpuCores = 4; + float numCpuCores = 4; int32_t code = taosGetCpuCores(&numCpuCores, false); - if(code != 0) { - fnError("failed to get cpu cores, code:%d", code); + if (code != 0) { + fnError("failed to get cpu cores, code:0x%x", code); } numCpuCores = TMAX(numCpuCores, 2); - snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores * 2); + snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int32_t)numCpuCores * 2); - char pathTaosdLdLib[512] = {0}; - size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib); - int ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen); + char pathTaosdLdLib[512] = {0}; + size_t taosdLdLibPathLen = sizeof(pathTaosdLdLib); + int32_t ret = uv_os_getenv("LD_LIBRARY_PATH", pathTaosdLdLib, &taosdLdLibPathLen); if (ret != UV_ENOBUFS) { taosdLdLibPathLen = strlen(pathTaosdLdLib); } @@ -158,8 +162,8 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { char *taosFqdnEnvItem = NULL; char *taosFqdn = getenv("TAOS_FQDN"); if (taosFqdn != NULL) { - int subLen = strlen(taosFqdn); - int len = strlen("TAOS_FQDN=") + subLen + 1; + int32_t subLen = strlen(taosFqdn); + int32_t len = strlen("TAOS_FQDN=") + subLen + 1; taosFqdnEnvItem = taosMemoryMalloc(len); if (taosFqdnEnvItem != NULL) { tstrncpy(taosFqdnEnvItem, "TAOS_FQDN=", len); @@ -171,11 +175,53 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { } } - char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem,taosFqdnEnvItem, NULL}; + char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, ldLibPathEnvItem, taosFqdnEnvItem, NULL}; - options.env = envUdfd; + char **envUdfdWithPEnv = NULL; + if (environ != NULL) { + int32_t lenEnvUdfd = ARRAY_SIZE(envUdfd); + int32_t numEnviron = 0; + while (environ[numEnviron] != NULL) { + numEnviron++; + } - int err = uv_spawn(&pData->loop, &pData->process, &options); + envUdfdWithPEnv = (char **)taosMemoryCalloc(numEnviron + lenEnvUdfd, sizeof(char *)); + if (envUdfdWithPEnv == NULL) { + err = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + for (int32_t i = 0; i < numEnviron; i++) { + int32_t len = strlen(environ[i]) + 1; + envUdfdWithPEnv[i] = (char *)taosMemoryCalloc(len, 1); + if (envUdfdWithPEnv[i] == NULL) { + err = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + tstrncpy(envUdfdWithPEnv[i], environ[i], len); + } + + for (int32_t i = 0; i < lenEnvUdfd; i++) { + if (envUdfd[i] != NULL) { + int32_t len = strlen(envUdfd[i]) + 1; + envUdfdWithPEnv[numEnviron + i] = (char *)taosMemoryCalloc(len, 1); + if (envUdfdWithPEnv[numEnviron + i] == NULL) { + err = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + tstrncpy(envUdfdWithPEnv[numEnviron + i], envUdfd[i], len); + } + } + envUdfdWithPEnv[numEnviron + lenEnvUdfd - 1] = NULL; + + options.env = envUdfdWithPEnv; + } else { + options.env = envUdfd; + } + + err = uv_spawn(&pData->loop, &pData->process, &options); pData->process.data = (void *)pData; #ifdef WINDOWS @@ -202,7 +248,21 @@ static int32_t udfSpawnUdfd(SUdfdData *pData) { } else { fnInfo("udfd is initialized"); } - if(taosFqdnEnvItem) taosMemoryFree(taosFqdnEnvItem); + +_OVER: + if (taosFqdnEnvItem) { + taosMemoryFree(taosFqdnEnvItem); + } + + if (envUdfdWithPEnv != NULL) { + int32_t i = 0; + while (envUdfdWithPEnv[i] != NULL) { + taosMemoryFree(envUdfdWithPEnv[i]); + i++; + } + taosMemoryFree(envUdfdWithPEnv); + } + return err; } @@ -225,13 +285,13 @@ static void udfWatchUdfd(void *args) { TAOS_UV_CHECK_ERRNO(udfSpawnUdfd(pData)); atomic_store_32(&pData->spawnErr, 0); (void)uv_barrier_wait(&pData->barrier); - int num = uv_run(&pData->loop, UV_RUN_DEFAULT); + int32_t num = uv_run(&pData->loop, UV_RUN_DEFAULT); fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__); uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL); num = uv_run(&pData->loop, UV_RUN_DEFAULT); fnInfo("udfd loop exit with %d active handles, line:%d", num, __LINE__); - if(uv_loop_close(&pData->loop) != 0) { + if (uv_loop_close(&pData->loop) != 0) { fnError("udfd loop close failed, lino:%d", __LINE__); } return; @@ -240,7 +300,7 @@ _exit: if (terrno != 0) { (void)uv_barrier_wait(&pData->barrier); atomic_store_32(&pData->spawnErr, terrno); - if(uv_loop_close(&pData->loop) != 0) { + if (uv_loop_close(&pData->loop) != 0) { fnError("udfd loop close failed, lino:%d", __LINE__); } fnError("udfd thread exit with code:%d lino:%d", terrno, terrln); @@ -271,10 +331,10 @@ int32_t udfStartUdfd(int32_t startDnodeId) { int32_t err = atomic_load_32(&pData->spawnErr); if (err != 0) { uv_barrier_destroy(&pData->barrier); - if(uv_async_send(&pData->stopAsync) != 0) { + if (uv_async_send(&pData->stopAsync) != 0) { fnError("start udfd: failed to send stop async"); } - if(uv_thread_join(&pData->thread)!= 0) { + if (uv_thread_join(&pData->thread) != 0) { fnError("start udfd: failed to join udfd thread"); } pData->needCleanUp = false; @@ -299,10 +359,10 @@ void udfStopUdfd() { atomic_store_32(&pData->stopCalled, 1); pData->needCleanUp = false; uv_barrier_destroy(&pData->barrier); - if(uv_async_send(&pData->stopAsync) != 0) { + if (uv_async_send(&pData->stopAsync) != 0) { fnError("stop udfd: failed to send stop async"); } - if(uv_thread_join(&pData->thread) != 0) { + if (uv_thread_join(&pData->thread) != 0) { fnError("stop udfd: failed to join udfd thread"); } @@ -341,7 +401,7 @@ typedef void *QUEUE[2]; #define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) /* Public macros. */ -#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr)-offsetof(type, field))) +#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr) - offsetof(type, field))) /* Important note: mutating the list while QUEUE_FOREACH is * iterating over its elements results in undefined behavior. @@ -434,8 +494,8 @@ typedef struct SUdfcProxy { QUEUE uvProcTaskQueue; uv_mutex_t udfStubsMutex; - SArray *udfStubs; // SUdfcFuncStub - SArray *expiredUdfStubs; //SUdfcFuncStub + SArray *udfStubs; // SUdfcFuncStub + SArray *expiredUdfStubs; // SUdfcFuncStub uv_mutex_t udfcUvMutex; int8_t initialized; @@ -458,7 +518,7 @@ typedef struct SUdfcUvSession { typedef struct SClientUvTaskNode { SUdfcProxy *udfc; int8_t type; - int errCode; + int32_t errCode; uv_pipe_t *pipe; @@ -516,7 +576,7 @@ enum { UDFC_STATE_STOPPING, // stopping after udfcClose }; -void getUdfdPipeName(char *pipeName, int32_t size); +void getUdfdPipeName(char *pipeName, int32_t size); int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup); void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request); int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state); @@ -801,12 +861,12 @@ void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) { buf = decodeUdfTeardownResponse(buf, &rsp->teardownRsp); break; default: - rsp->code = TSDB_CODE_UDF_INTERNAL_ERROR; + rsp->code = TSDB_CODE_UDF_INTERNAL_ERROR; fnError("decode udf response, invalid udf response type %d", rsp->type); break; } - if(buf == NULL) { - rsp->code = terrno; + if (buf == NULL) { + rsp->code = terrno; fnError("decode udf response failed, code:0x%x", rsp->code); } return (void *)buf; @@ -847,12 +907,12 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfBlock->numOfRows = block->info.rows; udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock); udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *)); - if((udfBlock->udfCols) == NULL) { + if ((udfBlock->udfCols) == NULL) { return terrno; } for (int32_t i = 0; i < udfBlock->numOfCols; ++i) { udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn)); - if(udfBlock->udfCols[i] == NULL) { + if (udfBlock->udfCols[i] == NULL) { return terrno; } SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i); @@ -866,18 +926,18 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) { udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen); - if(udfCol->colData.varLenCol.varOffsets == NULL) { + if (udfCol->colData.varLenCol.varOffsets == NULL) { return terrno; } memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen); udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows); udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen); - if(udfCol->colData.varLenCol.payload == NULL) { + if (udfCol->colData.varLenCol.payload == NULL) { return terrno; } if (col->reassigned) { for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) { - char* pColData = col->pData + col->varmeta.offset[row]; + char *pColData = col->pData + col->varmeta.offset[row]; int32_t colSize = 0; if (col->info.type == TSDB_DATA_TYPE_JSON) { colSize = getJsonValueLen(pColData); @@ -894,7 +954,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows); int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen; udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen); - if(udfCol->colData.fixLenCol.nullBitmap == NULL) { + if (udfCol->colData.fixLenCol.nullBitmap == NULL) { return terrno; } char *bitmap = udfCol->colData.fixLenCol.nullBitmap; @@ -927,11 +987,11 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { code = bdGetColumnInfoData(block, 0, &col); TAOS_CHECK_GOTO(code, &lino, _exit); - for (int i = 0; i < udfCol->colData.numOfRows; ++i) { + for (int32_t i = 0; i < udfCol->colData.numOfRows; ++i) { if (udfColDataIsNull(udfCol, i)) { colDataSetNULL(col, i); } else { - char* data = udfColDataGetData(udfCol, i); + char *data = udfColDataGetData(udfCol, i); code = colDataSetVal(col, i, data, false); TAOS_CHECK_GOTO(code, &lino, _exit); } @@ -953,32 +1013,32 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS } // create the basic block info structure - for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pInfo = input[i].columnData; - SColumnInfoData d = {0}; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData *pInfo = input[i].columnData; + SColumnInfoData d = {0}; d.info = pInfo->info; TAOS_CHECK_GOTO(blockDataAppendColInfo(output, &d), &lino, _exit); } - TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, numOfRows), &lino, _exit); + TAOS_CHECK_GOTO(blockDataEnsureCapacity(output, numOfRows), &lino, _exit); - for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pDest = taosArrayGet(output->pDataBlock, i); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData *pDest = taosArrayGet(output->pDataBlock, i); - SColumnInfoData* pColInfoData = input[i].columnData; + SColumnInfoData *pColInfoData = input[i].columnData; TAOS_CHECK_GOTO(colDataAssign(pDest, pColInfoData, input[i].numOfRows, &output->info), &lino, _exit); if (input[i].numOfRows < numOfRows) { int32_t startRow = input[i].numOfRows; - int expandRows = numOfRows - startRow; - bool isNull = colDataIsNull_s(pColInfoData, (input+i)->numOfRows - 1); + int32_t expandRows = numOfRows - startRow; + bool isNull = colDataIsNull_s(pColInfoData, (input + i)->numOfRows - 1); if (isNull) { colDataSetNNULL(pDest, startRow, expandRows); } else { - char* src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1); - for (int j = 0; j < expandRows; ++j) { - TAOS_CHECK_GOTO(colDataSetVal(pDest, startRow+j, src, false), &lino, _exit); + char *src = colDataGetData(pColInfoData, (input + i)->numOfRows - 1); + for (int32_t j = 0; j < expandRows; ++j) { + TAOS_CHECK_GOTO(colDataSetVal(pDest, startRow + j, src, false), &lino, _exit); } } } @@ -1000,7 +1060,7 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { output->numOfRows = input->info.rows; output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData)); - if(output->columnData == NULL) { + if (output->columnData == NULL) { return terrno; } memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData)); @@ -1012,11 +1072,11 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { ////////////////////////////////////////////////////////////////////////////////////////////////////////////// // memory layout |---SUdfAggRes----|-----final result-----|---inter result----| typedef struct SUdfAggRes { - int8_t finalResNum; - int8_t interResNum; + int8_t finalResNum; + int8_t interResNum; int32_t interResBufLen; - char *finalResBuf; - char *interResBuf; + char *finalResBuf; + char *interResBuf; } SUdfAggRes; void onUdfcPipeClose(uv_handle_t *handle); @@ -1026,8 +1086,8 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf); void udfcUvHandleRsp(SClientUvConn *conn); void udfcUvHandleError(SClientUvConn *conn); void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); -void onUdfcPipeWrite(uv_write_t *write, int status); -void onUdfcPipeConnect(uv_connect_t *connect, int status); +void onUdfcPipeWrite(uv_write_t *write, int32_t status); +void onUdfcPipeConnect(uv_connect_t *connect, int32_t status); int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask); int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask); int32_t udfcStartUvTask(SClientUvTaskNode *uvTask); @@ -1037,7 +1097,7 @@ void udfStopAsyncCb(uv_async_t *async); void constructUdfService(void *argsThread); int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType); int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle); -int compareUdfcFuncSub(const void *elem1, const void *elem2); +int32_t compareUdfcFuncSub(const void *elem1, const void *elem2); int32_t doTeardownUdf(UdfcFuncHandle handle); int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, @@ -1062,9 +1122,9 @@ int32_t udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); -void cleanupNotExpiredUdfs(); -void cleanupExpiredUdfs(); -int compareUdfcFuncSub(const void *elem1, const void *elem2) { +void cleanupNotExpiredUdfs(); +void cleanupExpiredUdfs(); +int32_t compareUdfcFuncSub(const void *elem1, const void *elem2) { SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; return strcmp(stub1->udfName, stub2->udfName); @@ -1150,21 +1210,22 @@ void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) { void cleanupExpiredUdfs() { int32_t i = 0; SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); - if(expiredUdfStubs == NULL) { + if (expiredUdfStubs == NULL) { fnError("cleanupExpiredUdfs: failed to init array"); return; } while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) { SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i); if (stub->refCount == 0) { - fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); + fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, + stub->refCount); (void)doTeardownUdf(stub->handle); } else { - fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName, - stub->refCount, stub->createTime, stub->handle); + fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", + stub->udfName, stub->refCount, stub->createTime, stub->handle); UdfcFuncHandle handle = stub->handle; if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { - if(taosArrayPush(expiredUdfStubs, stub) == NULL) { + if (taosArrayPush(expiredUdfStubs, stub) == NULL) { fnError("cleanupExpiredUdfs: failed to push udf stub to array"); } } else { @@ -1347,7 +1408,8 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { return code; } - SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; + SUdfInterBuf state = { + .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState); @@ -1391,8 +1453,9 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->bytes; SUdfInterBuf resultBuf = {0}; - SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; - int32_t udfCallCode = 0; + SUdfInterBuf state = { + .buf = udfRes->interResBuf, .bufLen = udfRes->interResBufLen, .numOfResult = udfRes->interResNum}; + int32_t udfCallCode = 0; udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf); if (udfCallCode != 0) { fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode); @@ -1448,7 +1511,7 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode * SUdfResponse rsp = {0}; void *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp); code = rsp.code; - if(code != 0) { + if (code != 0) { fnError("udfc get udf task result failure. code: %d", code); } @@ -1474,18 +1537,18 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode * taosMemoryFree(uvTask->rspBuf.base); } else { code = uvTask->errCode; - if(code != 0) { + if (code != 0) { fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__); } } } else if (uvTask->type == UV_TASK_CONNECT) { code = uvTask->errCode; - if(code != 0) { + if (code != 0) { fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__); } } else if (uvTask->type == UV_TASK_DISCONNECT) { code = uvTask->errCode; - if(code != 0) { + if (code != 0) { fnError("udfc get udf task result failure. code: %d, line:%d", code, __LINE__); } } @@ -1620,7 +1683,7 @@ void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { } } -void onUdfcPipeWrite(uv_write_t *write, int status) { +void onUdfcPipeWrite(uv_write_t *write, int32_t status) { SClientUvConn *conn = write->data; if (status < 0) { fnError("udfc client connection %p write failed. status: %d(%s)", conn, status, uv_strerror(status)); @@ -1631,7 +1694,7 @@ void onUdfcPipeWrite(uv_write_t *write, int status) { taosMemoryFree(write); } -void onUdfcPipeConnect(uv_connect_t *connect, int status) { +void onUdfcPipeConnect(uv_connect_t *connect, int32_t status) { SClientUvTaskNode *uvTask = connect->data; if (status != 0) { fnError("client connect error, task seq: %" PRId64 ", code: %s", uvTask->seqNum, uv_strerror(status)); @@ -1639,7 +1702,7 @@ void onUdfcPipeConnect(uv_connect_t *connect, int status) { uvTask->errCode = status; int32_t code = uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead); - if(code != 0) { + if (code != 0) { fnError("udfc client connection %p read start failed. code: %d(%s)", uvTask->pipe, code, uv_strerror(code)); uvTask->errCode = code; } @@ -1678,13 +1741,12 @@ int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvT } request.msgLen = bufLen; void *bufBegin = taosMemoryMalloc(bufLen); - if(bufBegin == NULL) { + if (bufBegin == NULL) { fnError("udfc create uv task, malloc buffer failed. size: %d", bufLen); return terrno; } void *buf = bufBegin; - if(encodeUdfRequest(&buf, &request) <= 0) - { + if (encodeUdfRequest(&buf, &request) <= 0) { fnError("udfc create uv task, encode request failed. size: %d", bufLen); taosMemoryFree(bufBegin); return TSDB_CODE_UDF_UV_EXEC_FAILURE; @@ -1695,9 +1757,8 @@ int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvT } else if (uvTaskType == UV_TASK_DISCONNECT) { uvTask->pipe = task->session->udfUvPipe; } - if (uv_sem_init(&uvTask->taskSem, 0) != 0) - { - if (uvTaskType == UV_TASK_REQ_RSP) { + if (uv_sem_init(&uvTask->taskSem, 0) != 0) { + if (uvTaskType == UV_TASK_REQ_RSP) { taosMemoryFree(uvTask->reqBuf.base); } fnError("udfc create uv task, init semaphore failed."); @@ -1733,7 +1794,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { switch (uvTask->type) { case UV_TASK_CONNECT: { uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); - if(pipe == NULL) { + if (pipe == NULL) { fnError("udfc event loop start connect task malloc pipe failed."); return terrno; } @@ -1745,7 +1806,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { uvTask->pipe = pipe; SClientUvConn *conn = taosMemoryCalloc(1, sizeof(SClientUvConn)); - if(conn == NULL) { + if (conn == NULL) { fnError("udfc event loop start connect task malloc conn failed."); taosMemoryFree(pipe); return terrno; @@ -1760,7 +1821,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { pipe->data = conn; uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); - if(connReq == NULL) { + if (connReq == NULL) { fnError("udfc event loop start connect task malloc connReq failed."); taosMemoryFree(pipe); taosMemoryFree(conn); @@ -1777,14 +1838,14 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { code = TSDB_CODE_UDF_PIPE_NOT_EXIST; } else { uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); - if(write == NULL) { + if (write == NULL) { fnError("udfc event loop start req_rsp task malloc write failed."); return terrno; } write->data = pipe->data; QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue; QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue); - int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite); + int32_t err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite); if (err != 0) { taosMemoryFree(write); fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s", uvTask, uv_strerror(err)); @@ -1874,7 +1935,7 @@ void udfStopAsyncCb(uv_async_t *async) { } void constructUdfService(void *argsThread) { - int32_t code = 0, lino = 0; + int32_t code = 0, lino = 0; SUdfcProxy *udfc = (SUdfcProxy *)argsThread; code = uv_loop_init(&udfc->uvLoop); TAOS_CHECK_GOTO(code, &lino, _exit); @@ -1891,7 +1952,7 @@ void constructUdfService(void *argsThread) { QUEUE_INIT(&udfc->uvProcTaskQueue); (void)uv_barrier_wait(&udfc->initBarrier); // TODO return value of uv_run - int num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); + int32_t num = uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); fnInfo("udfc uv loop exit. active handle num: %d", num); (void)uv_loop_close(&udfc->uvLoop); @@ -1909,7 +1970,7 @@ _exit: int32_t udfcOpen() { int32_t code = 0, lino = 0; - int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1); + int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1); if (old == 1) { return 0; } @@ -1927,12 +1988,12 @@ int32_t udfcOpen() { code = uv_mutex_init(&proxy->udfStubsMutex); TAOS_CHECK_GOTO(code, &lino, _exit); proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); - if(proxy->udfStubs == NULL) { + if (proxy->udfStubs == NULL) { fnError("udfc init failed. udfStubs: %p", proxy->udfStubs); return -1; } proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); - if(proxy->expiredUdfStubs == NULL) { + if (proxy->expiredUdfStubs == NULL) { taosArrayDestroy(proxy->udfStubs); fnError("udfc init failed. expiredUdfStubs: %p", proxy->expiredUdfStubs); return -1; @@ -1956,10 +2017,10 @@ int32_t udfcClose() { SUdfcProxy *udfc = &gUdfcProxy; udfc->udfcState = UDFC_STATE_STOPPING; - if(uv_async_send(&udfc->loopStopAsync) != 0) { + if (uv_async_send(&udfc->loopStopAsync) != 0) { fnError("udfc close error to send stop async"); } - if(uv_thread_join(&udfc->loopThread) != 0 ) { + if (uv_thread_join(&udfc->loopThread) != 0) { fnError("udfc close errir to join loop thread"); } uv_mutex_destroy(&udfc->taskQueueMutex); @@ -1974,9 +2035,9 @@ int32_t udfcClose() { } int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { - int32_t code = 0, lino = 0; + int32_t code = 0, lino = 0; SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); - if(uvTask == NULL) { + if (uvTask == NULL) { fnError("udfc client task: %p failed to allocate memory for uvTask", task); return terrno; } @@ -2006,14 +2067,14 @@ _exit: } int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { - int32_t code = TSDB_CODE_SUCCESS, lino = 0; + int32_t code = TSDB_CODE_SUCCESS, lino = 0; SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); - if(task == NULL) { + if (task == NULL) { fnError("doSetupUdf, failed to allocate memory for task"); return terrno; } task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession)); - if(task->session == NULL) { + if (task->session == NULL) { fnError("doSetupUdf, failed to allocate memory for session"); taosMemoryFree(task); return terrno; @@ -2059,7 +2120,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf return TSDB_CODE_UDF_PIPE_NOT_EXIST; } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); - if(task == NULL) { + if (task == NULL) { fnError("udfc call udf. failed to allocate memory for task"); return terrno; } @@ -2163,8 +2224,8 @@ int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdf int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { int8_t callType = TSDB_UDF_CALL_SCALA_PROC; SSDataBlock inputBlock = {0}; - int32_t code = convertScalarParamToDataBlock(input, numOfCols, &inputBlock); - if(code != 0) { + int32_t code = convertScalarParamToDataBlock(input, numOfCols, &inputBlock); + if (code != 0) { fnError("doCallUdfScalarFunc, convertScalarParamToDataBlock failed. code: %d", code); return code; } @@ -2174,13 +2235,13 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t err = convertDataBlockToScalarParm(&resultBlock, output); taosArrayDestroy(resultBlock.pDataBlock); } - + blockDataFreeRes(&inputBlock); return err; } int32_t doTeardownUdf(UdfcFuncHandle handle) { - int32_t code = TSDB_CODE_SUCCESS, lino = 0;; + int32_t code = TSDB_CODE_SUCCESS, lino = 0; SUdfcUvSession *session = (SUdfcUvSession *)handle; if (session->udfUvPipe == NULL) { @@ -2190,7 +2251,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); - if(task == NULL) { + if (task == NULL) { fnError("doTeardownUdf, failed to allocate memory for task"); taosMemoryFree(session); return terrno;