commit
b0e3a1d0da
|
@ -300,6 +300,8 @@ int transSendResponse(const STransMsg* msg);
|
|||
int transRegisterMsg(const STransMsg* msg);
|
||||
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
|
||||
|
||||
int transGetSockDebugInfo(struct sockaddr* sockname, char* dst);
|
||||
|
||||
int64_t transAllocHandle();
|
||||
|
||||
void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
|
||||
|
|
|
@ -37,9 +37,11 @@ typedef struct SCliConn {
|
|||
uint32_t port;
|
||||
|
||||
SDelayTask* task;
|
||||
|
||||
// debug and log info
|
||||
struct sockaddr_in addr;
|
||||
struct sockaddr_in localAddr;
|
||||
char src[32];
|
||||
char dst[32];
|
||||
|
||||
} SCliConn;
|
||||
|
||||
typedef struct SCliMsg {
|
||||
|
@ -95,6 +97,14 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
|
|||
static void addConnToPool(void* pool, SCliConn* conn);
|
||||
static void doCloseIdleConn(void* param);
|
||||
|
||||
static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
|
||||
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
|
||||
|
||||
char buf[20] = {0};
|
||||
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
|
||||
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
|
||||
return r;
|
||||
}
|
||||
// register timer in each thread to clear expire conn
|
||||
// static void cliTimeoutCb(uv_timer_t* handle);
|
||||
// alloc buf for recv
|
||||
|
@ -363,9 +373,9 @@ void cliHandleResp(SCliConn* conn) {
|
|||
}
|
||||
|
||||
STraceId* trace = &transMsg.info.traceId;
|
||||
tGTrace("%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, code:0x%x", CONN_GET_INST_LABEL(conn),
|
||||
conn, TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||
taosInetNtoa(conn->localAddr.sin_addr), ntohs(conn->localAddr.sin_port), transMsg.contLen, transMsg.code);
|
||||
|
||||
tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d, code:0x%x", CONN_GET_INST_LABEL(conn), conn,
|
||||
TMSG_INFO(pHead->msgType), conn->dst, conn->src, transMsg.contLen, transMsg.code);
|
||||
|
||||
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
|
||||
tDebug("%s except, conn %p read while cli ignore it", CONN_GET_INST_LABEL(conn), conn);
|
||||
|
@ -741,9 +751,8 @@ void cliSend(SCliConn* pConn) {
|
|||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||
|
||||
STraceId* trace = &pMsg->info.traceId;
|
||||
tGTrace("%s conn %p %s is sent to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port));
|
||||
tGTrace("%s conn %p %s is sent to %s, local info %s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType),
|
||||
pConn->dst, pConn->src);
|
||||
|
||||
if (pHead->persist == 1) {
|
||||
CONN_SET_PERSIST_BY_APP(pConn);
|
||||
|
@ -764,11 +773,16 @@ void cliConnCb(uv_connect_t* req, int status) {
|
|||
cliHandleExcept(pConn);
|
||||
return;
|
||||
}
|
||||
int addrlen = sizeof(pConn->addr);
|
||||
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen);
|
||||
// int addrlen = sizeof(pConn->addr);
|
||||
struct sockaddr peername, sockname;
|
||||
int addrlen = sizeof(peername);
|
||||
|
||||
addrlen = sizeof(pConn->localAddr);
|
||||
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->localAddr, &addrlen);
|
||||
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
|
||||
transGetSockDebugInfo(&peername, pConn->dst);
|
||||
|
||||
addrlen = sizeof(sockname);
|
||||
uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen);
|
||||
transGetSockDebugInfo(&sockname, pConn->src);
|
||||
|
||||
tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
assert(pConn->stream == req->handle);
|
||||
|
|
|
@ -102,7 +102,14 @@ void transFreeMsg(void* msg) {
|
|||
}
|
||||
taosMemoryFree((char*)msg - sizeof(STransMsgHead));
|
||||
}
|
||||
int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) {
|
||||
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
|
||||
|
||||
char buf[20] = {0};
|
||||
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
|
||||
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
|
||||
return r;
|
||||
}
|
||||
int transInitBuffer(SConnBuffer* buf) {
|
||||
transClearBuffer(buf);
|
||||
return 0;
|
||||
|
|
|
@ -44,8 +44,12 @@ typedef struct SSvrConn {
|
|||
bool broken; // conn broken;
|
||||
|
||||
ConnStatus status;
|
||||
struct sockaddr_in addr;
|
||||
struct sockaddr_in localAddr;
|
||||
|
||||
uint32_t clientIp;
|
||||
uint16_t port;
|
||||
|
||||
char src[32];
|
||||
char dst[32];
|
||||
|
||||
int64_t refId;
|
||||
int spi;
|
||||
|
@ -248,15 +252,11 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||
transRefSrvHandle(pConn);
|
||||
|
||||
tGTrace("%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d", transLabel(pTransInst), pConn,
|
||||
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen);
|
||||
tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d", transLabel(pTransInst), pConn,
|
||||
TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen);
|
||||
} else {
|
||||
tGTrace("%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, resp:%d, code:%d",
|
||||
transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr),
|
||||
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port),
|
||||
transMsg.contLen, pHead->noResp, transMsg.code);
|
||||
// no ref here
|
||||
tGTrace("%s conn %p %s received from %s, local info:%s, msg size:%d, resp:%d, code:%d", transLabel(pTransInst),
|
||||
pConn, TMSG_INFO(transMsg.msgType), pConn->dst, pConn->src, transMsg.contLen, pHead->noResp, transMsg.code);
|
||||
}
|
||||
|
||||
// pHead->noResp = 1,
|
||||
|
@ -278,14 +278,13 @@ static void uvHandleReq(SSvrConn* pConn) {
|
|||
|
||||
// set up conn info
|
||||
SRpcConnInfo* pConnInfo = &(transMsg.info.conn);
|
||||
pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr);
|
||||
pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
|
||||
pConnInfo->clientIp = pConn->clientIp;
|
||||
pConnInfo->clientPort = pConn->port;
|
||||
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
|
||||
|
||||
transReleaseExHandle(transGetRefMgt(), pConn->refId);
|
||||
|
||||
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
|
||||
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
|
||||
}
|
||||
|
||||
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
|
||||
|
@ -418,9 +417,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
|||
|
||||
STrans* pTransInst = pConn->pTransInst;
|
||||
STraceId* trace = &pMsg->info.traceId;
|
||||
tGTrace("%s conn %p %s is sent to %s:%d, local info:%s:%d, msglen:%d", transLabel(pTransInst), pConn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len);
|
||||
tGTrace("%s conn %p %s is sent to %s, local info:%s, msglen:%d", transLabel(pTransInst), pConn,
|
||||
TMSG_INFO(pHead->msgType), pConn->dst, pConn->src, len);
|
||||
pHead->msgLen = htonl(len);
|
||||
|
||||
wb->base = msg;
|
||||
|
@ -646,20 +644,26 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
|||
uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
|
||||
tTrace("conn %p created, fd:%d", pConn, fd);
|
||||
|
||||
int addrlen = sizeof(pConn->addr);
|
||||
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) {
|
||||
struct sockaddr peername, sockname;
|
||||
int addrlen = sizeof(peername);
|
||||
if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&peername, &addrlen)) {
|
||||
tError("conn %p failed to get peer info", pConn);
|
||||
transUnrefSrvHandle(pConn);
|
||||
return;
|
||||
}
|
||||
transGetSockDebugInfo(&peername, pConn->dst);
|
||||
|
||||
addrlen = sizeof(pConn->localAddr);
|
||||
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->localAddr, &addrlen)) {
|
||||
addrlen = sizeof(sockname);
|
||||
if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&sockname, &addrlen)) {
|
||||
tError("conn %p failed to get local info", pConn);
|
||||
transUnrefSrvHandle(pConn);
|
||||
return;
|
||||
}
|
||||
transGetSockDebugInfo(&sockname, pConn->src);
|
||||
struct sockaddr_in addr = *(struct sockaddr_in*)&sockname;
|
||||
|
||||
pConn->clientIp = addr.sin_addr.s_addr;
|
||||
pConn->port = ntohs(addr.sin_port);
|
||||
uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb);
|
||||
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue