opt parameter

This commit is contained in:
yihaoDeng 2024-09-14 18:37:37 +08:00
parent 7acf61f02b
commit 860ca7963f
3 changed files with 19 additions and 14 deletions

View File

@ -497,7 +497,8 @@ enum { REQ_STATUS_INIT = 0, REQ_STATUS_PROCESSING };
#define BUFFER_LIMIT 4 #define BUFFER_LIMIT 4
typedef struct { typedef struct {
queue q; queue node; // queue for write
queue q; // queue for reqs
uv_write_t wreq; uv_write_t wreq;
void* arg; void* arg;
} SWReqsWrapper; } SWReqsWrapper;

View File

@ -15,7 +15,7 @@
#include "transComm.h" #include "transComm.h"
#define BUFFER_CAP 16 * 4096 #define BUFFER_CAP 8 * 1024
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
@ -893,6 +893,7 @@ int32_t initWQ(queue* wq) {
SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper)); SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper));
w->wreq.data = w; w->wreq.data = w;
w->arg = NULL; w->arg = NULL;
QUEUE_INIT(&w->node);
QUEUE_PUSH(wq, &w->q); QUEUE_PUSH(wq, &w->q);
} }
return 0; return 0;
@ -912,13 +913,19 @@ uv_write_t* allocWReqFromWQ(queue* wq, void* arg) {
QUEUE_REMOVE(node); QUEUE_REMOVE(node);
SWReqsWrapper* w = QUEUE_DATA(node, SWReqsWrapper, q); SWReqsWrapper* w = QUEUE_DATA(node, SWReqsWrapper, q);
w->arg = arg; w->arg = arg;
QUEUE_INIT(&w->node);
return &w->wreq; return &w->wreq;
} else { } else {
SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper)); SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper));
w->wreq.data = w; w->wreq.data = w;
w->arg = arg; w->arg = arg;
QUEUE_INIT(&w->node);
return &w->wreq; return &w->wreq;
} }
} }
void freeWReqToWQ(queue* wq, SWReqsWrapper* w) { QUEUE_PUSH(wq, &w->q); } void freeWReqToWQ(queue* wq, SWReqsWrapper* w) {
QUEUE_INIT(&w->node);
QUEUE_PUSH(wq, &w->q);
}

View File

@ -648,16 +648,17 @@ void uvOnSendCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status); STUB_RAND_NETWORK_ERR(status);
SWReqsWrapper* wrapper = req->data; SWReqsWrapper* wrapper = req->data;
SSvrConn* conn = wrapper->arg;
SWriteReq* userReq = wrapper->arg; queue src;
SSvrConn* conn = userReq->conn; QUEUE_INIT(&src);
queue* src = &userReq->node; QUEUE_MOVE(&wrapper->node, &src);
freeWReqToWQ(&conn->wq, wrapper); freeWReqToWQ(&conn->wq, wrapper);
tDebug("%s conn %p send data out ", transLabel(conn->pInst), conn); tDebug("%s conn %p send data out ", transLabel(conn->pInst), conn);
if (status == 0) { if (status == 0) {
while (!QUEUE_IS_EMPTY(src)) { while (!QUEUE_IS_EMPTY(&src)) {
queue* head = QUEUE_HEAD(&src); queue* head = QUEUE_HEAD(&src);
QUEUE_REMOVE(head); QUEUE_REMOVE(head);
@ -668,7 +669,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
destroySmsg(smsg); destroySmsg(smsg);
} }
} else { } else {
while (!QUEUE_IS_EMPTY(src)) { while (!QUEUE_IS_EMPTY(&src)) {
queue* head = QUEUE_HEAD(&src); queue* head = QUEUE_HEAD(&src);
QUEUE_REMOVE(head); QUEUE_REMOVE(head);
@ -682,7 +683,6 @@ void uvOnSendCb(uv_write_t* req, int status) {
conn->broken = true; conn->broken = true;
transUnrefSrvHandle(conn); transUnrefSrvHandle(conn);
} }
taosMemoryFree(userReq);
transUnrefSrvHandle(conn); transUnrefSrvHandle(conn);
} }
static void uvOnPipeWriteCb(uv_write_t* req, int status) { static void uvOnPipeWriteCb(uv_write_t* req, int status) {
@ -800,11 +800,8 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) {
return; return;
} }
SWriteReq* pWreq = taosMemoryCalloc(1, sizeof(SWriteReq)); uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn);
pWreq->conn = pConn; SWReqsWrapper* pWreq = req->data;
QUEUE_INIT(&pWreq->node);
uv_write_t* req = allocWReqFromWQ(&pConn->wq, pWreq);
uv_buf_t* pBuf = NULL; uv_buf_t* pBuf = NULL;
int32_t bufNum = 0; int32_t bufNum = 0;