Merge pull request #27303 from taosdata/enh/addTraceLogToHttp
Enh/addTraceLogToHttp
This commit is contained in:
commit
27bdc9e71e
|
@ -240,6 +240,7 @@ typedef struct {
|
||||||
void* pThrd;
|
void* pThrd;
|
||||||
queue qmsg;
|
queue qmsg;
|
||||||
TdThreadMutex mtx; // protect qmsg;
|
TdThreadMutex mtx; // protect qmsg;
|
||||||
|
int64_t num;
|
||||||
} SAsyncItem;
|
} SAsyncItem;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -30,6 +30,8 @@ static int32_t FAST_FAILURE_LIMIT = 1;
|
||||||
|
|
||||||
static int64_t httpDefaultChanId = -1;
|
static int64_t httpDefaultChanId = -1;
|
||||||
|
|
||||||
|
static int64_t httpSeqNum = 0;
|
||||||
|
|
||||||
typedef struct SHttpModule {
|
typedef struct SHttpModule {
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
SAsyncPool* asyncPool;
|
SAsyncPool* asyncPool;
|
||||||
|
@ -50,6 +52,7 @@ typedef struct SHttpMsg {
|
||||||
EHttpCompFlag flag;
|
EHttpCompFlag flag;
|
||||||
int8_t quit;
|
int8_t quit;
|
||||||
int64_t chanId;
|
int64_t chanId;
|
||||||
|
int64_t seq;
|
||||||
} SHttpMsg;
|
} SHttpMsg;
|
||||||
|
|
||||||
typedef struct SHttpClient {
|
typedef struct SHttpClient {
|
||||||
|
@ -62,6 +65,7 @@ typedef struct SHttpClient {
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
struct sockaddr_in dest;
|
struct sockaddr_in dest;
|
||||||
int64_t chanId;
|
int64_t chanId;
|
||||||
|
int64_t seq;
|
||||||
} SHttpClient;
|
} SHttpClient;
|
||||||
|
|
||||||
typedef struct SHttpConnList {
|
typedef struct SHttpConnList {
|
||||||
|
@ -193,14 +197,14 @@ static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port,
|
||||||
uint32_t ip = 0;
|
uint32_t ip = 0;
|
||||||
int32_t code = taosGetIpv4FromFqdn(server, &ip);
|
int32_t code = taosGetIpv4FromFqdn(server, &ip);
|
||||||
if (code) {
|
if (code) {
|
||||||
tError("http-report failed to resolving domain names: %s", server);
|
tError("http-report failed to resolving domain names %s, reason: %s", server, tstrerror(code));
|
||||||
return TSDB_CODE_RPC_FQDN_ERROR;
|
return TSDB_CODE_RPC_FQDN_ERROR;
|
||||||
}
|
}
|
||||||
char buf[256] = {0};
|
char buf[256] = {0};
|
||||||
tinet_ntoa(buf, ip);
|
tinet_ntoa(buf, ip);
|
||||||
int ret = uv_ip4_addr(buf, port, dest);
|
int ret = uv_ip4_addr(buf, port, dest);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("http-report failed to get addr %s", uv_err_name(ret));
|
tError("http-report failed to get addr, reason:%s", uv_err_name(ret));
|
||||||
return TSDB_CODE_THIRDPARTY_ERROR;
|
return TSDB_CODE_THIRDPARTY_ERROR;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -215,14 +219,15 @@ static void* httpThread(void* arg) {
|
||||||
|
|
||||||
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
|
||||||
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) {
|
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) {
|
||||||
|
int64_t seqNum = atomic_fetch_add_64(&httpSeqNum, 1);
|
||||||
if (server == NULL || uri == NULL) {
|
if (server == NULL || uri == NULL) {
|
||||||
tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId);
|
tError("http-report failed to report to invalid addr, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum);
|
||||||
*httpMsg = NULL;
|
*httpMsg = NULL;
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCont == NULL || contLen == 0) {
|
if (pCont == NULL || contLen == 0) {
|
||||||
tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId);
|
tError("http-report failed to report empty packet, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum);
|
||||||
*httpMsg = NULL;
|
*httpMsg = NULL;
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
@ -233,6 +238,7 @@ static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port,
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
msg->seq = seqNum;
|
||||||
msg->port = port;
|
msg->port = port;
|
||||||
msg->server = taosStrdup(server);
|
msg->server = taosStrdup(server);
|
||||||
msg->uri = taosStrdup(uri);
|
msg->uri = taosStrdup(uri);
|
||||||
|
@ -259,7 +265,11 @@ static void httpDestroyMsg(SHttpMsg* msg) {
|
||||||
taosMemoryFree(msg->cont);
|
taosMemoryFree(msg->cont);
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
}
|
}
|
||||||
static void httpDestroyMsgWrapper(void* cont, void* param) { httpDestroyMsg((SHttpMsg*)cont); }
|
static void httpDestroyMsgWrapper(void* cont, void* param) {
|
||||||
|
SHttpMsg* pMsg = cont;
|
||||||
|
tWarn("http-report destroy msg, chanId:%" PRId64 ", seq:%" PRId64 "", pMsg->chanId, pMsg->seq);
|
||||||
|
httpDestroyMsg(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
|
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
|
||||||
SHttpMsg *msg = NULL, *quitMsg = NULL;
|
SHttpMsg *msg = NULL, *quitMsg = NULL;
|
||||||
|
@ -272,6 +282,8 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
msg = QUEUE_DATA(h, SHttpMsg, q);
|
msg = QUEUE_DATA(h, SHttpMsg, q);
|
||||||
if (!msg->quit) {
|
if (!msg->quit) {
|
||||||
|
tError("http-report failed to report chanId:%" PRId64 ",seq:%" PRId64 ", reason: %s", msg->chanId, msg->seq,
|
||||||
|
tstrerror(TSDB_CODE_HTTP_MODULE_QUIT));
|
||||||
httpDestroyMsg(msg);
|
httpDestroyMsg(msg);
|
||||||
} else {
|
} else {
|
||||||
quitMsg = msg;
|
quitMsg = msg;
|
||||||
|
@ -281,6 +293,25 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
|
||||||
QUEUE_PUSH(&item->qmsg, &quitMsg->q);
|
QUEUE_PUSH(&item->qmsg, &quitMsg->q);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static void httpTrace(queue* q) {
|
||||||
|
if (!(rpcDebugFlag & DEBUG_DEBUG) || (QUEUE_IS_EMPTY(q))) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t startSeq = 0, endSeq = 0;
|
||||||
|
SHttpMsg* msg = NULL;
|
||||||
|
|
||||||
|
queue* h = QUEUE_HEAD(q);
|
||||||
|
msg = QUEUE_DATA(h, SHttpMsg, q);
|
||||||
|
startSeq = msg->seq;
|
||||||
|
|
||||||
|
h = QUEUE_TAIL(q);
|
||||||
|
msg = QUEUE_DATA(h, SHttpMsg, q);
|
||||||
|
endSeq = msg->seq;
|
||||||
|
|
||||||
|
tDebug("http-report process msg, start_seq:%" PRId64 ", end_seq:%" PRId64 ", max_seq:%" PRId64 "", startSeq, endSeq,
|
||||||
|
atomic_load_64(&httpSeqNum) - 1);
|
||||||
|
}
|
||||||
|
|
||||||
static void httpAsyncCb(uv_async_t* handle) {
|
static void httpAsyncCb(uv_async_t* handle) {
|
||||||
SAsyncItem* item = handle->data;
|
SAsyncItem* item = handle->data;
|
||||||
|
@ -290,7 +321,7 @@ static void httpAsyncCb(uv_async_t* handle) {
|
||||||
queue wq;
|
queue wq;
|
||||||
QUEUE_INIT(&wq);
|
QUEUE_INIT(&wq);
|
||||||
|
|
||||||
static int32_t BATCH_SIZE = 5;
|
static int32_t BATCH_SIZE = 20;
|
||||||
int32_t count = 0;
|
int32_t count = 0;
|
||||||
|
|
||||||
(void)taosThreadMutexLock(&item->mtx);
|
(void)taosThreadMutexLock(&item->mtx);
|
||||||
|
@ -303,6 +334,8 @@ static void httpAsyncCb(uv_async_t* handle) {
|
||||||
}
|
}
|
||||||
(void)taosThreadMutexUnlock(&item->mtx);
|
(void)taosThreadMutexUnlock(&item->mtx);
|
||||||
|
|
||||||
|
httpTrace(&wq);
|
||||||
|
|
||||||
while (!QUEUE_IS_EMPTY(&wq)) {
|
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||||
queue* h = QUEUE_HEAD(&wq);
|
queue* h = QUEUE_HEAD(&wq);
|
||||||
QUEUE_REMOVE(h);
|
QUEUE_REMOVE(h);
|
||||||
|
@ -348,9 +381,9 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const
|
||||||
STUB_RAND_NETWORK_ERR(nread);
|
STUB_RAND_NETWORK_ERR(nread);
|
||||||
SHttpClient* cli = handle->data;
|
SHttpClient* cli = handle->data;
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
tError("http-report recv error:%s", uv_strerror(nread));
|
tError("http-report recv error:%s, seq:%" PRId64 "", uv_strerror(nread), cli->seq);
|
||||||
} else {
|
} else {
|
||||||
tTrace("http-report succ to recv %d bytes", (int32_t)nread);
|
tTrace("http-report succ to recv %d bytes, seq:%" PRId64 "", (int32_t)nread, cli->seq);
|
||||||
}
|
}
|
||||||
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
||||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||||
|
@ -360,20 +393,20 @@ static void clientSentCb(uv_write_t* req, int32_t status) {
|
||||||
STUB_RAND_NETWORK_ERR(status);
|
STUB_RAND_NETWORK_ERR(status);
|
||||||
SHttpClient* cli = req->data;
|
SHttpClient* cli = req->data;
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr,
|
tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 "",
|
||||||
cli->port, cli->chanId);
|
uv_strerror(status), cli->addr, cli->port, cli->chanId, cli->seq);
|
||||||
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
||||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId);
|
tTrace("http-report succ to send data, chanId:%" PRId64 ", seq:%" PRId64 "", cli->chanId, cli->seq);
|
||||||
}
|
}
|
||||||
|
|
||||||
status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
|
status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr,
|
tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 "",
|
||||||
cli->port, cli->chanId);
|
uv_strerror(status), cli->addr, cli->port, cli->chanId, cli->seq);
|
||||||
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
||||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||||
}
|
}
|
||||||
|
@ -387,9 +420,8 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
|
||||||
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
|
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
|
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
|
||||||
|
tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 "",
|
||||||
tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status),
|
uv_strerror(status), cli->addr, cli->port, chanId, cli->seq);
|
||||||
cli->addr, cli->port, chanId);
|
|
||||||
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
||||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||||
}
|
}
|
||||||
|
@ -402,8 +434,8 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
|
||||||
|
|
||||||
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
|
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
|
||||||
if (0 != status) {
|
if (0 != status) {
|
||||||
tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr,
|
tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 "",
|
||||||
cli->port, chanId);
|
uv_strerror(status), cli->addr, cli->port, chanId, cli->seq);
|
||||||
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
||||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||||
}
|
}
|
||||||
|
@ -416,7 +448,7 @@ int32_t httpSendQuit(SHttpModule* http, int64_t chanId) {
|
||||||
if (msg == NULL) {
|
if (msg == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
msg->seq = atomic_fetch_add_64(&httpSeqNum, 1);
|
||||||
msg->quit = 1;
|
msg->quit = 1;
|
||||||
msg->chanId = chanId;
|
msg->chanId = chanId;
|
||||||
|
|
||||||
|
@ -446,10 +478,11 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
static void httpHandleQuit(SHttpMsg* msg) {
|
static void httpHandleQuit(SHttpMsg* msg) {
|
||||||
|
int64_t seq = msg->seq;
|
||||||
int64_t chanId = msg->chanId;
|
int64_t chanId = msg->chanId;
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
|
|
||||||
tDebug("http-report receive quit, chanId:%" PRId64 "", chanId);
|
tDebug("http-report receive quit, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seq);
|
||||||
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
|
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
|
||||||
if (http == NULL) return;
|
if (http == NULL) return;
|
||||||
uv_walk(http->loop, httpWalkCb, NULL);
|
uv_walk(http->loop, httpWalkCb, NULL);
|
||||||
|
@ -546,7 +579,7 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
cli->seq = msg->seq;
|
||||||
cli->conn.data = cli;
|
cli->conn.data = cli;
|
||||||
cli->tcp.data = cli;
|
cli->tcp.data = cli;
|
||||||
cli->req.data = cli;
|
cli->req.data = cli;
|
||||||
|
@ -560,8 +593,8 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
cli->wbuf = wb;
|
cli->wbuf = wb;
|
||||||
cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
|
cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
|
||||||
if (cli->rbuf == NULL) {
|
if (cli->rbuf == NULL) {
|
||||||
tError("http-report failed to alloc read buf, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port,
|
tError("http-report failed to alloc read buf, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ",reason:%s", cli->addr,
|
||||||
chanId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
cli->port, chanId, cli->seq, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
(void)taosReleaseRef(httpRefMgt, chanId);
|
(void)taosReleaseRef(httpRefMgt, chanId);
|
||||||
return;
|
return;
|
||||||
|
@ -569,8 +602,8 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
|
|
||||||
int err = uv_tcp_init(http->loop, &cli->tcp);
|
int err = uv_tcp_init(http->loop, &cli->tcp);
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
tError("http-report failed to init socket handle, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port,
|
tError("http-report failed to init socket handle, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s",
|
||||||
chanId, uv_strerror(err));
|
cli->addr, cli->port, chanId, cli->seq, uv_strerror(err));
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
(void)taosReleaseRef(httpRefMgt, chanId);
|
(void)taosReleaseRef(httpRefMgt, chanId);
|
||||||
return;
|
return;
|
||||||
|
@ -579,8 +612,8 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
// set up timeout to avoid stuck;
|
// set up timeout to avoid stuck;
|
||||||
int32_t fd = taosCreateSocketWithTimeout(5000);
|
int32_t fd = taosCreateSocketWithTimeout(5000);
|
||||||
if (fd < 0) {
|
if (fd < 0) {
|
||||||
tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, chanId,
|
tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cli->addr,
|
||||||
tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
cli->port, chanId, cli->seq, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
(void)taosReleaseRef(httpRefMgt, chanId);
|
(void)taosReleaseRef(httpRefMgt, chanId);
|
||||||
return;
|
return;
|
||||||
|
@ -588,8 +621,9 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
|
|
||||||
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
|
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 ",reason:%s", uv_strerror(ret),
|
tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cli->addr,
|
||||||
cli->addr, cli->port, chanId, uv_strerror(ret));
|
cli->port, chanId, cli->seq, uv_strerror(ret));
|
||||||
|
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
(void)taosReleaseRef(httpRefMgt, chanId);
|
(void)taosReleaseRef(httpRefMgt, chanId);
|
||||||
return;
|
return;
|
||||||
|
@ -597,8 +631,8 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
|
|
||||||
ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb);
|
ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ",reson:%s", cli->addr, cli->port,
|
tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reson:%s",
|
||||||
chanId, uv_strerror(ret));
|
cli->addr, cli->port, chanId, cli->seq, uv_strerror(ret));
|
||||||
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
|
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
}
|
}
|
||||||
|
@ -607,8 +641,8 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
|
|
||||||
END:
|
END:
|
||||||
if (ignore == false) {
|
if (ignore == false) {
|
||||||
tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ", reason:%s", msg->server, msg->port, chanId,
|
tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ",seq:%" PRId64 " reason:%s", msg->server,
|
||||||
tstrerror(code));
|
msg->port, chanId, msg->seq, tstrerror(code));
|
||||||
}
|
}
|
||||||
httpDestroyMsg(msg);
|
httpDestroyMsg(msg);
|
||||||
taosMemoryFree(header);
|
taosMemoryFree(header);
|
||||||
|
@ -656,6 +690,7 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri,
|
||||||
code = TSDB_CODE_HTTP_MODULE_QUIT;
|
code = TSDB_CODE_HTTP_MODULE_QUIT;
|
||||||
goto _ERROR;
|
goto _ERROR;
|
||||||
}
|
}
|
||||||
|
tDebug("http-report start to report, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, msg->seq);
|
||||||
|
|
||||||
code = transAsyncSend(load->asyncPool, &(msg->q));
|
code = transAsyncSend(load->asyncPool, &(msg->q));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -665,6 +700,11 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri,
|
||||||
msg = NULL;
|
msg = NULL;
|
||||||
|
|
||||||
_ERROR:
|
_ERROR:
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
tError("http-report failed to report reason:%s, chanId:%" PRId64 ", seq:%" PRId64 "", tstrerror(code), chanId,
|
||||||
|
msg->seq);
|
||||||
|
}
|
||||||
httpDestroyMsg(msg);
|
httpDestroyMsg(msg);
|
||||||
if (load != NULL) taosReleaseRef(httpRefMgt, chanId);
|
if (load != NULL) taosReleaseRef(httpRefMgt, chanId);
|
||||||
return code;
|
return code;
|
||||||
|
@ -688,6 +728,7 @@ int64_t transInitHttpChanImpl();
|
||||||
static void transHttpEnvInit() {
|
static void transHttpEnvInit() {
|
||||||
httpRefMgt = taosOpenRef(64, transHttpDestroyHandle);
|
httpRefMgt = taosOpenRef(64, transHttpDestroyHandle);
|
||||||
httpDefaultChanId = transInitHttpChanImpl();
|
httpDefaultChanId = transInitHttpChanImpl();
|
||||||
|
httpSeqNum = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void transHttpEnvDestroy() {
|
void transHttpEnvDestroy() {
|
||||||
|
@ -752,7 +793,7 @@ int64_t taosInitHttpChan() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosDestroyHttpChan(int64_t chanId) {
|
void taosDestroyHttpChan(int64_t chanId) {
|
||||||
tDebug("http-report send quit, chanId:%" PRId64 "", chanId);
|
tDebug("http-report send quit, chanId: %" PRId64 "", chanId);
|
||||||
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
SHttpModule* load = taosAcquireRef(httpRefMgt, chanId);
|
SHttpModule* load = taosAcquireRef(httpRefMgt, chanId);
|
||||||
|
@ -776,4 +817,4 @@ void taosDestroyHttpChan(int64_t chanId) {
|
||||||
|
|
||||||
(void)taosReleaseRef(httpRefMgt, chanId);
|
(void)taosReleaseRef(httpRefMgt, chanId);
|
||||||
(void)taosRemoveRef(httpRefMgt, chanId);
|
(void)taosRemoveRef(httpRefMgt, chanId);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue