From baf580ee8b21846676360b5feeca9a53cba0e75e Mon Sep 17 00:00:00 2001 From: slguan Date: Fri, 2 Oct 2020 12:59:15 +0800 Subject: [PATCH 1/2] TD-1632 revert --- src/rpc/src/rpcMain.c | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index f0b8c996c5..6a5d3b079a 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); @@ -885,13 +885,14 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_SUCCESS; } -static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { +static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { int32_t sid; SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; sid = htonl(pHead->destId); + *ppContext = NULL; if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); @@ -945,6 +946,17 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); + if (terrno == 0) { + SRpcReqContext *pContext = pConn->pContext; + *ppContext = pContext; + pConn->pContext = NULL; + pConn->pReqMsg = NULL; + + // for UDP, port may be changed by server, the port in epSet shall be used for cache + if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + } + } } } @@ -1009,7 +1021,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { } terrno = 0; - pConn = rpcProcessMsgHead(pRpc, pRecv); + SRpcReqContext *pContext; + pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, @@ -1029,7 +1042,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // msg is passed to app only parsing is ok - rpcProcessIncomingMsg(pConn, pHead); + rpcProcessIncomingMsg(pConn, pHead, pContext); } } @@ -1060,7 +1073,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { rpcFreeCont(pContext->pCont); } -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { SRpcInfo *pRpc = pConn->pRpc; SRpcMsg rpcMsg; @@ -1089,15 +1102,10 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { } } else { // it's a response - SRpcReqContext *pContext = pConn->pContext; rpcMsg.handle = pContext; - pConn->pContext = NULL; - pConn->pReqMsg = NULL; // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); - } else { + if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { rpcCloseConn(pConn); } From 12687355c07b28c21e4ef628482a1a26858ac351 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 6 Oct 2020 11:46:33 +0000 Subject: [PATCH 2/2] TD-1632 --- src/connector/go | 2 +- src/rpc/src/rpcMain.c | 40 +++++++++++++++++++--------------------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/connector/go b/src/connector/go index 567b7b12f3..8c58c512b6 160000 --- a/src/connector/go +++ b/src/connector/go @@ -1 +1 @@ -Subproject commit 567b7b12f3fd2775c718d284beffc8c38dd6c219 +Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766 diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 6a5d3b079a..b86b95b858 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -881,6 +881,20 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { pConn->outType = 0; pConn->pReqMsg = NULL; pConn->reqMsgLen = 0; + SRpcReqContext *pContext = pConn->pContext; + + if (pHead->code == TSDB_CODE_RPC_REDIRECT) { + if (rpcContLenFromMsg(pHead->msgLen) < sizeof(SRpcEpSet)) { + // if EpSet is not included in the msg, treat it as NOT_READY + pHead->code = TSDB_CODE_RPC_NOT_READY; + } else { + pContext->redirect++; + if (pContext->redirect > TSDB_MAX_REPLICA) { + pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + tWarn("%s, too many redirects, quit", pConn->info); + } + } + } return TSDB_CODE_SUCCESS; } @@ -950,12 +964,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont SRpcReqContext *pContext = pConn->pContext; *ppContext = pContext; pConn->pContext = NULL; - pConn->pReqMsg = NULL; - - // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); - } } } } @@ -1083,9 +1091,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; rpcMsg.code = pHead->code; - rpcMsg.ahandle = pConn->ahandle; if ( rpcIsReq(pHead->msgType) ) { + rpcMsg.ahandle = pConn->ahandle; if (rpcMsg.contLen > 0) { rpcMsg.handle = pConn; rpcAddRef(pRpc); // add the refCount for requests @@ -1103,25 +1111,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte } else { // it's a response rpcMsg.handle = pContext; + rpcMsg.ahandle = pContext->ahandle; // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { + if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + } else { rpcCloseConn(pConn); } - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { - if (rpcMsg.contLen < sizeof(SRpcEpSet)) { - // if EpSet is not included in the msg, treat it as NOT_READY - pHead->code = TSDB_CODE_RPC_NOT_READY; - } else { - pContext->redirect++; - if (pContext->redirect > TSDB_MAX_REPLICA) { - pHead->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - tWarn("%s, too many redirects, quit", pConn->info); - } - } - } - if (pHead->code == TSDB_CODE_RPC_REDIRECT) { pContext->numOfTry = 0; SRpcEpSet *pEpSet = (SRpcEpSet*)pHead->content;