Merge pull request #2166 from taosdata/hotfix/rpcClose
add reference count for RpcInfo
This commit is contained in:
commit
34533dce22
|
@ -58,6 +58,7 @@ typedef struct {
|
||||||
void (*cfp)(SRpcMsg *, SRpcIpSet *);
|
void (*cfp)(SRpcMsg *, SRpcIpSet *);
|
||||||
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
|
int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey);
|
||||||
|
|
||||||
|
int refCount;
|
||||||
void *idPool; // handle to ID pool
|
void *idPool; // handle to ID pool
|
||||||
void *tmrCtrl; // handle to timer
|
void *tmrCtrl; // handle to timer
|
||||||
SHashObj *hash; // handle returned by hash utility
|
SHashObj *hash; // handle returned by hash utility
|
||||||
|
@ -199,6 +200,8 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen);
|
||||||
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
|
static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
|
||||||
static void rpcLockConn(SRpcConn *pConn);
|
static void rpcLockConn(SRpcConn *pConn);
|
||||||
static void rpcUnlockConn(SRpcConn *pConn);
|
static void rpcUnlockConn(SRpcConn *pConn);
|
||||||
|
static void rpcAddRef(SRpcInfo *pRpc);
|
||||||
|
static void rpcDecRef(SRpcInfo *pRpc);
|
||||||
|
|
||||||
void *rpcOpen(const SRpcInit *pInit) {
|
void *rpcOpen(const SRpcInit *pInit) {
|
||||||
SRpcInfo *pRpc;
|
SRpcInfo *pRpc;
|
||||||
|
@ -224,6 +227,7 @@ void *rpcOpen(const SRpcInit *pInit) {
|
||||||
pRpc->spi = pInit->spi;
|
pRpc->spi = pInit->spi;
|
||||||
pRpc->cfp = pInit->cfp;
|
pRpc->cfp = pInit->cfp;
|
||||||
pRpc->afp = pInit->afp;
|
pRpc->afp = pInit->afp;
|
||||||
|
pRpc->refCount = 1;
|
||||||
|
|
||||||
size_t size = sizeof(SRpcConn) * pRpc->sessions;
|
size_t size = sizeof(SRpcConn) * pRpc->sessions;
|
||||||
pRpc->connList = (SRpcConn *)calloc(1, size);
|
pRpc->connList = (SRpcConn *)calloc(1, size);
|
||||||
|
@ -293,15 +297,8 @@ void rpcClose(void *param) {
|
||||||
(*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
|
(*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
|
||||||
(*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
|
(*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
|
||||||
|
|
||||||
taosHashCleanup(pRpc->hash);
|
|
||||||
taosTmrCleanUp(pRpc->tmrCtrl);
|
|
||||||
taosIdPoolCleanUp(pRpc->idPool);
|
|
||||||
rpcCloseConnCache(pRpc->pCache);
|
|
||||||
|
|
||||||
tfree(pRpc->connList);
|
|
||||||
pthread_mutex_destroy(&pRpc->mutex);
|
|
||||||
tTrace("%s rpc is closed", pRpc->label);
|
tTrace("%s rpc is closed", pRpc->label);
|
||||||
tfree(pRpc);
|
rpcDecRef(pRpc);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *rpcMallocCont(int contLen) {
|
void *rpcMallocCont(int contLen) {
|
||||||
|
@ -378,6 +375,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
SRpcConn *pConn = (SRpcConn *)pRsp->handle;
|
SRpcConn *pConn = (SRpcConn *)pRsp->handle;
|
||||||
SRpcMsg rpcMsg = *pRsp;
|
SRpcMsg rpcMsg = *pRsp;
|
||||||
SRpcMsg *pMsg = &rpcMsg;
|
SRpcMsg *pMsg = &rpcMsg;
|
||||||
|
SRpcInfo *pRpc = pConn->pRpc;
|
||||||
|
|
||||||
if ( pMsg->pCont == NULL ) {
|
if ( pMsg->pCont == NULL ) {
|
||||||
pMsg->pCont = rpcMallocCont(0);
|
pMsg->pCont = rpcMallocCont(0);
|
||||||
|
@ -395,6 +393,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
|
if ( pConn->inType == 0 || pConn->user[0] == 0 ) {
|
||||||
tTrace("%s, connection is already released, rsp wont be sent", pConn->info);
|
tTrace("%s, connection is already released, rsp wont be sent", pConn->info);
|
||||||
rpcUnlockConn(pConn);
|
rpcUnlockConn(pConn);
|
||||||
|
rpcDecRef(pRpc);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -420,7 +419,6 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
pConn->rspMsgLen = msgLen;
|
pConn->rspMsgLen = msgLen;
|
||||||
if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--;
|
if (pMsg->code == TSDB_CODE_RPC_ACTION_IN_PROGRESS) pConn->inTranId--;
|
||||||
|
|
||||||
SRpcInfo *pRpc = pConn->pRpc;
|
|
||||||
taosTmrStopA(&pConn->pTimer);
|
taosTmrStopA(&pConn->pTimer);
|
||||||
|
|
||||||
// set the idle timer to monitor the activity
|
// set the idle timer to monitor the activity
|
||||||
|
@ -429,6 +427,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
pConn->secured = 1; // connection shall be secured
|
pConn->secured = 1; // connection shall be secured
|
||||||
|
|
||||||
rpcUnlockConn(pConn);
|
rpcUnlockConn(pConn);
|
||||||
|
rpcDecRef(pRpc); // decrease the referene count
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -951,6 +950,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
|
||||||
|
|
||||||
if ( rpcIsReq(pHead->msgType) ) {
|
if ( rpcIsReq(pHead->msgType) ) {
|
||||||
rpcMsg.handle = pConn;
|
rpcMsg.handle = pConn;
|
||||||
|
rpcAddRef(pRpc); // add the refCount for requests
|
||||||
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
|
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
|
||||||
(*(pRpc->cfp))(&rpcMsg, NULL);
|
(*(pRpc->cfp))(&rpcMsg, NULL);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1433,3 +1433,23 @@ static void rpcUnlockConn(SRpcConn *pConn) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void rpcAddRef(SRpcInfo *pRpc)
|
||||||
|
{
|
||||||
|
atomic_add_fetch_8(&pRpc->refCount, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void rpcDecRef(SRpcInfo *pRpc)
|
||||||
|
{
|
||||||
|
if (atomic_sub_fetch_8(&pRpc->refCount, 1) == 0) {
|
||||||
|
taosHashCleanup(pRpc->hash);
|
||||||
|
taosTmrCleanUp(pRpc->tmrCtrl);
|
||||||
|
taosIdPoolCleanUp(pRpc->idPool);
|
||||||
|
rpcCloseConnCache(pRpc->pCache);
|
||||||
|
|
||||||
|
tfree(pRpc->connList);
|
||||||
|
pthread_mutex_destroy(&pRpc->mutex);
|
||||||
|
tTrace("%s rpc resources are released", pRpc->label);
|
||||||
|
tfree(pRpc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue