remove potential race condition on FdObj
This commit is contained in:
parent
8c22eed1bb
commit
5e58bd5c5f
|
@ -65,18 +65,18 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
|
|||
pTcp->shandle = shandle;
|
||||
|
||||
if (pthread_mutex_init(&(pTcp->mutex), NULL) < 0) {
|
||||
tError("%s failed to init TCP mutex, reason:%s", label, strerror(errno));
|
||||
tError("%s failed to init TCP client mutex(%s)", label, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (pthread_cond_init(&(pTcp->fdReady), NULL) != 0) {
|
||||
tError("%s init TCP condition variable failed, reason:%s\n", label, strerror(errno));
|
||||
tError("%s init TCP condition variable failed(%s)", label, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pTcp->pollFd = epoll_create(10); // size does not matter
|
||||
if (pTcp->pollFd < 0) {
|
||||
tError("%s failed to create TCP epoll", label);
|
||||
tError("%s failed to create TCP client epoll", label);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -87,11 +87,11 @@ void *taosInitTcpClient(char *ip, uint16_t port, char *label, int num, void *fp,
|
|||
int code = pthread_create(&(pTcp->thread), &thattr, taosReadTcpData, (void *)(pTcp));
|
||||
pthread_attr_destroy(&thattr);
|
||||
if (code != 0) {
|
||||
tError("%s failed to create TCP read data thread, reason:%s", label, strerror(errno));
|
||||
tError("%s failed to create TCP read data thread(%s)", label, strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tTrace("%s TCP client is initialized, ip:%s port:%hu", label, ip, port);
|
||||
tTrace("%s TCP client is initialized, ip:%s:%hu", label, ip, port);
|
||||
|
||||
return pTcp;
|
||||
}
|
||||
|
@ -181,15 +181,9 @@ int taosSendTcpClientData(uint32_t ip, uint16_t port, void *data, int len, void
|
|||
return (int)send(pFdObj->fd, data, (size_t)len, 0);
|
||||
}
|
||||
|
||||
static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
|
||||
STcpClient *pTcp;
|
||||
static void taosReportBrokenLink(STcpFd *pFdObj) {
|
||||
SRecvInfo recvInfo;
|
||||
|
||||
if (pFdObj == NULL) return;
|
||||
if (pFdObj->signature != pFdObj) return;
|
||||
|
||||
pFdObj->signature = NULL;
|
||||
pTcp = pFdObj->pTcp;
|
||||
STcpClient *pTcp = pFdObj->pTcp;
|
||||
|
||||
if (pFdObj->thandle) {
|
||||
recvInfo.msg = NULL;
|
||||
|
@ -202,6 +196,15 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
|
|||
recvInfo.connType = RPC_CONN_TCP;
|
||||
(*(pTcp->processData))(&recvInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
|
||||
|
||||
if (pFdObj == NULL) return;
|
||||
if (pFdObj->signature != pFdObj) return;
|
||||
|
||||
pFdObj->signature = NULL;
|
||||
STcpClient *pTcp = pFdObj->pTcp;
|
||||
|
||||
epoll_ctl(pTcp->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
|
||||
close(pFdObj->fd);
|
||||
|
@ -211,7 +214,7 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
|
|||
pTcp->numOfFds--;
|
||||
|
||||
if (pTcp->numOfFds < 0)
|
||||
tError("%s number of TCP FDs shall never be negative, FD:%p", pTcp->label, pFdObj);
|
||||
tError("%s %p, number of FDs is negative!!!, FD:%p", pTcp->label, pFdObj->thandle, pFdObj);
|
||||
|
||||
if (pFdObj->prev) {
|
||||
(pFdObj->prev)->next = pFdObj->next;
|
||||
|
@ -225,7 +228,7 @@ static void taosCleanUpTcpFdObj(STcpFd *pFdObj) {
|
|||
|
||||
pthread_mutex_unlock(&pTcp->mutex);
|
||||
|
||||
tTrace("%s TCP is cleaned up, FD:%p numOfFds:%d", pTcp->label, pFdObj, pTcp->numOfFds);
|
||||
tTrace("%s %p, FD:%p is cleaned, numOfFds:%d", pTcp->label, pFdObj->thandle, pFdObj, pTcp->numOfFds);
|
||||
|
||||
tfree(pFdObj);
|
||||
}
|
||||
|
@ -250,29 +253,29 @@ static void *taosReadTcpData(void *param) {
|
|||
pFdObj = events[i].data.ptr;
|
||||
|
||||
if (events[i].events & EPOLLERR) {
|
||||
tTrace("%s TCP error happened on FD\n", pTcp->label);
|
||||
taosCleanUpTcpFdObj(pFdObj);
|
||||
tTrace("%s %p, TCP error happened on FD", pTcp->label, pFdObj->thandle);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (events[i].events & EPOLLHUP) {
|
||||
tTrace("%s TCP FD hang up\n", pTcp->label);
|
||||
taosCleanUpTcpFdObj(pFdObj);
|
||||
tTrace("%s %p, TCP FD hang up", pTcp->label, pFdObj->thandle);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
int headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
|
||||
if (headLen != sizeof(SRpcHead)) {
|
||||
tError("%s read error, headLen:%d", pTcp->label, headLen);
|
||||
taosCleanUpTcpFdObj(pFdObj);
|
||||
tError("%s %p, read error, headLen:%d", pTcp->label, pFdObj->thandle, headLen);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
|
||||
char *buffer = (char *)malloc((size_t)msgLen + tsRpcOverhead);
|
||||
if (NULL == buffer) {
|
||||
tTrace("%s TCP malloc(size:%d) fail\n", pTcp->label, msgLen);
|
||||
taosCleanUpTcpFdObj(pFdObj);
|
||||
tTrace("%s %p, TCP malloc(size:%d) fail", pTcp->label, pFdObj->thandle, msgLen);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -281,9 +284,10 @@ static void *taosReadTcpData(void *param) {
|
|||
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
|
||||
|
||||
if (leftLen != retLen) {
|
||||
tError("%s read error, leftLen:%d retLen:%d", pTcp->label, leftLen, retLen);
|
||||
tError("%s %p, read error, leftLen:%d retLen:%d",
|
||||
pTcp->label, pFdObj->thandle, leftLen, retLen);
|
||||
tfree(buffer);
|
||||
taosCleanUpTcpFdObj(pFdObj);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -817,7 +817,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
|
|||
SRpcInfo *pRpc = pConn->pRpc;
|
||||
|
||||
tTrace("%s %p, link is broken", pRpc->label, pConn);
|
||||
pConn->chandle = NULL;
|
||||
// pConn->chandle = NULL;
|
||||
|
||||
if (pConn->outType) {
|
||||
SRpcReqContext *pContext = pConn->pContext;
|
||||
|
|
|
@ -177,7 +177,6 @@ void taosCloseTcpServerConnection(void *chandle) {
|
|||
SFdObj *pFdObj = chandle;
|
||||
if (pFdObj == NULL) return;
|
||||
|
||||
pFdObj->thandle = NULL;
|
||||
taosCleanUpFdObj(pFdObj);
|
||||
}
|
||||
|
||||
|
@ -191,6 +190,25 @@ int taosSendTcpServerData(uint32_t ip, uint16_t port, void *data, int len, void
|
|||
|
||||
#define maxEvents 10
|
||||
|
||||
static void taosReportBrokenLink(SFdObj *pFdObj) {
|
||||
|
||||
SThreadObj *pThreadObj = pFdObj->pThreadObj;
|
||||
|
||||
// notify the upper layer, so it will clean the associated context
|
||||
if (pFdObj->thandle) {
|
||||
SRecvInfo recvInfo;
|
||||
recvInfo.msg = NULL;
|
||||
recvInfo.msgLen = 0;
|
||||
recvInfo.ip = 0;
|
||||
recvInfo.port = 0;
|
||||
recvInfo.shandle = pThreadObj->shandle;
|
||||
recvInfo.thandle = pFdObj->thandle;;
|
||||
recvInfo.chandle = NULL;
|
||||
recvInfo.connType = RPC_CONN_TCP;
|
||||
(*(pThreadObj->processData))(&recvInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void taosProcessTcpData(void *param) {
|
||||
SThreadObj * pThreadObj;
|
||||
int i, fdNum;
|
||||
|
@ -214,29 +232,29 @@ static void taosProcessTcpData(void *param) {
|
|||
pFdObj = events[i].data.ptr;
|
||||
|
||||
if (events[i].events & EPOLLERR) {
|
||||
tTrace("%s TCP thread:%d, error happened on FD", pThreadObj->label, pThreadObj->threadId);
|
||||
taosCleanUpFdObj(pFdObj);
|
||||
tTrace("%s %p, error happened on FD", pThreadObj->label, pFdObj->thandle);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (events[i].events & EPOLLHUP) {
|
||||
tTrace("%s TCP thread:%d, FD hang up", pThreadObj->label, pThreadObj->threadId);
|
||||
taosCleanUpFdObj(pFdObj);
|
||||
tTrace("%s %p, FD hang up", pThreadObj->label, pFdObj->thandle);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t headLen = taosReadMsg(pFdObj->fd, &rpcHead, sizeof(SRpcHead));
|
||||
if (headLen != sizeof(SRpcHead)) {
|
||||
tError("%s read error, headLen:%d, errno:%d", pThreadObj->label, headLen, errno);
|
||||
taosCleanUpFdObj(pFdObj);
|
||||
tError("%s %p, read error, headLen:%d", pThreadObj->label, pFdObj->thandle, headLen);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
|
||||
char *buffer = malloc(msgLen + tsRpcOverhead);
|
||||
if ( NULL == buffer) {
|
||||
tError("%s TCP malloc(size:%d) fail\n", pThreadObj->label, msgLen);
|
||||
taosCleanUpFdObj(pFdObj);
|
||||
tError("%s %p, TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -245,8 +263,9 @@ static void taosProcessTcpData(void *param) {
|
|||
int32_t retLen = taosReadMsg(pFdObj->fd, msg + headLen, leftLen);
|
||||
|
||||
if (leftLen != retLen) {
|
||||
tError("%s read error, leftLen:%d retLen:%d", pThreadObj->label, leftLen, retLen);
|
||||
taosCleanUpFdObj(pFdObj);
|
||||
tError("%s %p, read error, leftLen:%d retLen:%d",
|
||||
pThreadObj->label, pFdObj->thandle, leftLen, retLen);
|
||||
taosReportBrokenLink(pFdObj);
|
||||
tfree(buffer);
|
||||
continue;
|
||||
}
|
||||
|
@ -339,7 +358,7 @@ static void taosAcceptTcpConnection(void *arg) {
|
|||
pthread_cond_signal(&pThreadObj->fdReady);
|
||||
pthread_mutex_unlock(&(pThreadObj->threadMutex));
|
||||
|
||||
tTrace("%s TCP thread:%d, a new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
|
||||
tTrace("%s TCP thread:%d, new connection from %s:%hu, FD:%p, numOfFds:%d", pServerObj->label,
|
||||
pThreadObj->threadId, pFdObj->ipstr, pFdObj->port, pFdObj, pThreadObj->numOfFds);
|
||||
|
||||
// pick up next thread for next connection
|
||||
|
@ -349,27 +368,12 @@ static void taosAcceptTcpConnection(void *arg) {
|
|||
}
|
||||
|
||||
static void taosCleanUpFdObj(SFdObj *pFdObj) {
|
||||
SThreadObj *pThreadObj;
|
||||
|
||||
if (pFdObj == NULL) return;
|
||||
if (pFdObj->signature != pFdObj) return;
|
||||
|
||||
pFdObj->signature = NULL;
|
||||
pThreadObj = pFdObj->pThreadObj;
|
||||
|
||||
// notify the upper layer, so it will clean the associated context
|
||||
if (pFdObj->thandle) {
|
||||
SRecvInfo recvInfo;
|
||||
recvInfo.msg = NULL;
|
||||
recvInfo.msgLen = 0;
|
||||
recvInfo.ip = 0;
|
||||
recvInfo.port = 0;
|
||||
recvInfo.shandle = pThreadObj->shandle;
|
||||
recvInfo.thandle = pFdObj->thandle;;
|
||||
recvInfo.chandle = NULL;
|
||||
recvInfo.connType = RPC_CONN_TCP;
|
||||
(*(pThreadObj->processData))(&recvInfo);
|
||||
}
|
||||
SThreadObj *pThreadObj = pFdObj->pThreadObj;
|
||||
|
||||
close(pFdObj->fd);
|
||||
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
|
||||
|
@ -379,7 +383,8 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
|
|||
pThreadObj->numOfFds--;
|
||||
|
||||
if (pThreadObj->numOfFds < 0)
|
||||
tError("%s TCP thread:%d, number of FDs shall never be negative", pThreadObj->label, pThreadObj->threadId);
|
||||
tError("%s %p, TCP thread:%d, number of FDs is negative!!!",
|
||||
pThreadObj->label, pFdObj->thandle, pThreadObj->threadId);
|
||||
|
||||
// remove from the FdObject list
|
||||
|
||||
|
@ -395,8 +400,9 @@ static void taosCleanUpFdObj(SFdObj *pFdObj) {
|
|||
|
||||
pthread_mutex_unlock(&pThreadObj->threadMutex);
|
||||
|
||||
tTrace("%s TCP thread:%d, FD:%p is cleaned up, numOfFds:%d", pThreadObj->label, pThreadObj->threadId,
|
||||
pFdObj, pThreadObj->numOfFds);
|
||||
tTrace("%s %p, FD:%p is cleaned, numOfFds:%d",
|
||||
pThreadObj->label, pFdObj->thandle, pFdObj, pThreadObj->numOfFds);
|
||||
|
||||
tfree(pFdObj);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue