diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f40ff3eee2..36fc9a7846 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -127,6 +127,7 @@ typedef struct { typedef struct SCliReq { SReqCtx* ctx; queue q; + queue sendQ; STransMsgType type; uint64_t st; int64_t seq; @@ -1309,8 +1310,8 @@ static void cliBatchSendCb(uv_write_t* req, int status) { queue* h = QUEUE_HEAD(&wrapper->node); QUEUE_REMOVE(h); - SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - transQueuePush(&conn->reqsSentOut, &pReq->q); + SCliReq* pReq = QUEUE_DATA(h, SCliReq, sendQ); + pReq->sent = 1; } freeWReqToWQ(&conn->wq, wrapper); @@ -1471,14 +1472,16 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { wb[j++] = uv_buf_init((char*)pHead, msgLen); totalLen += msgLen; - pCliMsg->sent = 1; pCliMsg->seq = pConn->seq; STraceId* trace = &pCliMsg->msg.info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", sid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); - QUEUE_PUSH(&reqToSend, &pCliMsg->q); + // QUEUE_PUSH(&reqToSend, &pCliMsg->q); + transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); + QUEUE_INIT(&pCliMsg->sendQ); + QUEUE_PUSH(&reqToSend, &pCliMsg->sendQ); if (j >= batchLimit) { break; } @@ -1488,29 +1491,18 @@ int32_t cliBatchSend(SCliConn* pConn, int8_t direct) { if (req == NULL) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(terrno)); - while (!QUEUE_IS_EMPTY(&reqToSend)) { - queue* h = QUEUE_HEAD(&reqToSend); - QUEUE_REMOVE(h); - SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - transQueuePush(&pConn->reqsToSend, &pReq->q); - } transRefCliHandle(pConn); return terrno; } SWReqsWrapper* pWreq = req->data; + QUEUE_MOVE(&reqToSend, &pWreq->node); tDebug("%s conn %p start to send msg, batch size:%d, len:%d", CONN_GET_INST_LABEL(pConn), pConn, j, totalLen); int32_t ret = uv_write(req, (uv_stream_t*)pConn->stream, wb, j, cliBatchSendCb); if (ret != 0) { tError("%s conn %p failed to send msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret)); - while (!QUEUE_IS_EMPTY(&pWreq->node)) { - queue* h = QUEUE_HEAD(&pWreq->node); - QUEUE_REMOVE(h); - SCliReq* pReq = QUEUE_DATA(h, SCliReq, q); - transQueuePush(&pConn->reqsToSend, &pReq->q); - } freeWReqToWQ(&pConn->wq, req->data); code = TSDB_CODE_THIRDPARTY_ERROR; TAOS_UNUSED(transUnrefCliHandle(pConn));