From 086333df519320314a27e42ff9718858cfce5c22 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 9 Sep 2024 22:06:45 +0800 Subject: [PATCH] opt transport --- source/libs/transport/inc/transComm.h | 7 ++++--- source/libs/transport/src/transCli.c | 2 +- source/libs/transport/src/transSvr.c | 24 +++++++----------------- 3 files changed, 12 insertions(+), 21 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 4c5b1511b2..bae4aac2c4 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -351,9 +351,10 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType); // request list typedef struct STransReq { - queue q; - queue node; - void* conn; + queue q; // gloabl queue node + queue node; // req queue node + void* conn; + uv_write_t req; } STransReq; void transReqQueueInit(queue* q); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 25ac74d9c0..8d811bf755 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -3633,7 +3633,7 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { return NULL; } code = transHeapGet(pHeap, &pConn); - // if (pConn && taosHashGetSize(pConn->pQTable) > 0) { + // if (pConn && taosHashGetSifze(pConn->pQTable) > 0) { // tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, // pConn->reqRefCnt); return NULL; // } /*else { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 53e9c0f9b1..22db1d26b3 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -625,26 +625,15 @@ void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnSendCb(uv_write_t* req, int status) { STUB_RAND_NETWORK_ERR(status); - queue q; - QUEUE_INIT(&q); - - STransReq* userReq = transReqQueueRemove(req); + STransReq* userReq = req->data; SSvrConn* conn = userReq->conn; - - queue* src = &userReq->node; - while (!QUEUE_IS_EMPTY(src)) { - queue* head = QUEUE_HEAD(src); - QUEUE_REMOVE(head); - QUEUE_PUSH(&q, head); - // } - } - // QUEUE_MOVE(src, &q); + queue* src = &userReq->node; tDebug("%s conn %p send data", transLabel(conn->pInst), conn); if (status == 0) { - while (!QUEUE_IS_EMPTY(&q)) { - queue* head = QUEUE_HEAD(&q); + while (!QUEUE_IS_EMPTY(src)) { + queue* head = QUEUE_HEAD(&src); QUEUE_REMOVE(head); SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq); @@ -660,6 +649,7 @@ void uvOnSendCb(uv_write_t* req, int status) { conn->broken = true; } } + taosMemoryFree(userReq); transUnrefSrvHandle(conn); } static void uvOnPipeWriteCb(uv_write_t* req, int status) { @@ -793,10 +783,10 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { STransReq* pWreq = taosMemoryCalloc(1, sizeof(STransReq)); pWreq->conn = pConn; QUEUE_INIT(&pWreq->q); - QUEUE_MOVE(&sendReqNode, &pWreq->node); + pWreq->req.data = pWreq; - uv_write_t* req = transReqQueuePush(&pConn->wreqQueue, pWreq); + uv_write_t* req = &pWreq->req; if (req == NULL) { if (!uv_is_closing((uv_handle_t*)(pConn->pTcp))) { tError("conn %p failed to write data, reason:%s", pConn, tstrerror(TSDB_CODE_OUT_OF_MEMORY));