diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 820075787f..e8172385b2 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -240,6 +240,7 @@ typedef struct { void* pThrd; queue qmsg; TdThreadMutex mtx; // protect qmsg; + int64_t num; } SAsyncItem; typedef struct { diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index e517b8a0bc..6096f69ca2 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -30,6 +30,8 @@ static int32_t FAST_FAILURE_LIMIT = 1; static int64_t httpDefaultChanId = -1; +static int64_t httpSeqNum = 0; + typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; @@ -50,6 +52,7 @@ typedef struct SHttpMsg { EHttpCompFlag flag; int8_t quit; int64_t chanId; + int64_t seq; } SHttpMsg; typedef struct SHttpClient { @@ -62,6 +65,7 @@ typedef struct SHttpClient { uint16_t port; struct sockaddr_in dest; int64_t chanId; + int64_t seq; } SHttpClient; typedef struct SHttpConnList { @@ -193,14 +197,14 @@ static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, uint32_t ip = 0; int32_t code = taosGetIpv4FromFqdn(server, &ip); if (code) { - tError("http-report failed to resolving domain names: %s", server); + tError("http-report failed to resolving domain names %s, reason: %s", server, tstrerror(code)); return TSDB_CODE_RPC_FQDN_ERROR; } char buf[256] = {0}; tinet_ntoa(buf, ip); int ret = uv_ip4_addr(buf, port, dest); if (ret != 0) { - tError("http-report failed to get addr %s", uv_err_name(ret)); + tError("http-report failed to get addr, reason:%s", uv_err_name(ret)); return TSDB_CODE_THIRDPARTY_ERROR; } return 0; @@ -215,14 +219,15 @@ static void* httpThread(void* arg) { static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) { + int64_t seqNum = atomic_fetch_add_64(&httpSeqNum, 1); if (server == NULL || uri == NULL) { - tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId); + tError("http-report failed to report to invalid addr, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum); *httpMsg = NULL; return TSDB_CODE_INVALID_PARA; } if (pCont == NULL || contLen == 0) { - tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId); + tError("http-report failed to report empty packet, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum); *httpMsg = NULL; return TSDB_CODE_INVALID_PARA; } @@ -233,6 +238,7 @@ static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, return TSDB_CODE_OUT_OF_MEMORY; } + msg->seq = seqNum; msg->port = port; msg->server = taosStrdup(server); msg->uri = taosStrdup(uri); @@ -259,7 +265,11 @@ static void httpDestroyMsg(SHttpMsg* msg) { taosMemoryFree(msg->cont); taosMemoryFree(msg); } -static void httpDestroyMsgWrapper(void* cont, void* param) { httpDestroyMsg((SHttpMsg*)cont); } +static void httpDestroyMsgWrapper(void* cont, void* param) { + SHttpMsg* pMsg = cont; + tWarn("http-report destroy msg, chanId:%" PRId64 ", seq:%" PRId64 "", pMsg->chanId, pMsg->seq); + httpDestroyMsg(pMsg); +} static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { SHttpMsg *msg = NULL, *quitMsg = NULL; @@ -272,6 +282,8 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { QUEUE_REMOVE(h); msg = QUEUE_DATA(h, SHttpMsg, q); if (!msg->quit) { + tError("http-report failed to report chanId:%" PRId64 ",seq:%" PRId64 ", reason: %s", msg->chanId, msg->seq, + tstrerror(TSDB_CODE_HTTP_MODULE_QUIT)); httpDestroyMsg(msg); } else { quitMsg = msg; @@ -281,6 +293,25 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { QUEUE_PUSH(&item->qmsg, &quitMsg->q); } } +static void httpTrace(queue* q) { + if (!(rpcDebugFlag & DEBUG_DEBUG) || (QUEUE_IS_EMPTY(q))) { + return; + } + + int64_t startSeq = 0, endSeq = 0; + SHttpMsg* msg = NULL; + + queue* h = QUEUE_HEAD(q); + msg = QUEUE_DATA(h, SHttpMsg, q); + startSeq = msg->seq; + + h = QUEUE_TAIL(q); + msg = QUEUE_DATA(h, SHttpMsg, q); + endSeq = msg->seq; + + tDebug("http-report process msg, start_seq:%" PRId64 ", end_seq:%" PRId64 ", max_seq:%" PRId64 "", startSeq, endSeq, + atomic_load_64(&httpSeqNum) - 1); +} static void httpAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; @@ -290,7 +321,7 @@ static void httpAsyncCb(uv_async_t* handle) { queue wq; QUEUE_INIT(&wq); - static int32_t BATCH_SIZE = 5; + static int32_t BATCH_SIZE = 20; int32_t count = 0; (void)taosThreadMutexLock(&item->mtx); @@ -303,6 +334,8 @@ static void httpAsyncCb(uv_async_t* handle) { } (void)taosThreadMutexUnlock(&item->mtx); + httpTrace(&wq); + while (!QUEUE_IS_EMPTY(&wq)) { queue* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); @@ -348,9 +381,9 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const STUB_RAND_NETWORK_ERR(nread); SHttpClient* cli = handle->data; if (nread < 0) { - tError("http-report recv error:%s", uv_strerror(nread)); + tError("http-report recv error:%s, seq:%" PRId64 "", uv_strerror(nread), cli->seq); } else { - tTrace("http-report succ to recv %d bytes", (int32_t)nread); + tTrace("http-report succ to recv %d bytes, seq:%" PRId64 "", (int32_t)nread, cli->seq); } if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); @@ -360,20 +393,20 @@ static void clientSentCb(uv_write_t* req, int32_t status) { STUB_RAND_NETWORK_ERR(status); SHttpClient* cli = req->data; if (status != 0) { - tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, - cli->port, cli->chanId); + tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 "", + uv_strerror(status), cli->addr, cli->port, cli->chanId, cli->seq); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } return; } else { - tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId); + tTrace("http-report succ to send data, chanId:%" PRId64 ", seq:%" PRId64 "", cli->chanId, cli->seq); } status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); if (status != 0) { - tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, - cli->port, cli->chanId); + tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 "", + uv_strerror(status), cli->addr, cli->port, cli->chanId, cli->seq); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } @@ -387,9 +420,8 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (status != 0) { httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); - - tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), - cli->addr, cli->port, chanId); + tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 "", + uv_strerror(status), cli->addr, cli->port, chanId, cli->seq); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } @@ -402,8 +434,8 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); if (0 != status) { - tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, - cli->port, chanId); + tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 "", + uv_strerror(status), cli->addr, cli->port, chanId, cli->seq); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } @@ -416,7 +448,7 @@ int32_t httpSendQuit(SHttpModule* http, int64_t chanId) { if (msg == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - + msg->seq = atomic_fetch_add_64(&httpSeqNum, 1); msg->quit = 1; msg->chanId = chanId; @@ -446,10 +478,11 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) { return; } static void httpHandleQuit(SHttpMsg* msg) { + int64_t seq = msg->seq; int64_t chanId = msg->chanId; taosMemoryFree(msg); - tDebug("http-report receive quit, chanId:%" PRId64 "", chanId); + tDebug("http-report receive quit, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seq); SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (http == NULL) return; uv_walk(http->loop, httpWalkCb, NULL); @@ -546,7 +579,7 @@ static void httpHandleReq(SHttpMsg* msg) { code = TSDB_CODE_OUT_OF_MEMORY; goto END; } - + cli->seq = msg->seq; cli->conn.data = cli; cli->tcp.data = cli; cli->req.data = cli; @@ -560,8 +593,8 @@ static void httpHandleReq(SHttpMsg* msg) { cli->wbuf = wb; cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); if (cli->rbuf == NULL) { - tError("http-report failed to alloc read buf, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, - chanId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + tError("http-report failed to alloc read buf, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ",reason:%s", cli->addr, + cli->port, chanId, cli->seq, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); destroyHttpClient(cli); (void)taosReleaseRef(httpRefMgt, chanId); return; @@ -569,8 +602,8 @@ static void httpHandleReq(SHttpMsg* msg) { int err = uv_tcp_init(http->loop, &cli->tcp); if (err != 0) { - tError("http-report failed to init socket handle, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, - chanId, uv_strerror(err)); + tError("http-report failed to init socket handle, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", + cli->addr, cli->port, chanId, cli->seq, uv_strerror(err)); destroyHttpClient(cli); (void)taosReleaseRef(httpRefMgt, chanId); return; @@ -579,8 +612,8 @@ static void httpHandleReq(SHttpMsg* msg) { // set up timeout to avoid stuck; int32_t fd = taosCreateSocketWithTimeout(5000); if (fd < 0) { - tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, chanId, - tstrerror(TAOS_SYSTEM_ERROR(errno))); + tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cli->addr, + cli->port, chanId, cli->seq, tstrerror(TAOS_SYSTEM_ERROR(errno))); destroyHttpClient(cli); (void)taosReleaseRef(httpRefMgt, chanId); return; @@ -588,8 +621,9 @@ static void httpHandleReq(SHttpMsg* msg) { int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); if (ret != 0) { - tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 ",reason:%s", uv_strerror(ret), - cli->addr, cli->port, chanId, uv_strerror(ret)); + tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cli->addr, + cli->port, chanId, cli->seq, uv_strerror(ret)); + destroyHttpClient(cli); (void)taosReleaseRef(httpRefMgt, chanId); return; @@ -597,8 +631,8 @@ static void httpHandleReq(SHttpMsg* msg) { ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb); if (ret != 0) { - tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ",reson:%s", cli->addr, cli->port, - chanId, uv_strerror(ret)); + tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reson:%s", + cli->addr, cli->port, chanId, cli->seq, uv_strerror(ret)); httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); destroyHttpClient(cli); } @@ -607,8 +641,8 @@ static void httpHandleReq(SHttpMsg* msg) { END: if (ignore == false) { - tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ", reason:%s", msg->server, msg->port, chanId, - tstrerror(code)); + tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ",seq:%" PRId64 " reason:%s", msg->server, + msg->port, chanId, msg->seq, tstrerror(code)); } httpDestroyMsg(msg); taosMemoryFree(header); @@ -656,6 +690,7 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, code = TSDB_CODE_HTTP_MODULE_QUIT; goto _ERROR; } + tDebug("http-report start to report, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, msg->seq); code = transAsyncSend(load->asyncPool, &(msg->q)); if (code != 0) { @@ -665,6 +700,11 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, msg = NULL; _ERROR: + + if (code != 0) { + tError("http-report failed to report reason:%s, chanId:%" PRId64 ", seq:%" PRId64 "", tstrerror(code), chanId, + msg->seq); + } httpDestroyMsg(msg); if (load != NULL) taosReleaseRef(httpRefMgt, chanId); return code; @@ -688,6 +728,7 @@ int64_t transInitHttpChanImpl(); static void transHttpEnvInit() { httpRefMgt = taosOpenRef(64, transHttpDestroyHandle); httpDefaultChanId = transInitHttpChanImpl(); + httpSeqNum = 0; } void transHttpEnvDestroy() { @@ -752,7 +793,7 @@ int64_t taosInitHttpChan() { } void taosDestroyHttpChan(int64_t chanId) { - tDebug("http-report send quit, chanId:%" PRId64 "", chanId); + tDebug("http-report send quit, chanId: %" PRId64 "", chanId); int ret = 0; SHttpModule* load = taosAcquireRef(httpRefMgt, chanId); @@ -776,4 +817,4 @@ void taosDestroyHttpChan(int64_t chanId) { (void)taosReleaseRef(httpRefMgt, chanId); (void)taosRemoveRef(httpRefMgt, chanId); -} \ No newline at end of file +}