add rpcCancelRequest API

This commit is contained in:
Jeff Tao 2020-06-14 00:21:32 +00:00
parent a607ef21eb
commit 3ddef8f7e2
2 changed files with 20 additions and 3 deletions

View File

@ -78,12 +78,13 @@ void rpcClose(void *);
void *rpcMallocCont(int contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int contLen);
void rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg);
void *rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg);
void rpcSendResponse(const SRpcMsg *pMsg);
void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCanelRequest(void *pContext);
#ifdef __cplusplus
}

View File

@ -73,6 +73,7 @@ typedef struct {
SRpcInfo *pRpc; // associated SRpcInfo
SRpcIpSet ipSet; // ip list provided by app
void *ahandle; // handle provided by app
struct SRpcConn *pConn; // pConn allocated
char msgType; // message type
uint8_t *pCont; // content provided by app
int32_t contLen; // content length
@ -339,7 +340,7 @@ void *rpcReallocCont(void *ptr, int contLen) {
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
}
void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) {
void *rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) {
SRpcInfo *pRpc = (SRpcInfo *)shandle;
SRpcReqContext *pContext;
@ -367,7 +368,7 @@ void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg)
rpcSendReqToServer(pRpc, pContext);
return;
return pContext;
}
void rpcSendResponse(const SRpcMsg *pRsp) {
@ -501,6 +502,19 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
return -1;
}
/* todo: cancel process may have race condition, pContext may have been released
just before app calls the rpcCancelRequest */
void rpcCancelRequest(void *handle) {
SRpcReqContext *pContext = handle;
if (pContext->pConn) {
tTrace("%s, app trys to cancel request", pConn->info);
rpcCloseConn(pContext->pConn);
pContext->pConn = NULL;
rpcFreeCont(pContext->pCont);
}
}
static void rpcFreeMsg(void *msg) {
if ( msg ) {
char *temp = (char *)msg - sizeof(SRpcReqContext);
@ -942,6 +956,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
SRpcInfo *pRpc = pContext->pRpc;
pContext->pConn = NULL;
if (pContext->pRsp) {
// for synchronous API
memcpy(pContext->pSet, &pContext->ipSet, sizeof(SRpcIpSet));
@ -1110,6 +1125,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
return;
}
pContext->pConn = pConn;
pConn->ahandle = pContext->ahandle;
rpcLockConn(pConn);