From 646b5e53a7aba1d1e861c614b67040ab7939d5a1 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 16 Jun 2020 11:03:23 +0000 Subject: [PATCH 1/6] remove race condition in retry timer --- src/rpc/src/rpcMain.c | 41 ++++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 9bb63b751a..f812d95188 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -555,18 +555,10 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, return pConn; } -static void rpcCloseConn(void *thandle) { - SRpcConn *pConn = (SRpcConn *)thandle; +static void rpcReleaseConn(SRpcConn *pConn) { SRpcInfo *pRpc = pConn->pRpc; if (pConn->user[0] == 0) return; - rpcLockConn(pConn); - - if (pConn->user[0] == 0) { - rpcUnlockConn(pConn); - return; - } - pConn->user[0] = 0; if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle); @@ -591,7 +583,21 @@ static void rpcCloseConn(void *thandle) { taosFreeId(pRpc->idPool, pConn->sid); pConn->pContext = NULL; - tTrace("%s, rpc connection is closed", pConn->info); + tTrace("%s, rpc connection is released", pConn->info); +} + +static void rpcCloseConn(void *thandle) { + SRpcConn *pConn = (SRpcConn *)thandle; + if (pConn->user[0] == 0) return; + + rpcLockConn(pConn); + + if (pConn->user[0] == 0) { + rpcUnlockConn(pConn); + return; + } + + rpcReleaseConn(pConn); rpcUnlockConn(pConn); } @@ -911,8 +917,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { if (pConn->inType) rpcReportBrokenLinkToServer(pConn); + rpcReleaseConn(pConn); rpcUnlockConn(pConn); - rpcCloseConn(pConn); } static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { @@ -1217,7 +1223,6 @@ static void rpcProcessConnError(void *param, void *id) { static void rpcProcessRetryTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; SRpcInfo *pRpc = pConn->pRpc; - int reportDisc = 0; rpcLockConn(pConn); @@ -1233,19 +1238,17 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { } else { // close the connection tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); - reportDisc = 1; + if (pConn->pContext) { + pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + rpcProcessConnError(pConn->pContext, NULL); + rpcReleaseConn(pConn); + } } } else { tTrace("%s, retry timer not processed", pConn->info); } rpcUnlockConn(pConn); - - if (reportDisc && pConn->pContext) { - pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - rpcProcessConnError(pConn->pContext, NULL); - rpcCloseConn(pConn); - } } static void rpcProcessIdleTimer(void *param, void *tmrId) { From 4e87a28e8d0c980f2ea1c5d00cf530145d88a955 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 16 Jun 2020 12:03:36 +0000 Subject: [PATCH 2/6] fix deadlock --- src/rpc/src/rpcMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f812d95188..d455cd645f 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1240,7 +1240,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); if (pConn->pContext) { pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - rpcProcessConnError(pConn->pContext, NULL); + taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); rpcReleaseConn(pConn); } } From 58922d86002a7d3772faba8229ef48e0951e693a Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 16 Jun 2020 12:05:25 +0000 Subject: [PATCH 3/6] compiling error --- src/rpc/src/rpcMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index d455cd645f..2e2202a31b 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1240,7 +1240,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); if (pConn->pContext) { pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); + taosTmrStart(rpcProcessConnError, 0, pConn->pContext, pRpc->tmrCtrl); rpcReleaseConn(pConn); } } From 387f162ddf2f0e562858eafd37827e8cf8df602f Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 16 Jun 2020 12:36:58 +0000 Subject: [PATCH 4/6] add lock in all timer processing --- src/rpc/src/rpcMain.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 2e2202a31b..90f11bb3de 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -490,6 +490,7 @@ void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg int rpcReportProgress(void *handle, char *pCont, int contLen) { SRpcConn *pConn = (SRpcConn *)handle; + rpcLockConn(pConn); if (pConn->user[0]) { // pReqMsg and reqMsgLen is re-used to store the context from app server pConn->pReqMsg = pCont; @@ -499,6 +500,8 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) { tTrace("%s, rpc connection is already released", pConn->info); rpcFreeCont(pCont); + rpcUnlockConn(pConn); + return -1; } @@ -1254,13 +1257,17 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { static void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; + rpcLockConn(pConn); + if (pConn->user[0]) { tTrace("%s, close the connection since no activity", pConn->info); if (pConn->inType) rpcReportBrokenLinkToServer(pConn); - rpcCloseConn(pConn); + rpcReleaseConn(pConn); } else { tTrace("%s, idle timer:%p not processed", pConn->info, tmrId); } + + rpcUnlockConn(pConn); } static void rpcProcessProgressTimer(void *param, void *tmrId) { From f3f0e39ab16f4bf6fb4675c8b62e28b6a8480a47 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 16 Jun 2020 13:12:27 +0000 Subject: [PATCH 5/6] deadlock --- src/rpc/src/rpcMain.c | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 90f11bb3de..1558de37ea 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -489,20 +489,22 @@ void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg // this API is used by server app to keep an APP context in case connection is broken int rpcReportProgress(void *handle, char *pCont, int contLen) { SRpcConn *pConn = (SRpcConn *)handle; + int code = 0; rpcLockConn(pConn); + if (pConn->user[0]) { // pReqMsg and reqMsgLen is re-used to store the context from app server pConn->pReqMsg = pCont; pConn->reqMsgLen = contLen; - return 0; - } + } else { + tTrace("%s, rpc connection is already released", pConn->info); + rpcFreeCont(pCont); + code = -1; + } - tTrace("%s, rpc connection is already released", pConn->info); - rpcFreeCont(pCont); rpcUnlockConn(pConn); - - return -1; + return code; } /* todo: cancel process may have race condition, pContext may have been released From a703510f1bd0598634dfa04b5f54fa533e6c4e03 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 16 Jun 2020 13:40:18 +0000 Subject: [PATCH 6/6] tune up code --- src/rpc/src/rpcMain.c | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 1558de37ea..989021eb52 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -593,16 +593,11 @@ static void rpcReleaseConn(SRpcConn *pConn) { static void rpcCloseConn(void *thandle) { SRpcConn *pConn = (SRpcConn *)thandle; - if (pConn->user[0] == 0) return; rpcLockConn(pConn); - if (pConn->user[0] == 0) { - rpcUnlockConn(pConn); - return; - } - - rpcReleaseConn(pConn); + if (pConn->user[0]) + rpcReleaseConn(pConn); rpcUnlockConn(pConn); }