fix(rpc): avoid fd leak
This commit is contained in:
parent
17fe1d44da
commit
7f7b1a423a
|
@ -21,15 +21,16 @@ typedef struct SCliConn {
|
||||||
uv_connect_t connReq;
|
uv_connect_t connReq;
|
||||||
uv_stream_t* stream;
|
uv_stream_t* stream;
|
||||||
uv_write_t writeReq;
|
uv_write_t writeReq;
|
||||||
void* hostThrd;
|
|
||||||
SConnBuffer readBuf;
|
|
||||||
void* data;
|
|
||||||
STransQueue cliMsgs;
|
|
||||||
queue conn;
|
|
||||||
uint64_t expireTime;
|
|
||||||
int hThrdIdx;
|
|
||||||
STransCtx ctx;
|
|
||||||
|
|
||||||
|
void* hostThrd;
|
||||||
|
int hThrdIdx;
|
||||||
|
|
||||||
|
SConnBuffer readBuf;
|
||||||
|
STransQueue cliMsgs;
|
||||||
|
queue conn;
|
||||||
|
uint64_t expireTime;
|
||||||
|
|
||||||
|
STransCtx ctx;
|
||||||
bool broken; // link broken or not
|
bool broken; // link broken or not
|
||||||
ConnStatus status; //
|
ConnStatus status; //
|
||||||
|
|
||||||
|
@ -157,13 +158,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
|
||||||
transClearBuffer(&conn->readBuf); \
|
transClearBuffer(&conn->readBuf); \
|
||||||
transFreeMsg(transContFromHead((char*)head)); \
|
transFreeMsg(transContFromHead((char*)head)); \
|
||||||
tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \
|
tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \
|
||||||
while (T_REF_VAL_GET(conn) > 1) { \
|
if (T_REF_VAL_GET(conn) > 1) { \
|
||||||
transUnrefCliHandle(conn); \
|
|
||||||
} \
|
|
||||||
if (T_REF_VAL_GET(conn) == 1) { \
|
|
||||||
transUnrefCliHandle(conn); \
|
transUnrefCliHandle(conn); \
|
||||||
} \
|
} \
|
||||||
destroyCmsg(pMsg); \
|
destroyCmsg(pMsg); \
|
||||||
|
addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \
|
||||||
return; \
|
return; \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
|
@ -35,7 +35,6 @@ typedef struct SSrvConn {
|
||||||
uv_timer_t pTimer;
|
uv_timer_t pTimer;
|
||||||
|
|
||||||
queue queue;
|
queue queue;
|
||||||
int persist; // persist connection or not
|
|
||||||
SConnBuffer readBuf; // read buf,
|
SConnBuffer readBuf; // read buf,
|
||||||
int inType;
|
int inType;
|
||||||
void* pTransInst; // rpc init
|
void* pTransInst; // rpc init
|
||||||
|
@ -138,6 +137,7 @@ static void destroySmsg(SSrvMsg* smsg);
|
||||||
// check whether already read complete packet
|
// check whether already read complete packet
|
||||||
static SSrvConn* createConn(void* hThrd);
|
static SSrvConn* createConn(void* hThrd);
|
||||||
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
|
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
|
||||||
|
static int reallocConnRefHandle(SSrvConn* conn);
|
||||||
|
|
||||||
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
|
@ -164,7 +164,7 @@ static void* transWorkerThread(void* arg);
|
||||||
static void* transAcceptThread(void* arg);
|
static void* transAcceptThread(void* arg);
|
||||||
|
|
||||||
// add handle loop
|
// add handle loop
|
||||||
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName);
|
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName);
|
||||||
static bool addHandleToAcceptloop(void* arg);
|
static bool addHandleToAcceptloop(void* arg);
|
||||||
|
|
||||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
#define CONN_SHOULD_RELEASE(conn, head) \
|
||||||
|
@ -517,7 +517,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
int64_t refId = transMsg.refId;
|
int64_t refId = transMsg.refId;
|
||||||
SExHandle* exh2 = uvAcquireExHandle(refId);
|
SExHandle* exh2 = uvAcquireExHandle(refId);
|
||||||
if (exh2 == NULL || exh1 != exh2) {
|
if (exh2 == NULL || exh1 != exh2) {
|
||||||
tTrace("server handle %p except msg, ignore it", exh1);
|
tTrace("server handle except msg %p, ignore it", exh1);
|
||||||
uvReleaseExHandle(refId);
|
uvReleaseExHandle(refId);
|
||||||
destroySmsg(msg);
|
destroySmsg(msg);
|
||||||
continue;
|
continue;
|
||||||
|
@ -581,11 +581,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
||||||
|
|
||||||
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
|
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
|
||||||
if (pObj->numOfWorkerReady < pObj->numOfThreads) {
|
if (pObj->numOfWorkerReady < pObj->numOfThreads) {
|
||||||
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady);
|
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
|
||||||
|
pObj->numOfWorkerReady);
|
||||||
uv_close((uv_handle_t*)cli, NULL);
|
uv_close((uv_handle_t*)cli, NULL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
||||||
wr->data = cli;
|
wr->data = cli;
|
||||||
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
|
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
|
||||||
|
@ -681,14 +682,14 @@ void* transAcceptThread(void* arg) {
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
void uvOnPipeConnectionCb(uv_connect_t *connect, int status) {
|
void uvOnPipeConnectionCb(uv_connect_t* connect, int status) {
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
|
SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req);
|
||||||
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
|
||||||
}
|
}
|
||||||
static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) {
|
static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) {
|
||||||
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
|
pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t));
|
||||||
if (0 != uv_loop_init(pThrd->loop)) {
|
if (0 != uv_loop_init(pThrd->loop)) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -787,6 +788,19 @@ static void destroyConn(SSrvConn* conn, bool clear) {
|
||||||
// uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
|
// uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static int reallocConnRefHandle(SSrvConn* conn) {
|
||||||
|
uvReleaseExHandle(conn->refId);
|
||||||
|
uvRemoveExHandle(conn->refId);
|
||||||
|
// avoid app continue to send msg on invalid handle
|
||||||
|
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
|
||||||
|
exh->handle = conn;
|
||||||
|
exh->pThrd = conn->hostThrd;
|
||||||
|
exh->refId = uvAddExHandle(exh);
|
||||||
|
uvAcquireExHandle(exh->refId);
|
||||||
|
conn->refId = exh->refId;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
static void uvDestroyConn(uv_handle_t* handle) {
|
static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
SSrvConn* conn = handle->data;
|
SSrvConn* conn = handle->data;
|
||||||
if (conn == NULL) {
|
if (conn == NULL) {
|
||||||
|
@ -822,7 +836,7 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) {
|
||||||
ASSERT(status == 0);
|
ASSERT(status == 0);
|
||||||
|
|
||||||
SServerObj* srv = container_of(handle, SServerObj, pipeListen);
|
SServerObj* srv = container_of(handle, SServerObj, pipeListen);
|
||||||
uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
|
uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]);
|
||||||
ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
|
ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1));
|
||||||
ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));
|
ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe));
|
||||||
|
|
||||||
|
@ -859,7 +873,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId());
|
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId());
|
||||||
#else
|
#else
|
||||||
char pipeName[PATH_MAX] = {0};
|
char pipeName[PATH_MAX] = {0};
|
||||||
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId());
|
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(),
|
||||||
|
taosGetSelfPthreadId());
|
||||||
#endif
|
#endif
|
||||||
assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
|
assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName));
|
||||||
assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));
|
assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb));
|
||||||
|
@ -874,7 +889,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
|
||||||
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
|
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
|
||||||
thrd->pipe = &(srv->pipe[i][1]); // init read
|
thrd->pipe = &(srv->pipe[i][1]); // init read
|
||||||
|
|
||||||
if (false == addHandleToWorkloop(thrd,pipeName)) {
|
if (false == addHandleToWorkloop(thrd, pipeName)) {
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
|
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
|
||||||
|
@ -958,6 +973,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
||||||
}
|
}
|
||||||
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
||||||
SSrvConn* conn = msg->pConn;
|
SSrvConn* conn = msg->pConn;
|
||||||
|
reallocConnRefHandle(conn);
|
||||||
if (conn->status == ConnAcquire) {
|
if (conn->status == ConnAcquire) {
|
||||||
if (!transQueuePush(&conn->srvMsgs, msg)) {
|
if (!transQueuePush(&conn->srvMsgs, msg)) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -35,7 +35,7 @@ class TDSimClient:
|
||||||
"tableIncStepPerVnode": "10000",
|
"tableIncStepPerVnode": "10000",
|
||||||
"maxVgroupsPerDb": "1000",
|
"maxVgroupsPerDb": "1000",
|
||||||
"sdbDebugFlag": "143",
|
"sdbDebugFlag": "143",
|
||||||
"rpcDebugFlag": "135",
|
"rpcDebugFlag": "143",
|
||||||
"tmrDebugFlag": "131",
|
"tmrDebugFlag": "131",
|
||||||
"cDebugFlag": "135",
|
"cDebugFlag": "135",
|
||||||
"udebugFlag": "135",
|
"udebugFlag": "135",
|
||||||
|
@ -136,7 +136,7 @@ class TDDnode:
|
||||||
"tsdbDebugFlag": "135",
|
"tsdbDebugFlag": "135",
|
||||||
"mDebugFlag": "135",
|
"mDebugFlag": "135",
|
||||||
"sdbDebugFlag": "135",
|
"sdbDebugFlag": "135",
|
||||||
"rpcDebugFlag": "135",
|
"rpcDebugFlag": "143",
|
||||||
"tmrDebugFlag": "131",
|
"tmrDebugFlag": "131",
|
||||||
"cDebugFlag": "135",
|
"cDebugFlag": "135",
|
||||||
"httpDebugFlag": "135",
|
"httpDebugFlag": "135",
|
||||||
|
|
Loading…
Reference in New Issue