commit
d1417b88d0
|
@ -489,17 +489,22 @@ void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg
|
||||||
// this API is used by server app to keep an APP context in case connection is broken
|
// this API is used by server app to keep an APP context in case connection is broken
|
||||||
int rpcReportProgress(void *handle, char *pCont, int contLen) {
|
int rpcReportProgress(void *handle, char *pCont, int contLen) {
|
||||||
SRpcConn *pConn = (SRpcConn *)handle;
|
SRpcConn *pConn = (SRpcConn *)handle;
|
||||||
|
int code = 0;
|
||||||
|
|
||||||
|
rpcLockConn(pConn);
|
||||||
|
|
||||||
if (pConn->user[0]) {
|
if (pConn->user[0]) {
|
||||||
// pReqMsg and reqMsgLen is re-used to store the context from app server
|
// pReqMsg and reqMsgLen is re-used to store the context from app server
|
||||||
pConn->pReqMsg = pCont;
|
pConn->pReqMsg = pCont;
|
||||||
pConn->reqMsgLen = contLen;
|
pConn->reqMsgLen = contLen;
|
||||||
return 0;
|
} else {
|
||||||
}
|
tTrace("%s, rpc connection is already released", pConn->info);
|
||||||
|
rpcFreeCont(pCont);
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
|
||||||
tTrace("%s, rpc connection is already released", pConn->info);
|
rpcUnlockConn(pConn);
|
||||||
rpcFreeCont(pCont);
|
return code;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* todo: cancel process may have race condition, pContext may have been released
|
/* todo: cancel process may have race condition, pContext may have been released
|
||||||
|
@ -555,18 +560,10 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcCloseConn(void *thandle) {
|
static void rpcReleaseConn(SRpcConn *pConn) {
|
||||||
SRpcConn *pConn = (SRpcConn *)thandle;
|
|
||||||
SRpcInfo *pRpc = pConn->pRpc;
|
SRpcInfo *pRpc = pConn->pRpc;
|
||||||
if (pConn->user[0] == 0) return;
|
if (pConn->user[0] == 0) return;
|
||||||
|
|
||||||
rpcLockConn(pConn);
|
|
||||||
|
|
||||||
if (pConn->user[0] == 0) {
|
|
||||||
rpcUnlockConn(pConn);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pConn->user[0] = 0;
|
pConn->user[0] = 0;
|
||||||
if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle);
|
if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle);
|
||||||
|
|
||||||
|
@ -591,7 +588,16 @@ static void rpcCloseConn(void *thandle) {
|
||||||
taosFreeId(pRpc->idPool, pConn->sid);
|
taosFreeId(pRpc->idPool, pConn->sid);
|
||||||
pConn->pContext = NULL;
|
pConn->pContext = NULL;
|
||||||
|
|
||||||
tTrace("%s, rpc connection is closed", pConn->info);
|
tTrace("%s, rpc connection is released", pConn->info);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void rpcCloseConn(void *thandle) {
|
||||||
|
SRpcConn *pConn = (SRpcConn *)thandle;
|
||||||
|
|
||||||
|
rpcLockConn(pConn);
|
||||||
|
|
||||||
|
if (pConn->user[0])
|
||||||
|
rpcReleaseConn(pConn);
|
||||||
|
|
||||||
rpcUnlockConn(pConn);
|
rpcUnlockConn(pConn);
|
||||||
}
|
}
|
||||||
|
@ -911,8 +917,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
|
||||||
|
|
||||||
if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
|
if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
|
||||||
|
|
||||||
|
rpcReleaseConn(pConn);
|
||||||
rpcUnlockConn(pConn);
|
rpcUnlockConn(pConn);
|
||||||
rpcCloseConn(pConn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
|
@ -1217,7 +1223,6 @@ static void rpcProcessConnError(void *param, void *id) {
|
||||||
static void rpcProcessRetryTimer(void *param, void *tmrId) {
|
static void rpcProcessRetryTimer(void *param, void *tmrId) {
|
||||||
SRpcConn *pConn = (SRpcConn *)param;
|
SRpcConn *pConn = (SRpcConn *)param;
|
||||||
SRpcInfo *pRpc = pConn->pRpc;
|
SRpcInfo *pRpc = pConn->pRpc;
|
||||||
int reportDisc = 0;
|
|
||||||
|
|
||||||
rpcLockConn(pConn);
|
rpcLockConn(pConn);
|
||||||
|
|
||||||
|
@ -1233,31 +1238,33 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
|
||||||
} else {
|
} else {
|
||||||
// close the connection
|
// close the connection
|
||||||
tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
|
tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
|
||||||
reportDisc = 1;
|
if (pConn->pContext) {
|
||||||
|
pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
|
taosTmrStart(rpcProcessConnError, 0, pConn->pContext, pRpc->tmrCtrl);
|
||||||
|
rpcReleaseConn(pConn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tTrace("%s, retry timer not processed", pConn->info);
|
tTrace("%s, retry timer not processed", pConn->info);
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcUnlockConn(pConn);
|
rpcUnlockConn(pConn);
|
||||||
|
|
||||||
if (reportDisc && pConn->pContext) {
|
|
||||||
pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
|
||||||
rpcProcessConnError(pConn->pContext, NULL);
|
|
||||||
rpcCloseConn(pConn);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcProcessIdleTimer(void *param, void *tmrId) {
|
static void rpcProcessIdleTimer(void *param, void *tmrId) {
|
||||||
SRpcConn *pConn = (SRpcConn *)param;
|
SRpcConn *pConn = (SRpcConn *)param;
|
||||||
|
|
||||||
|
rpcLockConn(pConn);
|
||||||
|
|
||||||
if (pConn->user[0]) {
|
if (pConn->user[0]) {
|
||||||
tTrace("%s, close the connection since no activity", pConn->info);
|
tTrace("%s, close the connection since no activity", pConn->info);
|
||||||
if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
|
if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
|
||||||
rpcCloseConn(pConn);
|
rpcReleaseConn(pConn);
|
||||||
} else {
|
} else {
|
||||||
tTrace("%s, idle timer:%p not processed", pConn->info, tmrId);
|
tTrace("%s, idle timer:%p not processed", pConn->info, tmrId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rpcUnlockConn(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcProcessProgressTimer(void *param, void *tmrId) {
|
static void rpcProcessProgressTimer(void *param, void *tmrId) {
|
||||||
|
|
Loading…
Reference in New Issue