remove the potential race condition on TCP FD

This commit is contained in:
Jeff Tao 2020-04-12 17:05:06 +08:00
parent 50d7e6876d
commit a5dabf7e99
1 changed files with 21 additions and 26 deletions

View File

@ -145,7 +145,7 @@ void *taosInitTcpServer(char *ip, uint16_t port, char *label, int numOfThreads,
void taosCleanUpTcpServer(void *handle) {
SThreadObj *pThreadObj;
SServerObj *pServerObj = (SServerObj *)handle;
SServerObj *pServerObj = handle;
if (pServerObj == NULL) return;
@ -174,15 +174,15 @@ void taosCleanUpTcpServer(void *handle) {
}
void taosCloseTcpServerConnection(void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle;
SFdObj *pFdObj = chandle;
if (pFdObj == NULL) return;
pFdObj->thandle = NULL;
taosCleanUpFdObj(pFdObj);
}
int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void *chandle) {
SFdObj *pFdObj = (SFdObj *)chandle;
SFdObj *pFdObj = chandle;
if (chandle == NULL) return -1;
@ -354,14 +354,25 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
if (pFdObj == NULL) return;
if (pFdObj->signature != pFdObj) return;
pFdObj->signature = NULL;
pThreadObj = pFdObj->pThreadObj;
if (pThreadObj == NULL) {
tError("FdObj double clean up!!!");
return;
// notify the upper layer, so it will clean the associated context
if (pFdObj->thandle) {
SRecvInfo recvInfo;
recvInfo.msg = NULL;
recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pThreadObj->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
(*(pThreadObj->processData))(&recvInfo);
}
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
close(pFdObj->fd);
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
pthread_mutex_lock(&pThreadObj->threadMutex);
@ -384,24 +395,8 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
pthread_mutex_unlock(&pThreadObj->threadMutex);
// notify the upper layer, so it will clean the associated context
SRecvInfo recvInfo;
recvInfo.msg = NULL;
recvInfo.msgLen = 0;
recvInfo.ip = 0;
recvInfo.port = 0;
recvInfo.shandle = pThreadObj->shandle;
recvInfo.thandle = pFdObj->thandle;;
recvInfo.chandle = NULL;
recvInfo.connType = RPC_CONN_TCP;
if (pFdObj->thandle) (*(pThreadObj->processData))(&recvInfo);
tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId,
pFdObj, pThreadObj->numOfFds);
memset(pFdObj, 0, sizeof(SFdObj));
pFdObj, pThreadObj->numOfFds);
tfree(pFdObj);
}