From cfedc4d67adee7938144cf2b9e8f16db41a0fca6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 Aug 2022 17:41:43 +0800 Subject: [PATCH 1/2] fix client deadlock --- source/libs/transport/src/transCli.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c433625637..44f074f92e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -891,7 +891,6 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { *ignore = true; - destroyCmsg(pMsg); return NULL; } else { conn = exh->handle; @@ -937,7 +936,16 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { bool ignore = false; SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); if (ignore == true) { - tError("ignore msg"); + // persist conn already release by server + STransMsg resp = {0}; + resp.code = TSDB_CODE_RPC_BROKEN_LINK; + resp.msgType = pMsg->msg.msgType + 1; + + resp.info.ahandle = pMsg && pMsg->ctx ? pMsg->ctx->ahandle : NULL; + resp.info.traceId = pMsg->msg.info.traceId; + + pTransInst->cfp(pTransInst->parent, &resp, NULL); + destroyCmsg(pMsg); return; } From 1c200b4198cf77ae1f29fea0e12a2e854db1b115 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 11 Aug 2022 17:46:37 +0800 Subject: [PATCH 2/2] fix client deadlock --- source/libs/transport/src/transCli.c | 43 ++++++++++++++-------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 44f074f92e..9eea43be23 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -211,27 +211,27 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - uint64_t ahandle = head->ahandle; \ - CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \ - SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \ - if (cliMsg->type == Release) return; \ - } \ - tDebug("%s conn %p receive release request, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); \ - if (T_REF_VAL_GET(conn) > 1) { \ - transUnrefCliHandle(conn); \ - } \ - destroyCmsg(pMsg); \ - cliReleaseUnfinishedMsg(conn); \ - transQueueClear(&conn->cliMsgs); \ - addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \ - return; \ - } \ +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + uint64_t ahandle = head->ahandle; \ + CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \ + SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \ + if (cliMsg->type == Release) return; \ + } \ + tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); \ + if (T_REF_VAL_GET(conn) > 1) { \ + transUnrefCliHandle(conn); \ + } \ + destroyCmsg(pMsg); \ + cliReleaseUnfinishedMsg(conn); \ + transQueueClear(&conn->cliMsgs); \ + addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \ + return; \ + } \ } while (0) #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ @@ -890,6 +890,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { if (refId != 0) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { + tError("failed to get conn, refId: %" PRId64 "", refId); *ignore = true; return NULL; } else {