From 7d81d4f18e6b8f1125a21bf971e2846f15f185cf Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 29 Aug 2024 03:24:02 +0000 Subject: [PATCH] refactor transport --- source/libs/transport/src/transCli.c | 75 ++++++++++++++++++---------- 1 file changed, 50 insertions(+), 25 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 448dd7796f..6afb1fac00 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -535,6 +535,14 @@ int32_t cliGetReqBySeq(SCliConn* conn, int32_t seq, SCliReq** pReq) { } return TSDB_CODE_OUT_OF_RANGE; } + +int32_t cliMayRecycleConn(SCliConn* conn) { + SCliThrd* pThrd = conn->hostThrd; + if (transQueueSize(&conn->reqs) == 0) { + addConnToPool(pThrd->pool, conn); + } + return 0; +} void cliHandleResp2(SCliConn* conn) { int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; @@ -560,11 +568,18 @@ void cliHandleResp2(SCliConn* conn) { if ((code = transDecompressMsg((char**)&pHead, msgLen)) < 0) { tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); // TODO: notify cb + return; } SCliReq* pReq = NULL; int32_t seq = pHead->seqNum; code = cliGetReqBySeq(conn, seq, &pReq); + if (code != 0) { + tDebug("%s conn %p recv unexpected packet, reason:%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); + // TODO: notify cb + return; + } + pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); // TODO handle release req @@ -572,20 +587,28 @@ void cliHandleResp2(SCliConn* conn) { // return; // } - STransMsg transMsg = {0}; - transMsg.contLen = transContLenFromMsg(pHead->msgLen); - transMsg.pCont = transContFromHead((char*)pHead); - transMsg.code = pHead->code; - transMsg.msgType = pHead->msgType; - transMsg.info.ahandle = NULL; - transMsg.info.traceId = pHead->traceId; - transMsg.info.hasEpSet = pHead->hasEpSet; - transMsg.info.cliVer = htonl(pHead->compatibilityVer); + STransMsg resp = {0}; + resp.contLen = transContLenFromMsg(pHead->msgLen); + resp.pCont = transContFromHead((char*)pHead); + resp.code = pHead->code; + resp.msgType = pHead->msgType; + resp.info.ahandle = NULL; + resp.info.traceId = pHead->traceId; + resp.info.hasEpSet = pHead->hasEpSet; + resp.info.cliVer = htonl(pHead->compatibilityVer); + resp.info.ahandle = pReq->ctx ? pReq->ctx->ahandle : NULL; if (code != 0) { tDebug("%s conn %p recv invalid packet, seq %d not found", CONN_GET_INST_LABEL(conn), conn, seq); } - + code = cliNotifyCb(conn, pReq, &resp); + if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { + STraceId* trace = &resp.info.traceId; + tGWarn("%s msg need retry", CONN_GET_INST_LABEL(conn)); + // retry, notify + } else { + } + (void)cliMayRecycleConn(conn); return; } void cliHandleResp(SCliConn* conn) { @@ -594,9 +617,9 @@ void cliHandleResp(SCliConn* conn) { STrans* pInst = pThrd->pInst; cliResetConnTimer(conn); - if (pInst->shareConn) { - return cliHandleResp_shareConn(conn); - } + // if (pInst->shareConn) { + // return cliHandleResp_shareConn(conn); + // } STransMsgHead* pHead = NULL; @@ -2715,6 +2738,9 @@ int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { int32_t notfiyCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { // impl later + SCliThrd* pThrd = thrd; + STrans* pInst = pThrd->pInst; + return 0; } @@ -3176,7 +3202,7 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, false); transFreeMsg(pResp->pCont); - transUnrefCliHandle(pConn); + // transUnrefCliHandle(pConn); } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR || code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_VND_STOPPED || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || @@ -3184,16 +3210,16 @@ bool cliMayRetry(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, true); transFreeMsg(pResp->pCont); - addConnToPool(pThrd->pool, pConn); + // addConnToPool(pThrd->pool, pConn); } else if (code == TSDB_CODE_SYN_RESTORING) { tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, true); - addConnToPool(pThrd->pool, pConn); + // addConnToPool(pThrd->pool, pConn); transFreeMsg(pResp->pCont); } else { tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, false); - addConnToPool(pThrd->pool, pConn); + // addConnToPool(pThrd->pool, pConn); transFreeMsg(pResp->pCont); } if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) { @@ -3283,15 +3309,14 @@ int32_t cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; - if (pReq == NULL || pReq->ctx == NULL) { - tTrace("%s conn %p handle resp", pInst->label, pConn); - pInst->cfp(pInst->parent, pResp, NULL); - return 0; - } + // if (pReq == NULL || pReq->ctx == NULL) { + // tTrace("%s conn %p handle resp", pInst->label, pConn); + // pInst->cfp(pInst->parent, pResp, NULL); + // return 0; + // } - bool retry = cliMayRetry(pConn, pReq, pResp); - if (retry == true) { - return -1; + if (cliMayRetry(pConn, pReq, pResp)) { + return TSDB_CODE_RPC_ASYNC_IN_PROCESS; } cliMayResetRespCode(pReq, pResp);