From d5228ebd0741666c06850dcf0007a5f8d0232efa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 14 Feb 2022 15:37:13 +0800 Subject: [PATCH] refactor code --- source/libs/transport/inc/transComm.h | 2 +- source/libs/transport/src/transCli.c | 16 +++++--- source/libs/transport/src/transComm.c | 4 +- source/libs/transport/src/transSrv.c | 54 ++++++++++++++++++------- source/libs/transport/test/pushServer.c | 2 +- 5 files changed, 53 insertions(+), 25 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index bec0375dbe..6f8da57ee7 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -233,7 +233,7 @@ typedef struct { uv_async_t* asyncs; } SAsyncPool; -SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb); +SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); void transDestroyAsyncPool(SAsyncPool* pool); int transSendAsync(SAsyncPool* pool, queue* mq); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2a4a1891ed..00d9174e76 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -38,6 +38,7 @@ typedef struct SCliConn { int32_t ref; // debug and log info struct sockaddr_in addr; + struct sockaddr_in locaddr; } SCliConn; typedef struct SCliMsg { @@ -130,8 +131,9 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; - tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), - ntohs(conn->addr.sin_port)); + tDebug("client conn %p %s received from %s:%d, local info: %s:%d", conn, TMSG_INFO(pHead->msgType), + inet_ntoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), inet_ntoa(conn->locaddr.sin_addr), + ntohs(conn->locaddr.sin_port)); if (conn->push != NULL && conn->notifyCount != 0) { (*conn->push->callback)(conn->push->arg, &rpcMsg); @@ -417,8 +419,9 @@ static void clientWrite(SCliConn* pConn) { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("client conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port)); + tDebug("client conn %p %s is send to %s:%d, local info %s:%d", pConn, TMSG_INFO(pHead->msgType), + inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port)); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { @@ -433,6 +436,9 @@ static void clientConnCb(uv_connect_t* req, int status) { int addrlen = sizeof(pConn->addr); uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen); + addrlen = sizeof(pConn->locaddr); + uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->locaddr, &addrlen); + tTrace("client conn %p create", pConn); assert(pConn->stream == req->handle); @@ -579,7 +585,7 @@ static SCliThrdObj* createThrdObj() { pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, clientAsyncCb); + pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb); pThrd->timer = malloc(sizeof(uv_timer_t)); uv_timer_init(pThrd->loop, pThrd->timer); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 05b732b8cb..7aa5aa16f1 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -250,9 +250,7 @@ int transDestroyBuffer(SConnBuffer* buf) { transClearBuffer(buf); } -SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, void* arg, AsyncCB cb) { - static int sz = 10; - +SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = calloc(1, sizeof(SAsyncPool)); pool->index = 0; pool->nAsync = sz; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 5f4daef344..15561c184c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -31,7 +31,8 @@ typedef struct SSrvConn { void* pTransInst; // rpc init void* ahandle; // void* hostThrd; - void* pSrvMsg; + SArray* srvMsgs; + // void* pSrvMsg; struct sockaddr_in addr; @@ -94,6 +95,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) static void uvWorkerAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle); +static void uvStartSendRespInternal(SSrvMsg* smsg); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvStartSendResp(SSrvMsg* msg); @@ -310,14 +312,19 @@ void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnWriteCb(uv_write_t* req, int status) { SSrvConn* conn = req->data; - - SSrvMsg* smsg = conn->pSrvMsg; - destroySmsg(smsg); - conn->pSrvMsg = NULL; - transClearBuffer(&conn->readBuf); if (status == 0) { tTrace("server conn %p data already was written on stream", conn); + assert(taosArrayGetSize(conn->srvMsgs) >= 1); + SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, 0); + taosArrayRemove(conn->srvMsgs, 0); + destroySmsg(msg); + + // send second data, just use for push + if (taosArrayGetSize(conn->srvMsgs) > 0) { + msg = (SSrvMsg*)taosArrayGetP(conn->srvMsgs, 0); + uvStartSendRespInternal(msg); + } } else { tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); // @@ -361,20 +368,29 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { wb->base = msg; wb->len = len; } -static void uvStartSendResp(SSrvMsg* smsg) { - // impl + +static void uvStartSendRespInternal(SSrvMsg* smsg) { uv_buf_t wb; uvPrepareSendData(smsg, &wb); SSrvConn* pConn = smsg->pConn; uv_timer_stop(pConn->pTimer); - pConn->pSrvMsg = smsg; + // pConn->pSrvMsg = smsg; // conn->pWriter->data = smsg; uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb); - - // SRpcMsg* rpcMsg = smsg->msg; - +} +static void uvStartSendResp(SSrvMsg* smsg) { + // impl + SSrvConn* pConn = smsg->pConn; + if (taosArrayGetSize(pConn->srvMsgs) > 0) { + tDebug("server conn %p push data to client %s:%d", pConn, inet_ntoa(pConn->addr.sin_addr), + ntohs(pConn->addr.sin_port)); + taosArrayPush(pConn->srvMsgs, &smsg); + return; + } + taosArrayPush(pConn->srvMsgs, &smsg); + uvStartSendRespInternal(smsg); return; } static void destroySmsg(SSrvMsg* smsg) { @@ -531,7 +547,7 @@ static bool addHandleToWorkloop(void* arg) { QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, pThrd, uvWorkerAsyncCb); + pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 4, pThrd, uvWorkerAsyncCb); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); return true; } @@ -571,6 +587,7 @@ void* workerThread(void* arg) { static SSrvConn* createConn() { SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn)); + pConn->srvMsgs = taosArrayInit(2, sizeof(void*)); // tTrace("conn %p created", pConn); ++pConn->ref; return pConn; @@ -585,8 +602,15 @@ static void destroyConn(SSrvConn* conn, bool clear) { return; } transDestroyBuffer(&conn->readBuf); - destroySmsg(conn->pSrvMsg); - conn->pSrvMsg = NULL; + + for (int i = 0; i < taosArrayGetSize(conn->srvMsgs); i++) { + SSrvMsg* msg = taosArrayGetP(conn->srvMsgs, i); + destroySmsg(msg); + } + taosArrayDestroy(conn->srvMsgs); + + // destroySmsg(conn->pSrvMsg); + // conn->pSrvMsg = NULL; if (clear) { uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); diff --git a/source/libs/transport/test/pushServer.c b/source/libs/transport/test/pushServer.c index f9115d3d4f..0bcc47383b 100644 --- a/source/libs/transport/test/pushServer.c +++ b/source/libs/transport/test/pushServer.c @@ -77,7 +77,7 @@ void processShellMsg() { taosFreeQitem(pRpcMsg); { - sleep(1); + // sleep(1); SRpcMsg nRpcMsg = {0}; nRpcMsg.pCont = rpcMallocCont(msgSize); nRpcMsg.contLen = msgSize;