add trace log

This commit is contained in:
yihaoDeng 2024-08-19 17:19:05 +08:00
parent 215a665497
commit 30ff2fc11b
2 changed files with 72 additions and 32 deletions

View File

@ -240,6 +240,7 @@ typedef struct {
void* pThrd; void* pThrd;
queue qmsg; queue qmsg;
TdThreadMutex mtx; // protect qmsg; TdThreadMutex mtx; // protect qmsg;
int64_t num;
} SAsyncItem; } SAsyncItem;
typedef struct { typedef struct {

View File

@ -30,6 +30,8 @@ static int32_t FAST_FAILURE_LIMIT = 1;
static int64_t httpDefaultChanId = -1; static int64_t httpDefaultChanId = -1;
static int64_t httpSeqNum = 0;
typedef struct SHttpModule { typedef struct SHttpModule {
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
@ -50,6 +52,7 @@ typedef struct SHttpMsg {
EHttpCompFlag flag; EHttpCompFlag flag;
int8_t quit; int8_t quit;
int64_t chanId; int64_t chanId;
int64_t seq;
} SHttpMsg; } SHttpMsg;
typedef struct SHttpClient { typedef struct SHttpClient {
@ -62,6 +65,7 @@ typedef struct SHttpClient {
uint16_t port; uint16_t port;
struct sockaddr_in dest; struct sockaddr_in dest;
int64_t chanId; int64_t chanId;
int64_t seq;
} SHttpClient; } SHttpClient;
typedef struct SHttpConnList { typedef struct SHttpConnList {
@ -215,14 +219,15 @@ static void* httpThread(void* arg) {
static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) { EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) {
int64_t seqNum = atomic_fetch_add_64(&httpSeqNum, 1);
if (server == NULL || uri == NULL) { if (server == NULL || uri == NULL) {
tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId); tError("http-report failed to report to invalid addr, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum);
*httpMsg = NULL; *httpMsg = NULL;
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
if (pCont == NULL || contLen == 0) { if (pCont == NULL || contLen == 0) {
tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId); tError("http-report failed to report empty packet, chanId:%" PRId64 ", seq:%" PRId64 "", chanId, seqNum);
*httpMsg = NULL; *httpMsg = NULL;
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -233,6 +238,7 @@ static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port,
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
msg->seq = seqNum;
msg->port = port; msg->port = port;
msg->server = taosStrdup(server); msg->server = taosStrdup(server);
msg->uri = taosStrdup(uri); msg->uri = taosStrdup(uri);
@ -259,7 +265,11 @@ static void httpDestroyMsg(SHttpMsg* msg) {
taosMemoryFree(msg->cont); taosMemoryFree(msg->cont);
taosMemoryFree(msg); taosMemoryFree(msg);
} }
static void httpDestroyMsgWrapper(void* cont, void* param) { httpDestroyMsg((SHttpMsg*)cont); } static void httpDestroyMsgWrapper(void* cont, void* param) {
SHttpMsg* pMsg = cont;
tWarn("http-report destroy msg, seq: %" PRId64 "", pMsg->seq);
httpDestroyMsg(pMsg);
}
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
SHttpMsg *msg = NULL, *quitMsg = NULL; SHttpMsg *msg = NULL, *quitMsg = NULL;
@ -272,6 +282,8 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
msg = QUEUE_DATA(h, SHttpMsg, q); msg = QUEUE_DATA(h, SHttpMsg, q);
if (!msg->quit) { if (!msg->quit) {
tError("http-report failed to report chanId:%" PRId64 ",seq:%" PRId64 ", reason: %s", msg->chanId, msg->seq,
tstrerror(TSDB_CODE_HTTP_MODULE_QUIT));
httpDestroyMsg(msg); httpDestroyMsg(msg);
} else { } else {
quitMsg = msg; quitMsg = msg;
@ -281,6 +293,25 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
QUEUE_PUSH(&item->qmsg, &quitMsg->q); QUEUE_PUSH(&item->qmsg, &quitMsg->q);
} }
} }
static void httpTrace(queue* q) {
if (!(rpcDebugFlag & DEBUG_DEBUG) || (QUEUE_IS_EMPTY(q))) {
return;
}
int64_t startSeq = 0, endSeq = 0;
SHttpMsg* msg = NULL;
queue* h = QUEUE_HEAD(q);
msg = QUEUE_DATA(h, SHttpMsg, q);
startSeq = msg->seq;
h = QUEUE_TAIL(q);
msg = QUEUE_DATA(h, SHttpMsg, q);
endSeq = msg->seq;
tDebug("http-report process msg, start_seq: %" PRId64 ", end_seq: %" PRId64 ", max_seq:%" PRId64 "", startSeq, endSeq,
atomic_load_64(&httpSeqNum) - 1);
}
static void httpAsyncCb(uv_async_t* handle) { static void httpAsyncCb(uv_async_t* handle) {
SAsyncItem* item = handle->data; SAsyncItem* item = handle->data;
@ -290,7 +321,7 @@ static void httpAsyncCb(uv_async_t* handle) {
queue wq; queue wq;
QUEUE_INIT(&wq); QUEUE_INIT(&wq);
static int32_t BATCH_SIZE = 5; static int32_t BATCH_SIZE = 20;
int32_t count = 0; int32_t count = 0;
(void)taosThreadMutexLock(&item->mtx); (void)taosThreadMutexLock(&item->mtx);
@ -303,6 +334,8 @@ static void httpAsyncCb(uv_async_t* handle) {
} }
(void)taosThreadMutexUnlock(&item->mtx); (void)taosThreadMutexUnlock(&item->mtx);
httpTrace(&wq);
while (!QUEUE_IS_EMPTY(&wq)) { while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq); queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
@ -347,9 +380,9 @@ static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
SHttpClient* cli = handle->data; SHttpClient* cli = handle->data;
if (nread < 0) { if (nread < 0) {
tError("http-report recv error:%s", uv_strerror(nread)); tError("http-report recv error:%s, seq: %" PRId64 "", uv_strerror(nread), cli->seq);
} else { } else {
tTrace("http-report succ to recv %d bytes", (int32_t)nread); tTrace("http-report succ to recv %d bytes, seq: %" PRId64 "", (int32_t)nread, cli->seq);
} }
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
@ -358,19 +391,19 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const
static void clientSentCb(uv_write_t* req, int32_t status) { static void clientSentCb(uv_write_t* req, int32_t status) {
SHttpClient* cli = req->data; SHttpClient* cli = req->data;
if (status != 0) { if (status != 0) {
tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 ", seq: %" PRId64 "",
cli->port, cli->chanId); uv_strerror(status), cli->addr, cli->port, cli->chanId, cli->seq);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} }
return; return;
} else { } else {
tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId); tTrace("http-report succ to send data, chanId:%" PRId64 ", seq: %" PRId64 "", cli->chanId, cli->seq);
} }
status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
if (status != 0) { if (status != 0) {
tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq: %" PRId64 "",
cli->port, cli->chanId); uv_strerror(status), cli->addr, cli->port, cli->chanId, cli->seq);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} }
@ -383,9 +416,8 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
if (status != 0) { if (status != 0) {
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq: %" PRId64 "",
tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), uv_strerror(status), cli->addr, cli->port, chanId, cli->seq);
cli->addr, cli->port, chanId);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} }
@ -398,8 +430,8 @@ static void clientConnCb(uv_connect_t* req, int32_t status) {
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
if (0 != status) { if (0 != status) {
tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq: %" PRId64 "",
cli->port, chanId); uv_strerror(status), cli->addr, cli->port, chanId, cli->seq);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
} }
@ -412,7 +444,7 @@ int32_t httpSendQuit(SHttpModule* http, int64_t chanId) {
if (msg == NULL) { if (msg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
msg->seq = atomic_fetch_add_64(&httpSeqNum, 1);
msg->quit = 1; msg->quit = 1;
msg->chanId = chanId; msg->chanId = chanId;
@ -442,10 +474,11 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) {
return; return;
} }
static void httpHandleQuit(SHttpMsg* msg) { static void httpHandleQuit(SHttpMsg* msg) {
int64_t seq = msg->seq;
int64_t chanId = msg->chanId; int64_t chanId = msg->chanId;
taosMemoryFree(msg); taosMemoryFree(msg);
tDebug("http-report receive quit, chanId:%" PRId64 "", chanId); tDebug("http-report receive quit, chanId: %" PRId64 ", seq: %" PRId64 "", chanId, seq);
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
if (http == NULL) return; if (http == NULL) return;
uv_walk(http->loop, httpWalkCb, NULL); uv_walk(http->loop, httpWalkCb, NULL);
@ -542,7 +575,7 @@ static void httpHandleReq(SHttpMsg* msg) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto END; goto END;
} }
cli->seq = msg->seq;
cli->conn.data = cli; cli->conn.data = cli;
cli->tcp.data = cli; cli->tcp.data = cli;
cli->req.data = cli; cli->req.data = cli;
@ -556,8 +589,8 @@ static void httpHandleReq(SHttpMsg* msg) {
cli->wbuf = wb; cli->wbuf = wb;
cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
if (cli->rbuf == NULL) { if (cli->rbuf == NULL) {
tError("http-report failed to alloc read buf, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, tError("http-report failed to alloc read buf, dst:%s:%d,chanId:%" PRId64 ", seq:%" PRId64 ",reason:%s", cli->addr,
chanId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); cli->port, chanId, cli->seq, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
destroyHttpClient(cli); destroyHttpClient(cli);
(void)taosReleaseRef(httpRefMgt, chanId); (void)taosReleaseRef(httpRefMgt, chanId);
return; return;
@ -565,8 +598,8 @@ static void httpHandleReq(SHttpMsg* msg) {
int err = uv_tcp_init(http->loop, &cli->tcp); int err = uv_tcp_init(http->loop, &cli->tcp);
if (err != 0) { if (err != 0) {
tError("http-report failed to init socket handle, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, tError("http-report failed to init socket handle, dst:%s:%d,chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s",
chanId, uv_strerror(err)); cli->addr, cli->port, chanId, cli->seq, uv_strerror(err));
destroyHttpClient(cli); destroyHttpClient(cli);
(void)taosReleaseRef(httpRefMgt, chanId); (void)taosReleaseRef(httpRefMgt, chanId);
return; return;
@ -575,8 +608,8 @@ static void httpHandleReq(SHttpMsg* msg) {
// set up timeout to avoid stuck; // set up timeout to avoid stuck;
int32_t fd = taosCreateSocketWithTimeout(5000); int32_t fd = taosCreateSocketWithTimeout(5000);
if (fd < 0) { if (fd < 0) {
tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, chanId, tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s", cli->addr,
tstrerror(TAOS_SYSTEM_ERROR(errno))); cli->port, chanId, cli->seq, tstrerror(TAOS_SYSTEM_ERROR(errno)));
destroyHttpClient(cli); destroyHttpClient(cli);
(void)taosReleaseRef(httpRefMgt, chanId); (void)taosReleaseRef(httpRefMgt, chanId);
return; return;
@ -584,8 +617,8 @@ static void httpHandleReq(SHttpMsg* msg) {
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
if (ret != 0) { if (ret != 0) {
tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 ",reason:%s", uv_strerror(ret), tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ", reason:%s",
cli->addr, cli->port, chanId, uv_strerror(ret)); uv_strerror(ret), cli->addr, cli->port, chanId, cli->seq, uv_strerror(ret));
destroyHttpClient(cli); destroyHttpClient(cli);
(void)taosReleaseRef(httpRefMgt, chanId); (void)taosReleaseRef(httpRefMgt, chanId);
return; return;
@ -593,8 +626,8 @@ static void httpHandleReq(SHttpMsg* msg) {
ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb); ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb);
if (ret != 0) { if (ret != 0) {
tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ",reson:%s", cli->addr, cli->port, tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ", seq:%" PRId64 ",reson:%s",
chanId, uv_strerror(ret)); cli->addr, cli->port, chanId, cli->seq, uv_strerror(ret));
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
destroyHttpClient(cli); destroyHttpClient(cli);
} }
@ -603,8 +636,8 @@ static void httpHandleReq(SHttpMsg* msg) {
END: END:
if (ignore == false) { if (ignore == false) {
tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ", reason:%s", msg->server, msg->port, chanId, tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ",seq:%" PRId64 " reason:%s", msg->server,
tstrerror(code)); msg->port, chanId, msg->seq, tstrerror(code));
} }
httpDestroyMsg(msg); httpDestroyMsg(msg);
taosMemoryFree(header); taosMemoryFree(header);
@ -661,6 +694,11 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri,
msg = NULL; msg = NULL;
_ERROR: _ERROR:
if (code != 0) {
tError("http-report failed to report reason:%s, chanId:%" PRId64 ", seq:%" PRId64 "", tstrerror(code), chanId,
msg->seq);
}
httpDestroyMsg(msg); httpDestroyMsg(msg);
if (load != NULL) taosReleaseRef(httpRefMgt, chanId); if (load != NULL) taosReleaseRef(httpRefMgt, chanId);
return code; return code;
@ -684,6 +722,7 @@ int64_t transInitHttpChanImpl();
static void transHttpEnvInit() { static void transHttpEnvInit() {
httpRefMgt = taosOpenRef(64, transHttpDestroyHandle); httpRefMgt = taosOpenRef(64, transHttpDestroyHandle);
httpDefaultChanId = transInitHttpChanImpl(); httpDefaultChanId = transInitHttpChanImpl();
httpSeqNum = 0;
} }
void transHttpEnvDestroy() { void transHttpEnvDestroy() {
@ -772,4 +811,4 @@ void taosDestroyHttpChan(int64_t chanId) {
(void)taosReleaseRef(httpRefMgt, chanId); (void)taosReleaseRef(httpRefMgt, chanId);
(void)taosRemoveRef(httpRefMgt, chanId); (void)taosRemoveRef(httpRefMgt, chanId);
} }