diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 65b0058cfe..7cd043f90d 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -32,6 +32,7 @@ typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; TdThread thread; + SHashObj* connStatusTable; } SHttpModule; typedef struct SHttpMsg { @@ -64,6 +65,8 @@ static void httpHandleReq(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg); static int32_t httpSendQuit(); +static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); +static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ); static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); @@ -262,14 +265,20 @@ static void clientSentCb(uv_write_t* req, int32_t status) { } } static void clientConnCb(uv_connect_t* req, int32_t status) { + SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); SHttpClient* cli = req->data; if (status != 0) { + httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); + tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } + taosReleaseRef(httpRefMgt, httpRef); return; } + httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1); + status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); if (0 != status) { tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); @@ -277,6 +286,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } } + taosReleaseRef(httpRefMgt, httpRef); } int32_t httpSendQuit() { @@ -349,16 +359,51 @@ static void httpHandleQuit(SHttpMsg* msg) { uv_walk(http->loop, httpWalkCb, NULL); taosReleaseRef(httpRefMgt, httpRef); } + +static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) { + char buf[256] = {0}; + sprintf(buf, "%s:%d", server, port); + + int64_t* failedTime = (int64_t*)taosHashGet(pTable, buf, strlen(buf)); + if (failedTime == NULL) { + return false; + } + + int64_t now = taosGetTimestampSec(); + if (now - *failedTime < 10) { + tError("http-report succ to ignore msg,reason:connection timed out, dst:%s", buf); + return true; + } else { + return false; + } +} +static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ) { + char buf[256] = {0}; + sprintf(buf, "%s:%d", server, port); + + if (succ) { + taosHashRemove(pTable, buf, strlen(buf)); + } else { + int64_t st = taosGetTimestampSec(); + taosHashPut(pTable, buf, strlen(buf), &st, sizeof(st)); + } + return; +} static void httpHandleReq(SHttpMsg* msg) { + int32_t ignore = false; SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); if (http == NULL) { goto END; } - + if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) { + ignore = true; + goto END; + } struct sockaddr_in dest = {0}; if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { goto END; } + if (msg->flag == HTTP_GZIP) { int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len); if (dstLen > 0) { @@ -418,13 +463,16 @@ static void httpHandleReq(SHttpMsg* msg) { if (ret != 0) { tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); destroyHttpClient(cli); } taosReleaseRef(httpRefMgt, httpRef); return; END: - tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); + if (ignore == false) { + tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); + } httpDestroyMsg(msg); taosReleaseRef(httpRefMgt, httpRef); } @@ -441,6 +489,8 @@ static void transHttpEnvInit() { SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); + http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + uv_loop_init(http->loop); http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); @@ -474,6 +524,8 @@ void transHttpEnvDestroy() { uv_loop_close(load->loop); taosMemoryFree(load->loop); + taosHashCleanup(load->connStatusTable); + taosReleaseRef(httpRefMgt, httpRef); taosRemoveRef(httpRefMgt, httpRef); }