From 30ff2fc11b5b8dc70d7ae4bf93c9ab3d623f9a60 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 19 Aug 2024 17:19:05 +0800 Subject: [PATCH] add trace log --- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/thttp.c | 103 ++++++++++++++++++-------- 2 files changed, 72 insertions(+), 32 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 820075787f..e8172385b2 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -240,6 +240,7 @@ typedef struct { void* pThrd; queue qmsg; TdThreadMutex mtx; // protect qmsg; + int64_t num; } SAsyncItem; typedef struct { diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 908468f094..6dee6aee09 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -30,6 +30,8 @@ static int32_t FAST_FAILURE_LIMIT = 1; static int64_t httpDefaultChanId = -1; +static int64_t httpSeqNum = 0; + typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; @@ -50,6 +52,7 @@ typedef struct SHttpMsg { EHttpCompFlag flag; int8_t quit; int64_t chanId; + int64_t seq; } SHttpMsg; typedef struct SHttpClient { @@ -62,6 +65,7 @@ typedef struct SHttpClient { uint16_t port; struct sockaddr_in dest; int64_t chanId; + int64_t seq; } SHttpClient; typedef struct SHttpConnList { @@ -215,14 +219,15 @@ static void* httpThread(void* arg) { static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) { + int64_t seqNum = atomic_fetch_add_64(&httpSeqNum, 1); if (server == NULL || uri == NULL) { - tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId); + tError("http-report failed to report to invalid addr, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum); *httpMsg = NULL; return TSDB_CODE_INVALID_PARA; } if (pCont == NULL || contLen == 0) { - tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId); + tError("http-report failed to report empty packet, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum); *httpMsg = NULL; return TSDB_CODE_INVALID_PARA; } @@ -233,6 +238,7 @@ static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, return TSDB_CODE_OUT_OF_MEMORY; } + msg->seq = seqNum; msg->port = port; msg->server = taosStrdup(server); msg->uri = taosStrdup(uri); @@ -259,7 +265,11 @@ static void httpDestroyMsg(SHttpMsg* msg) { taosMemoryFree(msg->cont); taosMemoryFree(msg); } -static void httpDestroyMsgWrapper(void* cont, void* param) { httpDestroyMsg((SHttpMsg*)cont); } +static void httpDestroyMsgWrapper(void* cont, void* param) { + SHttpMsg* pMsg = cont; + tWarn("http-report destroy msg, seq: %" PRId64 "", pMsg->seq); + httpDestroyMsg(pMsg); +} static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { SHttpMsg *msg = NULL, *quitMsg = NULL; @@ -272,6 +282,8 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { QUEUE_REMOVE(h); msg = QUEUE_DATA(h, SHttpMsg, q); if (!msg->quit) { + tError("http-report failed to report chanId:%" PRId64 ",seq:%" PRId64 ", reason: %s", msg->chanId, msg->seq, + tstrerror(TSDB_CODE_HTTP_MODULE_QUIT)); httpDestroyMsg(msg); } else { quitMsg = msg; @@ -281,6 +293,25 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { QUEUE_PUSH(&item->qmsg, &quitMsg->q); } } +static void httpTrace(queue* q) { + if (!(rpcDebugFlag & DEBUG_DEBUG) || (QUEUE_IS_EMPTY(q))) { + return; + } + + int64_t startSeq = 0, endSeq = 0; + SHttpMsg* msg = NULL; + + queue* h = QUEUE_HEAD(q); + msg = QUEUE_DATA(h, SHttpMsg, q); + startSeq = msg->seq; + + h = QUEUE_TAIL(q); + msg = QUEUE_DATA(h, SHttpMsg, q); + endSeq = msg->seq; + + tDebug("http-report process msg, start_seq: %" PRId64 ", end_seq: %" PRId64 ", max_seq:%" PRId64 "", startSeq, endSeq, + atomic_load_64(&httpSeqNum) - 1); +} static void httpAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; @@ -290,7 +321,7 @@ static void httpAsyncCb(uv_async_t* handle) { queue wq; QUEUE_INIT(&wq); - static int32_t BATCH_SIZE = 5; + static int32_t BATCH_SIZE = 20; int32_t count = 0; (void)taosThreadMutexLock(&item->mtx); @@ -303,6 +334,8 @@ static void httpAsyncCb(uv_async_t* handle) { } (void)taosThreadMutexUnlock(&item->mtx); + httpTrace(&wq); + while (!QUEUE_IS_EMPTY(&wq)) { queue* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); @@ -347,9 +380,9 @@ static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { SHttpClient* cli = handle->data; if (nread < 0) { - tError("http-report recv error:%s", uv_strerror(nread)); + tError("http-report recv error:%s, seq: %" PRId64 "", uv_strerror(nread), cli->seq); } else { - tTrace("http-report succ to recv %d bytes", (int32_t)nread); + tTrace("http-report succ to recv %d bytes, seq: %" PRId64 "", (int32_t)nread, cli->seq); } if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); @@ -358,19 +391,19 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const static void clientSentCb(uv_write_t* req, int32_t status) { SHttpClient* cli = req->data; if (status != 0) { - tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, - cli->port, cli->chanId); + tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 ", seq: %" PRId64 "", + uv_strerror(status), cli->addr, cli->port, cli->chanId, cli->seq); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } return; } else { - tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId); + tTrace("http-report succ to send data, chanId:%" PRId64 ", seq: %" PRId64 "", cli->chanId, cli->seq); } status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); if (status != 0) { - tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, - cli->port, cli->chanId); + tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq: %" PRId64 "", + uv_strerror(status), cli->addr, cli->port, cli->chanId, cli->seq); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } @@ -383,9 +416,8 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (status != 0) { httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); - - tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), - cli->addr, cli->port, chanId); + tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq: %" PRId64 "", + uv_strerror(status), cli->addr, cli->port, chanId, cli->seq); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } @@ -398,8 +430,8 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); if (0 != status) { - tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, - cli->port, chanId); + tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq: %" PRId64 "", + uv_strerror(status), cli->addr, cli->port, chanId, cli->seq); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } @@ -412,7 +444,7 @@ int32_t httpSendQuit(SHttpModule* http, int64_t chanId) { if (msg == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - + msg->seq = atomic_fetch_add_64(&httpSeqNum, 1); msg->quit = 1; msg->chanId = chanId; @@ -442,10 +474,11 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) { return; } static void httpHandleQuit(SHttpMsg* msg) { + int64_t seq = msg->seq; int64_t chanId = msg->chanId; taosMemoryFree(msg); - tDebug("http-report receive quit, chanId:%" PRId64 "", chanId); + tDebug("http-report receive quit, chanId: %" PRId64 ", seq: %" PRId64 "", chanId, seq); SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (http == NULL) return; uv_walk(http->loop, httpWalkCb, NULL); @@ -542,7 +575,7 @@ static void httpHandleReq(SHttpMsg* msg) { code = TSDB_CODE_OUT_OF_MEMORY; goto END; } - + cli->seq = msg->seq; cli->conn.data = cli; cli->tcp.data = cli; cli->req.data = cli; @@ -556,8 +589,8 @@ static void httpHandleReq(SHttpMsg* msg) { cli->wbuf = wb; cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); if (cli->rbuf == NULL) { - tError("http-report failed to alloc read buf, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, - chanId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + tError("http-report failed to alloc read buf, dst:%s:%d,chanId:%" PRId64 ", seq:%" PRId64 ",reason:%s", cli->addr, + cli->port, chanId, cli->seq, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); destroyHttpClient(cli); (void)taosReleaseRef(httpRefMgt, chanId); return; @@ -565,8 +598,8 @@ static void httpHandleReq(SHttpMsg* msg) { int err = uv_tcp_init(http->loop, &cli->tcp); if (err != 0) { - tError("http-report failed to init socket handle, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, - chanId, uv_strerror(err)); + tError("http-report failed to init socket handle, dst:%s:%d,chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", + cli->addr, cli->port, chanId, cli->seq, uv_strerror(err)); destroyHttpClient(cli); (void)taosReleaseRef(httpRefMgt, chanId); return; @@ -575,8 +608,8 @@ static void httpHandleReq(SHttpMsg* msg) { // set up timeout to avoid stuck; int32_t fd = taosCreateSocketWithTimeout(5000); if (fd < 0) { - tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, chanId, - tstrerror(TAOS_SYSTEM_ERROR(errno))); + tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cli->addr, + cli->port, chanId, cli->seq, tstrerror(TAOS_SYSTEM_ERROR(errno))); destroyHttpClient(cli); (void)taosReleaseRef(httpRefMgt, chanId); return; @@ -584,8 +617,8 @@ static void httpHandleReq(SHttpMsg* msg) { int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); if (ret != 0) { - tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 ",reason:%s", uv_strerror(ret), - cli->addr, cli->port, chanId, uv_strerror(ret)); + tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", + uv_strerror(ret), cli->addr, cli->port, chanId, cli->seq, uv_strerror(ret)); destroyHttpClient(cli); (void)taosReleaseRef(httpRefMgt, chanId); return; @@ -593,8 +626,8 @@ static void httpHandleReq(SHttpMsg* msg) { ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb); if (ret != 0) { - tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ",reson:%s", cli->addr, cli->port, - chanId, uv_strerror(ret)); + tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ",reson:%s", + cli->addr, cli->port, chanId, cli->seq, uv_strerror(ret)); httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); destroyHttpClient(cli); } @@ -603,8 +636,8 @@ static void httpHandleReq(SHttpMsg* msg) { END: if (ignore == false) { - tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ", reason:%s", msg->server, msg->port, chanId, - tstrerror(code)); + tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ",seq:%" PRId64 " reason:%s", msg->server, + msg->port, chanId, msg->seq, tstrerror(code)); } httpDestroyMsg(msg); taosMemoryFree(header); @@ -661,6 +694,11 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, msg = NULL; _ERROR: + + if (code != 0) { + tError("http-report failed to report reason:%s, chanId:%" PRId64 ", seq:%" PRId64 "", tstrerror(code), chanId, + msg->seq); + } httpDestroyMsg(msg); if (load != NULL) taosReleaseRef(httpRefMgt, chanId); return code; @@ -684,6 +722,7 @@ int64_t transInitHttpChanImpl(); static void transHttpEnvInit() { httpRefMgt = taosOpenRef(64, transHttpDestroyHandle); httpDefaultChanId = transInitHttpChanImpl(); + httpSeqNum = 0; } void transHttpEnvDestroy() { @@ -772,4 +811,4 @@ void taosDestroyHttpChan(int64_t chanId) { (void)taosReleaseRef(httpRefMgt, chanId); (void)taosRemoveRef(httpRefMgt, chanId); -} \ No newline at end of file +}