diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 3a388d1c07..33614fadcb 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1072,6 +1072,8 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) { int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask); + int32_t code = 0; + switch (uvTask->type) { case UV_TASK_CONNECT: { uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); @@ -1091,6 +1093,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); connReq->data = uvTask; uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect); + code = 0; break; } case UV_TASK_REQ_RSP: { @@ -1101,12 +1104,14 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { if (err != 0) { fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err)); } + code = err; break; } case UV_TASK_DISCONNECT: { SClientUvConn *conn = uvTask->pipe->data; QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose); + code = 0; break; } default: { @@ -1115,7 +1120,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { } } - return 0; + return code; } void udfClientAsyncCb(uv_async_t *async) { @@ -1133,6 +1138,9 @@ void udfClientAsyncCb(uv_async_t *async) { int32_t code = udfcStartUvTask(task); if (code == 0) { QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue); + } else { + task->errCode = code; + uv_sem_post(&task->taskSem); } }