Merge pull request #5563 from taosdata/origin/hotfix/TD-3409

TD-3409
This commit is contained in:
haojun Liao 2021-03-25 16:52:57 +08:00 committed by GitHub
commit f36211888b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 20 additions and 9 deletions

View File

@ -1017,6 +1017,13 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
return pConn; return pConn;
} }
static void doRpcReportBrokenLinkToServer(void *param, void *id) {
SRpcMsg *pRpcMsg = (SRpcMsg *)(param);
SRpcConn *pConn = (SRpcConn *)(pRpcMsg->handle);
SRpcInfo *pRpc = pConn->pRpc;
(*(pRpc->cfp))(pRpcMsg, NULL);
free(pRpcMsg);
}
static void rpcReportBrokenLinkToServer(SRpcConn *pConn) { static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
if (pConn->pReqMsg == NULL) return; if (pConn->pReqMsg == NULL) return;
@ -1025,16 +1032,20 @@ static void rpcReportBrokenLinkToServer(SRpcConn *pConn) {
rpcAddRef(pRpc); rpcAddRef(pRpc);
tDebug("%s, notify the server app, connection is gone", pConn->info); tDebug("%s, notify the server app, connection is gone", pConn->info);
SRpcMsg rpcMsg; SRpcMsg *rpcMsg = malloc(sizeof(SRpcMsg));
rpcMsg.pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server rpcMsg->pCont = pConn->pReqMsg; // pReqMsg is re-used to store the APP context from server
rpcMsg.contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length rpcMsg->contLen = pConn->reqMsgLen; // reqMsgLen is re-used to store the APP context length
rpcMsg.ahandle = pConn->ahandle; rpcMsg->ahandle = pConn->ahandle;
rpcMsg.handle = pConn; rpcMsg->handle = pConn;
rpcMsg.msgType = pConn->inType; rpcMsg->msgType = pConn->inType;
rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
pConn->reqMsgLen = 0; pConn->reqMsgLen = 0;
if (pRpc->cfp) (*(pRpc->cfp))(&rpcMsg, NULL); if (pRpc->cfp) {
taosTmrStart(doRpcReportBrokenLinkToServer, 0, rpcMsg, pRpc->tmrCtrl);
} else {
free(rpcMsg);
}
} }
static void rpcProcessBrokenLink(SRpcConn *pConn) { static void rpcProcessBrokenLink(SRpcConn *pConn) {
@ -1051,7 +1062,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
} }
if (pConn->inType) rpcReportBrokenLinkToServer(pConn); if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
rpcReleaseConn(pConn); rpcReleaseConn(pConn);