From aaa202f27b23cb783d9c2c2b25a49f27c8970ac6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 22 Nov 2023 17:58:32 +0800 Subject: [PATCH 1/6] add http test --- source/libs/transport/test/CMakeLists.txt | 21 +++++-- source/libs/transport/test/http_test.c | 70 +++++++++++++++++++++++ 2 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 source/libs/transport/test/http_test.c diff --git a/source/libs/transport/test/CMakeLists.txt b/source/libs/transport/test/CMakeLists.txt index b0548149d0..da4cda5dc7 100644 --- a/source/libs/transport/test/CMakeLists.txt +++ b/source/libs/transport/test/CMakeLists.txt @@ -2,6 +2,7 @@ add_executable(transportTest "") add_executable(transUT "") add_executable(svrBench "") add_executable(cliBench "") +add_executable(httpBench "") target_sources(transUT PRIVATE @@ -21,6 +22,10 @@ target_sources(cliBench PRIVATE "cliBench.c" ) +target_sources(httpBench + PRIVATE + "http_test.c" +) target_include_directories(transportTest PUBLIC @@ -51,11 +56,6 @@ target_include_directories(transUT "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_include_directories(svrBench - PUBLIC - "${TD_SOURCE_DIR}/include/libs/transport" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) target_include_directories(svrBench PUBLIC "${TD_SOURCE_DIR}/include/libs/transport" @@ -75,7 +75,8 @@ target_include_directories(cliBench "${TD_SOURCE_DIR}/include/libs/transport" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_include_directories(cliBench + +target_include_directories(httpBench PUBLIC "${TD_SOURCE_DIR}/include/libs/transport" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" @@ -89,6 +90,14 @@ target_link_libraries (cliBench transport ) +target_link_libraries(httpBench + os + util + common + gtest_main + transport +) + add_test( NAME transUT COMMAND transUT diff --git a/source/libs/transport/test/http_test.c b/source/libs/transport/test/http_test.c new file mode 100644 index 0000000000..d3b9356a9a --- /dev/null +++ b/source/libs/transport/test/http_test.c @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "os.h" +#include "taoserror.h" +#include "tglobal.h" +#include "thttp.h" +#include "transLog.h" +#include "trpc.h" +#include "tutil.h" +#include "tversion.h" + +void initLogEnv() { + const char * logDir = "/tmp/trans_cli"; + const char * defaultLogFileNamePrefix = "taoslog"; + const int32_t maxLogFileNum = 10000; + tsAsyncLog = 0; + // idxDebugFlag = 143; + strcpy(tsLogDir, (char *)logDir); + taosRemoveDir(tsLogDir); + taosMkDir(tsLogDir); + + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { + printf("failed to open log file in directory:%s\n", tsLogDir); + } +} +typedef struct TThread { + TdThread thread; + int idx; +} TThread; + +void *proces(void *arg) { + char *monitor = "172.26.10.94"; + while (1) { + int32_t len = 512; + char * msg = taosMemoryCalloc(1, len); + memset(msg, 1, len); + int32_t code = taosSendHttpReport(monitor, "/crash", 6050, msg, 10, HTTP_FLAT); + taosMemoryFree(msg); + taosUsleep(100 * 10); + } +} +int main(int argc, char *argv[]) { + initLogEnv(); + int32_t numOfThreads = 16; + TThread *thread = taosMemoryCalloc(1, sizeof(TThread) * numOfThreads); + + for (int i = 0; i < numOfThreads; i++) { + thread[i].idx = i; + taosThreadCreate(&(thread[i].thread), NULL, proces, (void *)&thread[i]); + } + while (1) { + taosMsleep(5000); + } + + taosCloseLog(); + + return 0; +} From 40f6de4fc889247907babd7d5403153caf1d896c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 22 Nov 2023 20:23:27 +0800 Subject: [PATCH 2/6] add http fail fast --- source/libs/transport/src/thttp.c | 56 +++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 65b0058cfe..7cd043f90d 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -32,6 +32,7 @@ typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; TdThread thread; + SHashObj* connStatusTable; } SHttpModule; typedef struct SHttpMsg { @@ -64,6 +65,8 @@ static void httpHandleReq(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg); static int32_t httpSendQuit(); +static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); +static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ); static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); @@ -262,14 +265,20 @@ static void clientSentCb(uv_write_t* req, int32_t status) { } } static void clientConnCb(uv_connect_t* req, int32_t status) { + SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); SHttpClient* cli = req->data; if (status != 0) { + httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); + tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } + taosReleaseRef(httpRefMgt, httpRef); return; } + httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1); + status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); if (0 != status) { tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); @@ -277,6 +286,7 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } } + taosReleaseRef(httpRefMgt, httpRef); } int32_t httpSendQuit() { @@ -349,16 +359,51 @@ static void httpHandleQuit(SHttpMsg* msg) { uv_walk(http->loop, httpWalkCb, NULL); taosReleaseRef(httpRefMgt, httpRef); } + +static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) { + char buf[256] = {0}; + sprintf(buf, "%s:%d", server, port); + + int64_t* failedTime = (int64_t*)taosHashGet(pTable, buf, strlen(buf)); + if (failedTime == NULL) { + return false; + } + + int64_t now = taosGetTimestampSec(); + if (now - *failedTime < 10) { + tError("http-report succ to ignore msg,reason:connection timed out, dst:%s", buf); + return true; + } else { + return false; + } +} +static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ) { + char buf[256] = {0}; + sprintf(buf, "%s:%d", server, port); + + if (succ) { + taosHashRemove(pTable, buf, strlen(buf)); + } else { + int64_t st = taosGetTimestampSec(); + taosHashPut(pTable, buf, strlen(buf), &st, sizeof(st)); + } + return; +} static void httpHandleReq(SHttpMsg* msg) { + int32_t ignore = false; SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); if (http == NULL) { goto END; } - + if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) { + ignore = true; + goto END; + } struct sockaddr_in dest = {0}; if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { goto END; } + if (msg->flag == HTTP_GZIP) { int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len); if (dstLen > 0) { @@ -418,13 +463,16 @@ static void httpHandleReq(SHttpMsg* msg) { if (ret != 0) { tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); destroyHttpClient(cli); } taosReleaseRef(httpRefMgt, httpRef); return; END: - tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); + if (ignore == false) { + tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); + } httpDestroyMsg(msg); taosReleaseRef(httpRefMgt, httpRef); } @@ -441,6 +489,8 @@ static void transHttpEnvInit() { SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); + http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + uv_loop_init(http->loop); http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); @@ -474,6 +524,8 @@ void transHttpEnvDestroy() { uv_loop_close(load->loop); taosMemoryFree(load->loop); + taosHashCleanup(load->connStatusTable); + taosReleaseRef(httpRefMgt, httpRef); taosRemoveRef(httpRefMgt, httpRef); } From 557db1c3cc68e0a627f773ca2fcc6cdab6a0b0c9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 22 Nov 2023 20:24:08 +0800 Subject: [PATCH 3/6] add http test --- source/libs/transport/test/http_test.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/test/http_test.c b/source/libs/transport/test/http_test.c index d3b9356a9a..7bdcdb7ca5 100644 --- a/source/libs/transport/test/http_test.c +++ b/source/libs/transport/test/http_test.c @@ -26,7 +26,7 @@ void initLogEnv() { const char * defaultLogFileNamePrefix = "taoslog"; const int32_t maxLogFileNum = 10000; tsAsyncLog = 0; - // idxDebugFlag = 143; + // rpcDebugflag = 143; strcpy(tsLogDir, (char *)logDir); taosRemoveDir(tsLogDir); taosMkDir(tsLogDir); @@ -48,12 +48,12 @@ void *proces(void *arg) { memset(msg, 1, len); int32_t code = taosSendHttpReport(monitor, "/crash", 6050, msg, 10, HTTP_FLAT); taosMemoryFree(msg); - taosUsleep(100 * 10); + taosUsleep(10); } } int main(int argc, char *argv[]) { initLogEnv(); - int32_t numOfThreads = 16; + int32_t numOfThreads = 3; TThread *thread = taosMemoryCalloc(1, sizeof(TThread) * numOfThreads); for (int i = 0; i < numOfThreads; i++) { From 321ef1c7581e785a9ffbc470cb1018b844cb23d1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Nov 2023 11:08:23 +0800 Subject: [PATCH 4/6] add http test --- source/libs/transport/src/thttp.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 7cd043f90d..2507447784 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -196,11 +196,19 @@ static void httpAsyncCb(uv_async_t* handle) { SHttpMsg *msg = NULL, *quitMsg = NULL; queue wq; + QUEUE_INIT(&wq); + + static int32_t BACTHSIZE = 20; + int32_t count = 0; + taosThreadMutexLock(&item->mtx); - QUEUE_MOVE(&item->qmsg, &wq); + while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BACTHSIZE) { + queue* h = QUEUE_HEAD(&item->qmsg); + QUEUE_REMOVE(h); + QUEUE_PUSH(&wq, h); + } taosThreadMutexUnlock(&item->mtx); - int count = 0; while (!QUEUE_IS_EMPTY(&wq)) { queue* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); @@ -370,7 +378,7 @@ static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t p } int64_t now = taosGetTimestampSec(); - if (now - *failedTime < 10) { + if (now - *failedTime < 60) { tError("http-report succ to ignore msg,reason:connection timed out, dst:%s", buf); return true; } else { From 50922427ffac829cd260d616cb5bfaede4bfc98e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Nov 2023 13:59:42 +0800 Subject: [PATCH 5/6] add http test --- source/libs/transport/src/thttp.c | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 2507447784..523dc2a5f7 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -198,7 +198,7 @@ static void httpAsyncCb(uv_async_t* handle) { queue wq; QUEUE_INIT(&wq); - static int32_t BACTHSIZE = 20; + static int32_t BACTHSIZE = 5; int32_t count = 0; taosThreadMutexLock(&item->mtx); @@ -372,13 +372,14 @@ static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t p char buf[256] = {0}; sprintf(buf, "%s:%d", server, port); - int64_t* failedTime = (int64_t*)taosHashGet(pTable, buf, strlen(buf)); + int32_t* failedTime = (int32_t*)taosHashGet(pTable, buf, strlen(buf)); if (failedTime == NULL) { return false; } + int32_t now = taosGetTimestampSec(); - int64_t now = taosGetTimestampSec(); - if (now - *failedTime < 60) { + tError("failed timestamp %d, curr timestamp:%d", *failedTime, now); + if (*failedTime > now - 60) { tError("http-report succ to ignore msg,reason:connection timed out, dst:%s", buf); return true; } else { @@ -392,7 +393,7 @@ static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, if (succ) { taosHashRemove(pTable, buf, strlen(buf)); } else { - int64_t st = taosGetTimestampSec(); + int32_t st = taosGetTimestampSec(); taosHashPut(pTable, buf, strlen(buf), &st, sizeof(st)); } return; @@ -452,11 +453,11 @@ static void httpHandleReq(SHttpMsg* msg) { uv_tcp_init(http->loop, &cli->tcp); // set up timeout to avoid stuck; - int32_t fd = taosCreateSocketWithTimeout(5); + int32_t fd = taosCreateSocketWithTimeout(3000); if (fd < 0) { tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port); - taosReleaseRef(httpRefMgt, httpRef); destroyHttpClient(cli); + taosReleaseRef(httpRefMgt, httpRef); return; } int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); From b22bc3fe6277d1dd44d91838d56637dd23bd7052 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Nov 2023 15:19:48 +0800 Subject: [PATCH 6/6] add http test --- source/libs/transport/src/thttp.c | 15 ++++++++------- source/libs/transport/test/http_test.c | 4 ++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 523dc2a5f7..afb982a50a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -28,6 +28,7 @@ static int32_t httpRefMgt = 0; static int64_t httpRef = -1; +static int32_t FAST_FAILURE_LIMIT = 120; typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; @@ -198,11 +199,12 @@ static void httpAsyncCb(uv_async_t* handle) { queue wq; QUEUE_INIT(&wq); - static int32_t BACTHSIZE = 5; + static int32_t BATCH_SIZE = 5; int32_t count = 0; taosThreadMutexLock(&item->mtx); - while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BACTHSIZE) { + + while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BATCH_SIZE) { queue* h = QUEUE_HEAD(&item->qmsg); QUEUE_REMOVE(h); QUEUE_PUSH(&wq, h); @@ -376,11 +378,10 @@ static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t p if (failedTime == NULL) { return false; } - int32_t now = taosGetTimestampSec(); - tError("failed timestamp %d, curr timestamp:%d", *failedTime, now); - if (*failedTime > now - 60) { - tError("http-report succ to ignore msg,reason:connection timed out, dst:%s", buf); + int32_t now = taosGetTimestampSec(); + if (*failedTime > now - FAST_FAILURE_LIMIT) { + tDebug("http-report succ to ignore msg,reason:connection timed out, dst:%s", buf); return true; } else { return false; @@ -453,7 +454,7 @@ static void httpHandleReq(SHttpMsg* msg) { uv_tcp_init(http->loop, &cli->tcp); // set up timeout to avoid stuck; - int32_t fd = taosCreateSocketWithTimeout(3000); + int32_t fd = taosCreateSocketWithTimeout(5000); if (fd < 0) { tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port); destroyHttpClient(cli); diff --git a/source/libs/transport/test/http_test.c b/source/libs/transport/test/http_test.c index 7bdcdb7ca5..d04fb84843 100644 --- a/source/libs/transport/test/http_test.c +++ b/source/libs/transport/test/http_test.c @@ -24,7 +24,7 @@ void initLogEnv() { const char * logDir = "/tmp/trans_cli"; const char * defaultLogFileNamePrefix = "taoslog"; - const int32_t maxLogFileNum = 10000; + const int32_t maxLogFileNum = 1000000; tsAsyncLog = 0; // rpcDebugflag = 143; strcpy(tsLogDir, (char *)logDir); @@ -53,7 +53,7 @@ void *proces(void *arg) { } int main(int argc, char *argv[]) { initLogEnv(); - int32_t numOfThreads = 3; + int32_t numOfThreads = 10; TThread *thread = taosMemoryCalloc(1, sizeof(TThread) * numOfThreads); for (int i = 0; i < numOfThreads; i++) {