diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 6819068b64..b829251ccd 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -655,7 +655,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_tcp_init(pObj->loop, cli); if (uv_accept(stream, (uv_stream_t*)cli) == 0) { -#ifdef WINDOWS +#if defined(WINDOWS) || defined(DARWIN) if (pObj->numOfWorkerReady < pObj->numOfThreads) { tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady); @@ -779,7 +779,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { return false; } -#ifdef WINDOWS +#if defined(WINDOWS) || defined(DARWIN) uv_pipe_init(pThrd->loop, pThrd->pipe, 1); #else uv_pipe_init(pThrd->loop, pThrd->pipe, 1); @@ -799,7 +799,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { QUEUE_INIT(&pThrd->conn); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); -#ifdef WINDOWS +#if defined(WINDOWS) || defined(DARWIN) uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); #else uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); @@ -976,17 +976,18 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, uv_loop_init(srv->loop); char pipeName[PATH_MAX]; -#ifdef WINDOWS +#if defined(WINDOWS) || defined(DARWIN) int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0); if (ret != 0) { tError("failed to init pipe, errmsg: %s", uv_err_name(ret)); goto End; } - +#if defined(WINDOWS) snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId()); - // char pipeName[PATH_MAX] = {0}; - // snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), - // taosGetSelfPthreadId()); +#elif defined(DARWIN) + snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), + taosGetSelfPthreadId()); +#endif ret = uv_pipe_bind(&srv->pipeListen, pipeName); if (ret != 0) { @@ -1036,7 +1037,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pThreadObj[i] = thrd; uv_os_sock_t fds[2]; - if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { goto End; } @@ -1059,8 +1060,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } } - #endif + if (false == taosValidIpAndPort(srv->ip, srv->port)) { terrno = TAOS_SYSTEM_ERROR(errno); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());