commit
aa9b3e3188
|
@ -867,9 +867,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
// underlying UDP layer does not know it is server or client
|
// underlying UDP layer does not know it is server or client
|
||||||
pRecv->connType = pRecv->connType | pRpc->connType;
|
pRecv->connType = pRecv->connType | pRpc->connType;
|
||||||
|
|
||||||
if (pRecv->ip==0 && pConn) {
|
if (pRecv->ip == 0 && pConn) {
|
||||||
rpcProcessBrokenLink(pConn);
|
rpcProcessBrokenLink(pConn);
|
||||||
rpcFreeMsg(pRecv->msg);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -215,7 +215,6 @@ static void* taosAcceptTcpConnection(void *arg) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(caddr.sin_addr), caddr.sin_port);
|
|
||||||
taosKeepTcpAlive(connFd);
|
taosKeepTcpAlive(connFd);
|
||||||
|
|
||||||
// pick up the thread to handle this connection
|
// pick up the thread to handle this connection
|
||||||
|
@ -229,7 +228,8 @@ static void* taosAcceptTcpConnection(void *arg) {
|
||||||
inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
|
inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
|
||||||
} else {
|
} else {
|
||||||
close(connFd);
|
close(connFd);
|
||||||
tError("%s failed to malloc FdObj(%s)", pServerObj->label, strerror(errno));
|
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
|
||||||
|
inet_ntoa(caddr.sin_addr), caddr.sin_port);
|
||||||
}
|
}
|
||||||
|
|
||||||
// pick up next thread for next connection
|
// pick up next thread for next connection
|
||||||
|
@ -341,6 +341,8 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
|
||||||
recvInfo.chandle = NULL;
|
recvInfo.chandle = NULL;
|
||||||
recvInfo.connType = RPC_CONN_TCP;
|
recvInfo.connType = RPC_CONN_TCP;
|
||||||
(*(pThreadObj->processData))(&recvInfo);
|
(*(pThreadObj->processData))(&recvInfo);
|
||||||
|
} else {
|
||||||
|
taosFreeFdObj(pFdObj);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -466,7 +468,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
|
||||||
|
|
||||||
pFdObj->signature = NULL;
|
pFdObj->signature = NULL;
|
||||||
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
|
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
|
||||||
close(pFdObj->fd);
|
taosCloseTcpSocket(pFdObj->fd);
|
||||||
|
|
||||||
pThreadObj->numOfFds--;
|
pThreadObj->numOfFds--;
|
||||||
|
|
||||||
|
|
|
@ -127,6 +127,8 @@ int main(int argc, char *argv[]) {
|
||||||
SRpcInit rpcInit;
|
SRpcInit rpcInit;
|
||||||
char dataName[20] = "server.data";
|
char dataName[20] = "server.data";
|
||||||
|
|
||||||
|
taosBlockSIGPIPE();
|
||||||
|
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
rpcInit.localPort = 7000;
|
rpcInit.localPort = 7000;
|
||||||
rpcInit.label = "SER";
|
rpcInit.label = "SER";
|
||||||
|
|
Loading…
Reference in New Issue