diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index b931118f04..376c764f2c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -497,7 +497,8 @@ enum { REQ_STATUS_INIT = 0, REQ_STATUS_PROCESSING }; #define BUFFER_LIMIT 4 typedef struct { - queue q; + queue node; // queue for write + queue q; // queue for reqs uv_write_t wreq; void* arg; } SWReqsWrapper; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 7814ccafac..8f3a112058 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -15,7 +15,7 @@ #include "transComm.h" -#define BUFFER_CAP 16 * 4096 +#define BUFFER_CAP 8 * 1024 static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; @@ -893,6 +893,7 @@ int32_t initWQ(queue* wq) { SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper)); w->wreq.data = w; w->arg = NULL; + QUEUE_INIT(&w->node); QUEUE_PUSH(wq, &w->q); } return 0; @@ -912,13 +913,19 @@ uv_write_t* allocWReqFromWQ(queue* wq, void* arg) { QUEUE_REMOVE(node); SWReqsWrapper* w = QUEUE_DATA(node, SWReqsWrapper, q); w->arg = arg; + QUEUE_INIT(&w->node); + return &w->wreq; } else { SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper)); w->wreq.data = w; w->arg = arg; + QUEUE_INIT(&w->node); return &w->wreq; } } -void freeWReqToWQ(queue* wq, SWReqsWrapper* w) { QUEUE_PUSH(wq, &w->q); } \ No newline at end of file +void freeWReqToWQ(queue* wq, SWReqsWrapper* w) { + QUEUE_INIT(&w->node); + QUEUE_PUSH(wq, &w->q); +} \ No newline at end of file diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index c5fc6067e8..20e3df98ec 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -648,16 +648,17 @@ void uvOnSendCb(uv_write_t* req, int status) { STUB_RAND_NETWORK_ERR(status); SWReqsWrapper* wrapper = req->data; + SSvrConn* conn = wrapper->arg; - SWriteReq* userReq = wrapper->arg; - SSvrConn* conn = userReq->conn; - queue* src = &userReq->node; + queue src; + QUEUE_INIT(&src); + QUEUE_MOVE(&wrapper->node, &src); freeWReqToWQ(&conn->wq, wrapper); tDebug("%s conn %p send data out ", transLabel(conn->pInst), conn); if (status == 0) { - while (!QUEUE_IS_EMPTY(src)) { + while (!QUEUE_IS_EMPTY(&src)) { queue* head = QUEUE_HEAD(&src); QUEUE_REMOVE(head); @@ -668,7 +669,7 @@ void uvOnSendCb(uv_write_t* req, int status) { destroySmsg(smsg); } } else { - while (!QUEUE_IS_EMPTY(src)) { + while (!QUEUE_IS_EMPTY(&src)) { queue* head = QUEUE_HEAD(&src); QUEUE_REMOVE(head); @@ -682,7 +683,6 @@ void uvOnSendCb(uv_write_t* req, int status) { conn->broken = true; transUnrefSrvHandle(conn); } - taosMemoryFree(userReq); transUnrefSrvHandle(conn); } static void uvOnPipeWriteCb(uv_write_t* req, int status) { @@ -800,11 +800,8 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) { return; } - SWriteReq* pWreq = taosMemoryCalloc(1, sizeof(SWriteReq)); - pWreq->conn = pConn; - QUEUE_INIT(&pWreq->node); - - uv_write_t* req = allocWReqFromWQ(&pConn->wq, pWreq); + uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn); + SWReqsWrapper* pWreq = req->data; uv_buf_t* pBuf = NULL; int32_t bufNum = 0;