From d4b0550a01fc787d8e92c84caafc2f4145ab3430 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 29 Oct 2024 20:04:37 +0800 Subject: [PATCH] fix double free --- source/libs/transport/src/transCli.c | 34 +++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c3e214b5e3..4887240f5d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1300,12 +1300,22 @@ static void cliBatchSendCb(uv_write_t* req, int status) { SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; + while (!QUEUE_IS_EMPTY(&wrapper->node)) { + queue* h = QUEUE_HEAD(&wrapper->node); + QUEUE_REMOVE(h); + + SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); + transQueuePush(&conn->reqsSentOut, &pReq->q); + } + QUEUE_INIT(&wrapper->node); + freeWReqToWQ(&conn->wq, wrapper); int32_t ref = transUnrefCliHandle(conn); if (ref <= 0) { return; } + cliConnRmReqs(conn); if (status != 0) { tDebug("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); @@ -1398,6 +1408,10 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { int j = 0; int32_t batchLimit = 64; + + queue reqToSend; + QUEUE_INIT(&reqToSend); + while (!transQueueEmpty(&pConn->reqsToSend)) { queue* h = transQueuePop(&pConn->reqsToSend); SCliReq* pCliMsg = QUEUE_DATA(h, SCliReq, q); @@ -1453,24 +1467,42 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { STraceId* trace = &pCliMsg->msg.info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); - transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); + + QUEUE_PUSH(&reqToSend, &pCliMsg->q); if (j >= batchLimit) { break; } } transRefCliHandle(pConn); uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn); + if (req == NULL) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno)); + while (!QUEUE_IS_EMPTY(&reqToSend)) { + queue* h = QUEUE_HEAD(&reqToSend); + QUEUE_REMOVE(h); + SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); + transQueuePush(&pConn->reqsToSend, &pReq->q); + } transRefCliHandle(pConn); return terrno; } + SWReqsWrapper* pWreq = req->data; + QUEUE_MOVE(&reqToSend, &pWreq->node); tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen); int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb); if (ret != 0) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret)); + while (!QUEUE_IS_EMPTY(&pWreq->node)) { + queue* h = QUEUE_HEAD(&pWreq->node); + QUEUE_REMOVE(h); + SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); + transQueuePush(&pConn->reqsToSend, &pReq->q); + } + QUEUE_INIT(&pWreq->node); + freeWReqToWQ(&pConn->wq, req->data); code = TSDB_CODE_THIRDPARTY_ERROR; TAOS_UNUSED(transUnrefCliHandle(pConn));