From a3447c5dae2338a853ca84d4f214fa08abaaa1b0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 24 Oct 2022 14:59:47 +0800 Subject: [PATCH] avoid invalid read/write --- source/libs/transport/src/transSvr.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index cc381d06a7..1d6dbe64ed 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -374,7 +374,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { } else { tError("fail to dispatch conn to work thread"); } - uv_close((uv_handle_t*)req->data, uvFreeCb); + if (!uv_is_closing((uv_handle_t*)req->data)) { + uv_close((uv_handle_t*)req->data, uvFreeCb); + } else { + taosMemoryFree(req->data); + } taosMemoryFree(req); } @@ -668,7 +672,11 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); } else { - uv_close((uv_handle_t*)cli, NULL); + if (!uv_is_closing((uv_handle_t*)cli)) { + uv_close((uv_handle_t*)cli, NULL); + } else { + taosMemoryFree(cli); + } } } void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { @@ -681,7 +689,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tWarn("failed to create connect:%p", q); taosMemoryFree(buf->base); uv_close((uv_handle_t*)q, NULL); - // taosMemoryFree(q); return; } // free memory allocated by @@ -972,6 +979,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); #endif + ret = uv_pipe_bind(&srv->pipeListen, pipeName); if (ret != 0) { tError("failed to bind pipe, errmsg: %s", uv_err_name(ret)); @@ -997,6 +1005,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, if (false == addHandleToWorkloop(thrd, pipeName)) { goto End; } + int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); if (err == 0) { tDebug("success to create worker-thread:%d", i); @@ -1006,14 +1015,17 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } } + if (false == taosValidIpAndPort(srv->ip, srv->port)) { terrno = TAOS_SYSTEM_ERROR(errno); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr()); goto End; } + if (false == addHandleToAcceptloop(srv)) { goto End; } + int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); if (err == 0) { tDebug("success to create accept-thread"); @@ -1022,6 +1034,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; // clear all resource later } + srv->inited = true; return srv; End: