[transport]<fix> cannot transport by hostname
This commit is contained in:
parent
4a03b48fba
commit
2e6b9c7b99
|
@ -130,9 +130,9 @@ static SCliThrdObj* createThrdObj();
|
|||
static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||
|
||||
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
|
||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)(conn)->hostThrd)->pTransInst))->label)
|
||||
#define CONN_SHOULD_RELEASE(conn, head) \
|
||||
do { \
|
||||
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
|
||||
|
@ -154,20 +154,20 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
|
||||
do { \
|
||||
int i = 0, sz = transQueueSize(&conn->cliMsgs); \
|
||||
for (; i < sz; i++) { \
|
||||
pMsg = transQueueGet(&conn->cliMsgs, i); \
|
||||
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
|
||||
do { \
|
||||
int i = 0, sz = transQueueSize(&conn->cliMsgs); \
|
||||
for (; i < sz; i++) { \
|
||||
pMsg = transQueueGet(&conn->cliMsgs, i); \
|
||||
if (pMsg != NULL && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
|
||||
break; \
|
||||
} \
|
||||
} \
|
||||
if (i == sz) { \
|
||||
pMsg = NULL; \
|
||||
} else { \
|
||||
pMsg = transQueueRm(&conn->cliMsgs, i); \
|
||||
} \
|
||||
break; \
|
||||
} \
|
||||
} \
|
||||
if (i == sz) { \
|
||||
pMsg = NULL; \
|
||||
} else { \
|
||||
pMsg = transQueueRm(&conn->cliMsgs, i); \
|
||||
} \
|
||||
} while (0)
|
||||
#define CONN_GET_NEXT_SENDMSG(conn) \
|
||||
do { \
|
||||
|
@ -205,12 +205,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
|||
transRefCliHandle(conn); \
|
||||
} \
|
||||
} while (0)
|
||||
#define CONN_NO_PERSIST_BY_APP(conn) \
|
||||
(((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
#define CONN_RELEASE_BY_SERVER(conn) \
|
||||
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
||||
#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
||||
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
|
||||
|
||||
static void* cliWorkThread(void* arg);
|
||||
|
@ -281,9 +279,8 @@ void cliHandleResp(SCliConn* conn) {
|
|||
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||
}
|
||||
|
||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType),
|
||||
taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
||||
|
||||
conn->secured = pHead->secured;
|
||||
|
||||
|
@ -349,12 +346,10 @@ void cliHandleExcept(SCliConn* pConn) {
|
|||
|
||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
||||
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle,
|
||||
TMSG_INFO(transMsg.msgType));
|
||||
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType));
|
||||
if (transMsg.ahandle == NULL) {
|
||||
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
|
||||
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
transMsg.ahandle);
|
||||
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle);
|
||||
}
|
||||
} else {
|
||||
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||
|
@ -628,9 +623,8 @@ void cliSend(SCliConn* pConn) {
|
|||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
||||
|
||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType),
|
||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
|
||||
if (pHead->persist == 1) {
|
||||
CONN_SET_PERSIST_BY_APP(pConn);
|
||||
|
@ -740,8 +734,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
|||
if (ret) {
|
||||
tError("%s cli conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret));
|
||||
}
|
||||
|
||||
struct sockaddr_in addr;
|
||||
uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
||||
|
||||
addr.sin_family = AF_INET;
|
||||
addr.sin_addr.s_addr = taosGetIpv4FromFqdn(conn->ip);
|
||||
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
||||
// uv_ip4_addr(pMsg->ctx->ip, pMsg->ctx->port, &addr);
|
||||
// handle error in callback if fail to connect
|
||||
tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port);
|
||||
uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||
|
|
Loading…
Reference in New Issue