From 15f690dd6de0a6671e0a2b6eaa360b7e81c69b7b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 6 Dec 2023 11:44:41 +0800 Subject: [PATCH 1/4] add http fail fast --- source/libs/transport/src/thttp.c | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index afb982a50a..bbfccf9f70 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -34,6 +34,7 @@ typedef struct SHttpModule { SAsyncPool* asyncPool; TdThread thread; SHashObj* connStatusTable; + int8_t quit; } SHttpModule; typedef struct SHttpMsg { @@ -190,19 +191,37 @@ static void httpDestroyMsg(SHttpMsg* msg) { taosMemoryFree(msg->cont); taosMemoryFree(msg); } + +static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { + SHttpMsg *msg = NULL, *quitMsg = NULL; + int8_t quit = atomic_load_8(&http->quit); + if (quit == 1) { + while (!QUEUE_IS_EMPTY(&item->qmsg)) { + queue* h = QUEUE_HEAD(&item->qmsg); + QUEUE_REMOVE(h); + msg = QUEUE_DATA(h, SHttpMsg, q); + if (!msg->quit) { + httpDestroyMsg(msg); + } else { + quitMsg = msg; + } + } + QUEUE_PUSH(&item->qmsg, &quitMsg->q); + } +} static void httpAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SHttpModule* http = item->pThrd; SHttpMsg *msg = NULL, *quitMsg = NULL; - - queue wq; + queue wq; QUEUE_INIT(&wq); static int32_t BATCH_SIZE = 5; int32_t count = 0; taosThreadMutexLock(&item->mtx); + httpMayDiscardMsg(http, item); while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BATCH_SIZE) { queue* h = QUEUE_HEAD(&item->qmsg); @@ -526,6 +545,8 @@ void transHttpEnvDestroy() { return; } SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); + + atomic_store_8(&load->quit, 1); httpSendQuit(); taosThreadJoin(load->thread, NULL); From 59c59362f3819c6cb1b666a37ee9c8c34a755e94 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 6 Dec 2023 13:52:45 +0800 Subject: [PATCH 2/4] add http fast quit --- source/libs/transport/src/thttp.c | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index bbfccf9f70..b36731ab0b 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -194,18 +194,21 @@ static void httpDestroyMsg(SHttpMsg* msg) { static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { SHttpMsg *msg = NULL, *quitMsg = NULL; - int8_t quit = atomic_load_8(&http->quit); - if (quit == 1) { - while (!QUEUE_IS_EMPTY(&item->qmsg)) { - queue* h = QUEUE_HEAD(&item->qmsg); - QUEUE_REMOVE(h); - msg = QUEUE_DATA(h, SHttpMsg, q); - if (!msg->quit) { - httpDestroyMsg(msg); - } else { - quitMsg = msg; - } + if (atomic_load_8(&http->quit) == 0) { + return; + } + + while (!QUEUE_IS_EMPTY(&item->qmsg)) { + queue* h = QUEUE_HEAD(&item->qmsg); + QUEUE_REMOVE(h); + msg = QUEUE_DATA(h, SHttpMsg, q); + if (!msg->quit) { + httpDestroyMsg(msg); + } else { + quitMsg = msg; } + } + if (quitMsg != NULL) { QUEUE_PUSH(&item->qmsg, &quitMsg->q); } } From 9c909fd7a2960644c9a8a707f06affb13724b56a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 6 Dec 2023 15:49:21 +0800 Subject: [PATCH 3/4] refactor code --- source/libs/transport/src/thttp.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index b36731ab0b..33d1a2565a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -167,7 +167,8 @@ _OVER: static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { - tError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr()); + tError("http-report failed to get http server:%s since %s", server, + (terrno == 0 || errno == 0) ? "invalid http server" : terrstr()); return -1; } char buf[128] = {0}; @@ -519,9 +520,10 @@ static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); } static void transHttpEnvInit() { httpRefMgt = taosOpenRef(1, transHttpDestroyHandle); - SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); + SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule)); http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + http->quit = 0; uv_loop_init(http->loop); From 90588fac6988d672d853a44842015a8a915a2b08 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 6 Dec 2023 16:06:41 +0800 Subject: [PATCH 4/4] refactor code --- source/libs/transport/src/thttp.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 33d1a2565a..96537a950e 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -167,8 +167,7 @@ _OVER: static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { - tError("http-report failed to get http server:%s since %s", server, - (terrno == 0 || errno == 0) ? "invalid http server" : terrstr()); + tError("http-report failed to resolving domain names: %s", server); return -1; } char buf[128] = {0};