diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 65b0058cfe..afb982a50a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -28,10 +28,12 @@ static int32_t httpRefMgt = 0; static int64_t httpRef = -1; +static int32_t FAST_FAILURE_LIMIT = 120; typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; TdThread thread; + SHashObj* connStatusTable; } SHttpModule; typedef struct SHttpMsg { @@ -64,6 +66,8 @@ static void httpHandleReq(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg); static int32_t httpSendQuit(); +static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); +static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ); static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); @@ -193,11 +197,20 @@ static void httpAsyncCb(uv_async_t* handle) { SHttpMsg *msg = NULL, *quitMsg = NULL; queue wq; + QUEUE_INIT(&wq); + + static int32_t BATCH_SIZE = 5; + int32_t count = 0; + taosThreadMutexLock(&item->mtx); - QUEUE_MOVE(&item->qmsg, &wq); + + while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BATCH_SIZE) { + queue* h = QUEUE_HEAD(&item->qmsg); + QUEUE_REMOVE(h); + QUEUE_PUSH(&wq, h); + } taosThreadMutexUnlock(&item->mtx); - int count = 0; while (!QUEUE_IS_EMPTY(&wq)) { queue* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); @@ -262,14 +275,20 @@ static void clientSentCb(uv_write_t* req, int32_t status) { } } static void clientConnCb(uv_connect_t* req, int32_t status) { + SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); SHttpClient* cli = req->data; if (status != 0) { + httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); + tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } + taosReleaseRef(httpRefMgt, httpRef); return; } + httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1); + status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); if (0 != status) { tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); @@ -277,6 +296,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } } + taosReleaseRef(httpRefMgt, httpRef); } int32_t httpSendQuit() { @@ -349,16 +369,51 @@ static void httpHandleQuit(SHttpMsg* msg) { uv_walk(http->loop, httpWalkCb, NULL); taosReleaseRef(httpRefMgt, httpRef); } + +static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) { + char buf[256] = {0}; + sprintf(buf, "%s:%d", server, port); + + int32_t* failedTime = (int32_t*)taosHashGet(pTable, buf, strlen(buf)); + if (failedTime == NULL) { + return false; + } + + int32_t now = taosGetTimestampSec(); + if (*failedTime > now - FAST_FAILURE_LIMIT) { + tDebug("http-report succ to ignore msg,reason:connection timed out, dst:%s", buf); + return true; + } else { + return false; + } +} +static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ) { + char buf[256] = {0}; + sprintf(buf, "%s:%d", server, port); + + if (succ) { + taosHashRemove(pTable, buf, strlen(buf)); + } else { + int32_t st = taosGetTimestampSec(); + taosHashPut(pTable, buf, strlen(buf), &st, sizeof(st)); + } + return; +} static void httpHandleReq(SHttpMsg* msg) { + int32_t ignore = false; SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); if (http == NULL) { goto END; } - + if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) { + ignore = true; + goto END; + } struct sockaddr_in dest = {0}; if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { goto END; } + if (msg->flag == HTTP_GZIP) { int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len); if (dstLen > 0) { @@ -399,11 +454,11 @@ static void httpHandleReq(SHttpMsg* msg) { uv_tcp_init(http->loop, &cli->tcp); // set up timeout to avoid stuck; - int32_t fd = taosCreateSocketWithTimeout(5); + int32_t fd = taosCreateSocketWithTimeout(5000); if (fd < 0) { tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port); - taosReleaseRef(httpRefMgt, httpRef); destroyHttpClient(cli); + taosReleaseRef(httpRefMgt, httpRef); return; } int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); @@ -418,13 +473,16 @@ static void httpHandleReq(SHttpMsg* msg) { if (ret != 0) { tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); destroyHttpClient(cli); } taosReleaseRef(httpRefMgt, httpRef); return; END: - tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); + if (ignore == false) { + tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); + } httpDestroyMsg(msg); taosReleaseRef(httpRefMgt, httpRef); } @@ -441,6 +499,8 @@ static void transHttpEnvInit() { SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); + http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + uv_loop_init(http->loop); http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); @@ -474,6 +534,8 @@ void transHttpEnvDestroy() { uv_loop_close(load->loop); taosMemoryFree(load->loop); + taosHashCleanup(load->connStatusTable); + taosReleaseRef(httpRefMgt, httpRef); taosRemoveRef(httpRefMgt, httpRef); } diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt index b0548149d0..da4cda5dc7 100644 --- a/source/libs/transport/test/CMakeLists.txt +++ b/source/libs/transport/test/CMakeLists.txt @@ -2,6 +2,7 @@ add_executable(transportTest "") add_executable(transUT "") add_executable(svrBench "") add_executable(cliBench "") +add_executable(httpBench "") target_sources(transUT PRIVATE @@ -21,6 +22,10 @@ target_sources(cliBench PRIVATE "cliBench.c" ) +target_sources(httpBench + PRIVATE + "http_test.c" +) target_include_directories(transportTest PUBLIC @@ -51,11 +56,6 @@ target_include_directories(transUT "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_include_directories(svrBench - PUBLIC - "${TD_SOURCE_DIR}/include/libs/transport" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) target_include_directories(svrBench PUBLIC "${TD_SOURCE_DIR}/include/libs/transport" @@ -75,7 +75,8 @@ target_include_directories(cliBench "${TD_SOURCE_DIR}/include/libs/transport" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_include_directories(cliBench + +target_include_directories(httpBench PUBLIC "${TD_SOURCE_DIR}/include/libs/transport" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" @@ -89,6 +90,14 @@ target_link_libraries (cliBench transport ) +target_link_libraries(httpBench + os + util + common + gtest_main + transport +) + add_test( NAME transUT COMMAND transUT diff --git a/source/libs/transport/test/http_test.c b/source/libs/transport/test/http_test.c new file mode 100644 index 0000000000..d04fb84843 --- /dev/null +++ b/source/libs/transport/test/http_test.c @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "os.h" +#include "taoserror.h" +#include "tglobal.h" +#include "thttp.h" +#include "transLog.h" +#include "trpc.h" +#include "tutil.h" +#include "tversion.h" + +void initLogEnv() { + const char * logDir = "/tmp/trans_cli"; + const char * defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 1000000; + tsAsyncLog = 0; + // rpcDebugflag = 143; + strcpy(tsLogDir, (char *)logDir); + taosRemoveDir(tsLogDir); + taosMkDir(tsLogDir); + + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } +} +typedef struct TThread { + TdThread thread; + int idx; +} TThread; + +void *proces(void *arg) { + char *monitor = "172.26.10.94"; + while (1) { + int32_t len = 512; + char * msg = taosMemoryCalloc(1, len); + memset(msg, 1, len); + int32_t code = taosSendHttpReport(monitor, "/crash", 6050, msg, 10, HTTP_FLAT); + taosMemoryFree(msg); + taosUsleep(10); + } +} +int main(int argc, char *argv[]) { + initLogEnv(); + int32_t numOfThreads = 10; + TThread *thread = taosMemoryCalloc(1, sizeof(TThread) * numOfThreads); + + for (int i = 0; i < numOfThreads; i++) { + thread[i].idx = i; + taosThreadCreate(&(thread[i].thread), NULL, proces, (void *)&thread[i]); + } + while (1) { + taosMsleep(5000); + } + + taosCloseLog(); + + return 0; +}