opt transport

This commit is contained in:
yihaoDeng 2024-09-09 22:16:47 +08:00
parent 086333df51
commit debf225c15
3 changed files with 17 additions and 15 deletions

View File

@ -350,15 +350,15 @@ void* transCtxDumpVal(STransCtx* ctx, int32_t key);
void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType); void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
// request list // request list
typedef struct STransReq { typedef struct SWriteReq {
queue q; // gloabl queue node queue q; // gloabl queue node
queue node; // req queue node queue node; // req queue node
void* conn; void* conn;
uv_write_t req; uv_write_t req;
} STransReq; } SWriteReq;
void transReqQueueInit(queue* q); void transReqQueueInit(queue* q);
void* transReqQueuePush(queue* q, STransReq* req); void* transReqQueuePush(queue* q, SWriteReq* req);
void* transReqQueueRemove(void* arg); void* transReqQueueRemove(void* arg);
void transReqQueueClear(queue* q); void transReqQueueClear(queue* q);

View File

@ -411,7 +411,7 @@ void transReqQueueInit(queue* q) {
// init req queue // init req queue
QUEUE_INIT(q); QUEUE_INIT(q);
} }
void* transReqQueuePush(queue* q, STransReq* userReq) { void* transReqQueuePush(queue* q, SWriteReq* userReq) {
uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
req->data = userReq; req->data = userReq;
@ -422,7 +422,7 @@ void* transReqQueueRemove(void* arg) {
void* ret = NULL; void* ret = NULL;
uv_write_t* req = arg; uv_write_t* req = arg;
STransReq* userReq = req ? req->data : NULL; SWriteReq* userReq = req ? req->data : NULL;
if (req == NULL) return NULL; if (req == NULL) return NULL;
QUEUE_REMOVE(&userReq->q); QUEUE_REMOVE(&userReq->q);
@ -432,7 +432,7 @@ void transReqQueueClear(queue* q) {
while (!QUEUE_IS_EMPTY(q)) { while (!QUEUE_IS_EMPTY(q)) {
queue* h = QUEUE_HEAD(q); queue* h = QUEUE_HEAD(q);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
STransReq* req = QUEUE_DATA(h, STransReq, q); SWriteReq* req = QUEUE_DATA(h, SWriteReq, q);
taosMemoryFree(req); taosMemoryFree(req);
} }
} }

View File

@ -25,7 +25,7 @@ typedef struct {
} SSvrRegArg; } SSvrRegArg;
typedef struct SSvrConn { typedef struct SSvrConn {
T_REF_DECLARE() int32_t ref;
uv_tcp_t* pTcp; uv_tcp_t* pTcp;
queue wreqQueue; queue wreqQueue;
uv_timer_t pTimer; uv_timer_t pTimer;
@ -625,7 +625,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
void uvOnSendCb(uv_write_t* req, int status) { void uvOnSendCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status); STUB_RAND_NETWORK_ERR(status);
STransReq* userReq = req->data; SWriteReq* userReq = req->data;
SSvrConn* conn = userReq->conn; SSvrConn* conn = userReq->conn;
queue* src = &userReq->node; queue* src = &userReq->node;
@ -780,7 +780,7 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
STransReq* pWreq = taosMemoryCalloc(1, sizeof(STransReq)); SWriteReq* pWreq = taosMemoryCalloc(1, sizeof(SWriteReq));
pWreq->conn = pConn; pWreq->conn = pConn;
QUEUE_INIT(&pWreq->q); QUEUE_INIT(&pWreq->q);
QUEUE_MOVE(&sendReqNode, &pWreq->node); QUEUE_MOVE(&sendReqNode, &pWreq->node);
@ -843,7 +843,7 @@ static void destroyAllConn(SWorkThrd* pThrd) {
QUEUE_INIT(h); QUEUE_INIT(h);
SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue); SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue);
while (T_REF_VAL_GET(c) >= 2) { while (c->ref >= 2) {
transUnrefSrvHandle(c); transUnrefSrvHandle(c);
} }
transUnrefSrvHandle(c); transUnrefSrvHandle(c);
@ -1743,17 +1743,19 @@ void transRefSrvHandle(void* handle) {
if (handle == NULL) { if (handle == NULL) {
return; return;
} }
int ref = T_REF_INC((SSvrConn*)handle); SSvrConn* pConn = handle;
tTrace("conn %p ref count:%d", handle, ref); pConn->ref++;
tTrace("conn %p ref count:%d", handle, pConn->ref);
} }
void transUnrefSrvHandle(void* handle) { void transUnrefSrvHandle(void* handle) {
if (handle == NULL) { if (handle == NULL) {
return; return;
} }
int ref = T_REF_DEC((SSvrConn*)handle); SSvrConn* pConn = handle;
tTrace("conn %p ref count:%d", handle, ref); pConn->ref--;
if (ref == 0) { tTrace("conn %p ref count:%d", handle, pConn->ref);
if (pConn->ref == 0) {
destroyConn((SSvrConn*)handle, true); destroyConn((SSvrConn*)handle, true);
} }
} }