opt transport

This commit is contained in:
yihaoDeng 2024-09-09 22:06:45 +08:00
parent e656f65d7c
commit 086333df51
3 changed files with 12 additions and 21 deletions

View File

@ -351,9 +351,10 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType);
// request list // request list
typedef struct STransReq { typedef struct STransReq {
queue q; queue q; // gloabl queue node
queue node; queue node; // req queue node
void* conn; void* conn;
uv_write_t req;
} STransReq; } STransReq;
void transReqQueueInit(queue* q); void transReqQueueInit(queue* q);

View File

@ -3633,7 +3633,7 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) {
return NULL; return NULL;
} }
code = transHeapGet(pHeap, &pConn); code = transHeapGet(pHeap, &pConn);
// if (pConn && taosHashGetSize(pConn->pQTable) > 0) { // if (pConn && taosHashGetSifze(pConn->pQTable) > 0) {
// tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap, // tDebug("get conn %p from heap cache for key:%s, status:%d, refCnt:%d", pConn, key, pConn->inHeap,
// pConn->reqRefCnt); return NULL; // pConn->reqRefCnt); return NULL;
// } /*else { // } /*else {

View File

@ -625,26 +625,15 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
void uvOnSendCb(uv_write_t* req, int status) { void uvOnSendCb(uv_write_t* req, int status) {
STUB_RAND_NETWORK_ERR(status); STUB_RAND_NETWORK_ERR(status);
queue q; STransReq* userReq = req->data;
QUEUE_INIT(&q);
STransReq* userReq = transReqQueueRemove(req);
SSvrConn* conn = userReq->conn; SSvrConn* conn = userReq->conn;
queue* src = &userReq->node;
queue* src = &userReq->node;
while (!QUEUE_IS_EMPTY(src)) {
queue* head = QUEUE_HEAD(src);
QUEUE_REMOVE(head);
QUEUE_PUSH(&q, head);
// }
}
// QUEUE_MOVE(src, &q);
tDebug("%s conn %p send data", transLabel(conn->pInst), conn); tDebug("%s conn %p send data", transLabel(conn->pInst), conn);
if (status == 0) { if (status == 0) {
while (!QUEUE_IS_EMPTY(&q)) { while (!QUEUE_IS_EMPTY(src)) {
queue* head = QUEUE_HEAD(&q); queue* head = QUEUE_HEAD(&src);
QUEUE_REMOVE(head); QUEUE_REMOVE(head);
SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq); SSvrMsg* smsg = QUEUE_DATA(head, SSvrMsg, sendReq);
@ -660,6 +649,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
conn->broken = true; conn->broken = true;
} }
} }
taosMemoryFree(userReq);
transUnrefSrvHandle(conn); transUnrefSrvHandle(conn);
} }
static void uvOnPipeWriteCb(uv_write_t* req, int status) { static void uvOnPipeWriteCb(uv_write_t* req, int status) {
@ -793,10 +783,10 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) {
STransReq* pWreq = taosMemoryCalloc(1, sizeof(STransReq)); STransReq* pWreq = taosMemoryCalloc(1, sizeof(STransReq));
pWreq->conn = pConn; pWreq->conn = pConn;
QUEUE_INIT(&pWreq->q); QUEUE_INIT(&pWreq->q);
QUEUE_MOVE(&sendReqNode, &pWreq->node); QUEUE_MOVE(&sendReqNode, &pWreq->node);
pWreq->req.data = pWreq;
uv_write_t* req = transReqQueuePush(&pConn->wreqQueue, pWreq); uv_write_t* req = &pWreq->req;
if (req == NULL) { if (req == NULL) {
if (!uv_is_closing((uv_handle_t*)(pConn->pTcp))) { if (!uv_is_closing((uv_handle_t*)(pConn->pTcp))) {
tError("conn %p failed to write data, reason:%s", pConn, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); tError("conn %p failed to write data, reason:%s", pConn, tstrerror(TSDB_CODE_OUT_OF_MEMORY));