refactor rpc

This commit is contained in:
yihaoDeng 2022-01-25 21:19:35 +08:00
parent 3b5f921575
commit 5136d644b0
1 changed files with 17 additions and 10 deletions

View File

@ -31,6 +31,7 @@ typedef struct SCliConn {
char secured; char secured;
uint64_t expireTime; uint64_t expireTime;
int8_t notifyCount; // timers already notify to client int8_t notifyCount; // timers already notify to client
int32_t ref;
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
@ -126,6 +127,7 @@ static void clientHandleResp(SCliConn* conn) {
// buf alread translated to rpcMsg.pCont // buf alread translated to rpcMsg.pCont
transClearBuffer(&conn->readBuf); transClearBuffer(&conn->readBuf);
uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb);
addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn);
// start thread's timer of conn pool if not active // start thread's timer of conn pool if not active
@ -137,6 +139,7 @@ static void clientHandleResp(SCliConn* conn) {
static void clientHandleExcept(SCliConn* pConn) { static void clientHandleExcept(SCliConn* pConn) {
if (pConn->data == NULL) { if (pConn->data == NULL) {
clientConnDestroy(pConn, true); clientConnDestroy(pConn, true);
return; return;
} }
tDebug("conn %p destroy", pConn); tDebug("conn %p destroy", pConn);
@ -258,7 +261,7 @@ static bool clientReadComplete(SConnBuffer* data) {
return false; return false;
} }
} }
static void clientAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
transAllocBuffer(pBuf, buf); transAllocBuffer(pBuf, buf);
@ -270,6 +273,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
if (clientReadComplete(pBuf)) { if (clientReadComplete(pBuf)) {
uv_read_stop((uv_stream_t*)conn->stream);
tDebug("conn %p read complete", conn); tDebug("conn %p read complete", conn);
clientHandleResp(conn); clientHandleResp(conn);
} else { } else {
@ -291,6 +295,8 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
static void clientConnDestroy(SCliConn* conn, bool clear) { static void clientConnDestroy(SCliConn* conn, bool clear) {
// //
conn->ref--;
if (conn->ref == 0) {
tDebug("conn %p remove from conn pool", conn); tDebug("conn %p remove from conn pool", conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
tDebug("conn %p remove from conn pool successfully", conn); tDebug("conn %p remove from conn pool successfully", conn);
@ -298,9 +304,10 @@ static void clientConnDestroy(SCliConn* conn, bool clear) {
uv_close((uv_handle_t*)conn->stream, clientDestroy); uv_close((uv_handle_t*)conn->stream, clientDestroy);
} }
} }
}
static void clientDestroy(uv_handle_t* handle) { static void clientDestroy(uv_handle_t* handle) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
transDestroyBuffer(&conn->readBuf); // transDestroyBuffer(&conn->readBuf);
free(conn->stream); free(conn->stream);
free(conn->writeReq); free(conn->writeReq);
@ -325,7 +332,7 @@ static void clientWriteCb(uv_write_t* req, int status) {
// uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream); // uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream);
// pConn->stream->data = pConn; // pConn->stream->data = pConn;
//} //}
uv_read_start((uv_stream_t*)pConn->stream, clientAllocReadBufferCb, clientReadCb); uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb);
// impl later // impl later
} }
@ -371,11 +378,11 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tDebug("conn %p get from conn pool", conn); tDebug("conn %p get from conn pool", conn);
conn->data = pMsg; conn->data = pMsg;
conn->writeReq->data = conn; conn->writeReq->data = conn;
transDestroyBuffer(&conn->readBuf); // transDestroyBuffer(&conn->readBuf);
clientWrite(conn); clientWrite(conn);
} else { } else {
SCliConn* conn = calloc(1, sizeof(SCliConn)); SCliConn* conn = calloc(1, sizeof(SCliConn));
conn->ref++;
// read/write stream handle // read/write stream handle
conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));