diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index bae4aac2c4..451109cd59 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -350,15 +350,15 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key); void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType); // request list -typedef struct STransReq { +typedef struct SWriteReq { queue q; // gloabl queue node queue node; // req queue node void* conn; uv_write_t req; -} STransReq; +} SWriteReq; void transReqQueueInit(queue* q); -void* transReqQueuePush(queue* q, STransReq* req); +void* transReqQueuePush(queue* q, SWriteReq* req); void* transReqQueueRemove(void* arg); void transReqQueueClear(queue* q); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 326fd434ec..add463bb45 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -411,7 +411,7 @@ void transReqQueueInit(queue* q) { // init req queue QUEUE_INIT(q); } -void* transReqQueuePush(queue* q, STransReq* userReq) { +void* transReqQueuePush(queue* q, SWriteReq* userReq) { uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); req->data = userReq; @@ -422,7 +422,7 @@ void* transReqQueueRemove(void* arg) { void* ret = NULL; uv_write_t* req = arg; - STransReq* userReq = req ? req->data : NULL; + SWriteReq* userReq = req ? req->data : NULL; if (req == NULL) return NULL; QUEUE_REMOVE(&userReq->q); @@ -432,7 +432,7 @@ void transReqQueueClear(queue* q) { while (!QUEUE_IS_EMPTY(q)) { queue* h = QUEUE_HEAD(q); QUEUE_REMOVE(h); - STransReq* req = QUEUE_DATA(h, STransReq, q); + SWriteReq* req = QUEUE_DATA(h, SWriteReq, q); taosMemoryFree(req); } } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 22db1d26b3..a882ea7e68 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -25,7 +25,7 @@ typedef struct { } SSvrRegArg; typedef struct SSvrConn { - T_REF_DECLARE() + int32_t ref; uv_tcp_t* pTcp; queue wreqQueue; uv_timer_t pTimer; @@ -625,7 +625,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnSendCb(uv_write_t* req, int status) { STUB_RAND_NETWORK_ERR(status); - STransReq* userReq = req->data; + SWriteReq* userReq = req->data; SSvrConn* conn = userReq->conn; queue* src = &userReq->node; @@ -780,7 +780,7 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { transRefSrvHandle(pConn); - STransReq* pWreq = taosMemoryCalloc(1, sizeof(STransReq)); + SWriteReq* pWreq = taosMemoryCalloc(1, sizeof(SWriteReq)); pWreq->conn = pConn; QUEUE_INIT(&pWreq->q); QUEUE_MOVE(&sendReqNode, &pWreq->node); @@ -843,7 +843,7 @@ static void destroyAllConn(SWorkThrd* pThrd) { QUEUE_INIT(h); SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue); - while (T_REF_VAL_GET(c) >= 2) { + while (c->ref >= 2) { transUnrefSrvHandle(c); } transUnrefSrvHandle(c); @@ -1743,17 +1743,19 @@ void transRefSrvHandle(void* handle) { if (handle == NULL) { return; } - int ref = T_REF_INC((SSvrConn*)handle); - tTrace("conn %p ref count:%d", handle, ref); + SSvrConn* pConn = handle; + pConn->ref++; + tTrace("conn %p ref count:%d", handle, pConn->ref); } void transUnrefSrvHandle(void* handle) { if (handle == NULL) { return; } - int ref = T_REF_DEC((SSvrConn*)handle); - tTrace("conn %p ref count:%d", handle, ref); - if (ref == 0) { + SSvrConn* pConn = handle; + pConn->ref--; + tTrace("conn %p ref count:%d", handle, pConn->ref); + if (pConn->ref == 0) { destroyConn((SSvrConn*)handle, true); } }