From 24a2da04c1f1c495438e4fbd9ddaa9f477e5a936 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 13 Sep 2024 08:21:00 +0800 Subject: [PATCH] opt parameter --- source/libs/transport/src/transCli.c | 36 +++++++++++++++------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 04a90d7765..f1d1cd6284 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -238,6 +238,7 @@ void destroyCliConnQTable(SCliConn* conn) { // static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); +static void cliHandleBatch_shareConnExcept(SCliConn* conn); static int32_t allocConnRef(SCliConn* conn, bool update); static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp); @@ -921,7 +922,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (TRANS_CONN_REF_GET(conn) > 1) { transUnrefCliHandle(conn); } - cliDestroyConnMsgs(conn, false); + // cliDestroyConnMsgs(conn, false); if (conn->list == NULL && conn->dstAddr != NULL) { conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr)); @@ -1046,7 +1047,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { while (transReadComplete(pBuf)) { tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); if (pBuf->invalid) { - cliHandleExcept(conn, -1); + return cliHandleBatch_shareConnExcept(conn); break; } else { cliHandleResp2(conn); @@ -1238,7 +1239,7 @@ static void cliDestroy(uv_handle_t* handle) { tDebug("%s conn %p destroy state %ld", CONN_GET_INST_LABEL(conn), conn, *qid); } - cliDestroyConnMsgs(conn, true); + // cliDestroyConnMsgs(conn, true); destroyCliConnQTable(conn); if (conn->pInitUserReq) { @@ -1260,6 +1261,10 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { STrans* pInst = pThrd->pInst; queue set; + QUEUE_INIT(&set); + // TODO + // 1. from qId from thread table + // 2. not itera to all reqs transQueueRemoveByFilter(&conn->reqsSentOut, filterAllReq, NULL, &set, -1); transQueueRemoveByFilter(&conn->reqsToSend, filterAllReq, NULL, &set, -1); @@ -1275,8 +1280,13 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { resp.msgType = pReq ? pReq->msg.msgType + 1 : 0; resp.info.cliVer = pInst->compatibilityVer; resp.info.ahandle = pCtx ? pCtx->ahandle : 0; - // not - if (pCtx == NULL) { + if (pReq) { + resp.info.traceId = pReq->msg.info.traceId; + } + // resp.info.traceId = pReq ? pReq->msg.info.traceId : {0, 0}; + + // handle noresp and inter manage msg + if (pCtx == NULL || REQUEST_NO_RESP(&pReq->msg)) { destroyReq(pReq); continue; } @@ -1290,6 +1300,9 @@ static void cliHandleBatch_shareConnExcept(SCliConn* conn) { destroyReq(pReq); } } + if (!uv_is_closing((uv_handle_t*)conn->stream)) { + uv_close((uv_handle_t*)conn->stream, cliDestroy); + } } bool fileToRmReq(void* h, void* arg) { @@ -1612,16 +1625,7 @@ void cliConnCb(uv_connect_t* req, int status) { if (status != 0) { tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr, uv_strerror(status)); - // queue set; - // transQueueRemoveByFilter(&pConn->reqsToSend, filteGetAll, NULL, &set, -1); - // transQueueRemoveByFilter(&pConn->reqsSentOut, filteGetAll, NULL, &set, -1); - // while (!QUEUE_IS_EMPTY(&set)) { - // queue* el = QUEUE_HEAD(&set); - // SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); - // cliBuildExeceptMsg(pConn, pReq, &pReq->msg); - // destroyReq(pReq); - // } - cliHandleExcept(pConn, status); + cliHandleBatch_shareConnExcept(pConn); return; } pConn->connnected = 1; @@ -1881,7 +1885,7 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { SReqState* pState = taosHashGet(pThrd->pIdConnTable, &qid, sizeof(qid)); if (pState == NULL) { - if (pReq->ctx == NULL || pReq->ctx->ahandle == 0) { + if (pReq->ctx == NULL) { return TSDB_CODE_RPC_STATE_DROPED; } tDebug("failed to get statue, qid:%ld", qid);