From 34f86d96861d25b2b255436fa934521443433225 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 27 Sep 2024 09:41:26 +0800 Subject: [PATCH] Merge remote-tracking branch 'origin/3.0' into enh/opt-transport --- include/libs/transport/trpc.h | 1 + source/libs/transport/src/transCli.c | 17 +++++++++----- source/libs/transport/src/transSvr.c | 33 ++++++++++------------------ 3 files changed, 24 insertions(+), 27 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 5d5dc00ad8..1fdf7db60a 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -132,6 +132,7 @@ typedef struct SRpcInit { int8_t shareConn; // 0: no share, 1. share int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait int8_t startReadTimer; + void *parent; } SRpcInit; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 35276d49fe..23048f4a7e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -263,7 +263,11 @@ static FORCE_INLINE void destroyReqAndAhanlde(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pInst); static FORCE_INLINE void destroyReqCtx(SReqCtx* ctx); -int32_t cliHandleState_mayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn); +static int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq); +static int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead); +static int32_t cliHandleState_mayCreateAhandle(SCliConn* conn, STransMsgHead* pHead, STransMsg* pResp); +static int32_t cliHandleState_mayUpdateStateCtx(SCliConn* pConn, SCliReq* pReq); + int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key); @@ -911,7 +915,7 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); - code = cliHandleState_mayUpdateState(pThrd, pReq, pConn); + code = cliHandleState_mayUpdateState(pConn, pReq); (void)addConnToHeapCache(pThrd->connHeapCache, pConn); (void)transQueuePush(&pConn->reqsToSend, &pReq->q); @@ -1648,9 +1652,10 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { } } -int32_t cliHandleState_mayUpdateState(SCliThrd* pThrd, SCliReq* pReq, SCliConn* pConn) { - int32_t code = 0; - int64_t qid = pReq->msg.info.qId; +int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq) { + SCliThrd* pThrd = pConn->hostThrd; + int32_t code = 0; + int64_t qid = pReq->msg.info.qId; if (qid == 0) { return TSDB_CODE_RPC_NO_STATE; } @@ -1703,7 +1708,7 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) { return; } } - code = cliHandleState_mayUpdateState(pThrd, pReq, pConn); + code = cliHandleState_mayUpdateState(pConn, pReq); } code = cliSendReq(pConn, pReq); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 864a65ebee..5be58cb5f4 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -437,8 +437,13 @@ static int8_t uvValidConn(SSvrConn* pConn) { static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { int32_t code = 0; STrans* pInst = pConn->pInst; - int64_t qId = taosHton64(pHead->qid); - if (pHead->msgType == TDMT_SCH_TASK_RELEASE && qId > 0) { + if (pHead->msgType == TDMT_SCH_TASK_RELEASE) { + int64_t qId = taosHton64(pHead->qid); + if (qId <= 0) { + tError("conn %p recv release, but invalid qid:%" PRId64 "", pConn, qId); + return TSDB_CODE_RPC_NO_STATE; + } + void* p = taosHashGet(pConn->pQTable, &qId, sizeof(qId)); if (p == NULL) { code = TSDB_CODE_RPC_NO_STATE; @@ -1644,22 +1649,8 @@ void uvHandleQuit(SSvrRespMsg* msg, SWorkThrd* thrd) { } taosMemoryFree(msg); } -void uvHandleRelease(SSvrRespMsg* msg, SWorkThrd* thrd) { - return; - // int32_t code = 0; - // SSvrConn* conn = msg->pConn; - // if (conn->status == ConnAcquire) { - // if (!transQueuePush(&conn->resps, msg)) { - // return; - // } - // uvStartSendRespImpl(msg); - // return; - // } else if (conn->status == ConnRelease || conn->status == ConnNormal) { - // tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pInst), conn); - // } +void uvHandleRelease(SSvrRespMsg* msg, SWorkThrd* thrd) { return; } - // destroySmsg(msg); -} void uvHandleResp(SSvrRespMsg* msg, SWorkThrd* thrd) { // send msg to client tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pInst), msg->pConn); @@ -1669,16 +1660,16 @@ void uvHandleResp(SSvrRespMsg* msg, SWorkThrd* thrd) { int32_t uvHandleStateReq(SSvrRespMsg* msg) { int32_t code = 0; SSvrConn* conn = msg->pConn; - tDebug("%s conn %p start to register brokenlink callback, qid:%" PRId64 "", transLabel(conn->pInst), conn, - msg->msg.info.qId); + int64_t qid = msg->msg.info.qId; + tDebug("%s conn %p start to register brokenlink callback, qid:%" PRId64 "", transLabel(conn->pInst), conn, qid); SSvrRegArg arg = {.notifyCount = 0, .init = 1, .msg = msg->msg}; - SSvrRegArg* p = taosHashGet(conn->pQTable, &msg->msg.info.qId, sizeof(msg->msg.info.qId)); + SSvrRegArg* p = taosHashGet(conn->pQTable, &qid, sizeof(qid)); if (p != NULL) { transFreeMsg(p->msg.pCont); } - code = taosHashPut(conn->pQTable, &msg->msg.info.qId, sizeof(msg->msg.info.qId), &arg, sizeof(arg)); + code = taosHashPut(conn->pQTable, &qid, sizeof(qid), &arg, sizeof(arg)); if (code == 0) tDebug("conn %p register brokenlink callback succ", conn); return code; }