Merge remote-tracking branch 'origin/develop' into feature/mpeer

This commit is contained in:
slguan 2020-04-15 20:20:21 +08:00
commit fa463c6530
2 changed files with 21 additions and 11 deletions

View File

@ -286,15 +286,15 @@ void *rpcOpen(const SRpcInit *pInit) {
void rpcClose(void *param) {
SRpcInfo *pRpc = (SRpcInfo *)param;
(*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
(*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
for (int i = 0; i < pRpc->sessions; ++i) {
if (pRpc->connList && pRpc->connList[i].user[0]) {
rpcCloseConn((void *)(pRpc->connList + i));
}
}
(*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle);
(*taosCleanUpConn[pRpc->connType])(pRpc->udphandle);
taosHashCleanup(pRpc->hash);
taosTmrCleanUp(pRpc->tmrCtrl);
taosIdPoolCleanUp(pRpc->idPool);
@ -521,11 +521,15 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerIpStr, uint16_t peerPort,
static void rpcCloseConn(void *thandle) {
SRpcConn *pConn = (SRpcConn *)thandle;
SRpcInfo *pRpc = pConn->pRpc;
if (pConn->user[0] == 0) return;
rpcLockConn(pConn);
if (pConn->user[0] == 0) {
rpcUnlockConn(pConn);
return;
}
pConn->user[0] = 0;
if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle);

View File

@ -158,8 +158,9 @@ void taosCleanUpTcpServer(void *handle) {
pThreadObj = pServerObj->pThreadObj + i;
while (pThreadObj->pHead) {
taosFreeFdObj(pThreadObj->pHead);
pThreadObj->pHead = pThreadObj->pHead;
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
close(pThreadObj->pollFd);
@ -269,8 +270,9 @@ void taosCleanUpTcpClient(void *chandle) {
if (pThreadObj == NULL) return;
while (pThreadObj->pHead) {
taosFreeFdObj(pThreadObj->pHead);
pThreadObj->pHead = pThreadObj->pHead->next;
SFdObj *pFdObj = pThreadObj->pHead;
pThreadObj->pHead = pFdObj->next;
taosFreeFdObj(pFdObj);
}
close(pThreadObj->pollFd);
@ -456,14 +458,18 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return;
pFdObj->signature = NULL;
SThreadObj *pThreadObj = pFdObj->pThreadObj;
pthread_mutex_lock(&pThreadObj->mutex);
if (pFdObj->signature == NULL) {
pthread_mutex_unlock(&pThreadObj->mutex);
return;
}
pFdObj->signature = NULL;
close(pFdObj->fd);
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
pthread_mutex_lock(&pThreadObj->mutex);
pThreadObj->numOfFds--;
if (pThreadObj->numOfFds < 0)