diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index afb982a50a..96537a950e 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -34,6 +34,7 @@ typedef struct SHttpModule { SAsyncPool* asyncPool; TdThread thread; SHashObj* connStatusTable; + int8_t quit; } SHttpModule; typedef struct SHttpMsg { @@ -166,7 +167,7 @@ _OVER: static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { - tError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr()); + tError("http-report failed to resolving domain names: %s", server); return -1; } char buf[128] = {0}; @@ -190,19 +191,40 @@ static void httpDestroyMsg(SHttpMsg* msg) { taosMemoryFree(msg->cont); taosMemoryFree(msg); } + +static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { + SHttpMsg *msg = NULL, *quitMsg = NULL; + if (atomic_load_8(&http->quit) == 0) { + return; + } + + while (!QUEUE_IS_EMPTY(&item->qmsg)) { + queue* h = QUEUE_HEAD(&item->qmsg); + QUEUE_REMOVE(h); + msg = QUEUE_DATA(h, SHttpMsg, q); + if (!msg->quit) { + httpDestroyMsg(msg); + } else { + quitMsg = msg; + } + } + if (quitMsg != NULL) { + QUEUE_PUSH(&item->qmsg, &quitMsg->q); + } +} static void httpAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SHttpModule* http = item->pThrd; SHttpMsg *msg = NULL, *quitMsg = NULL; - - queue wq; + queue wq; QUEUE_INIT(&wq); static int32_t BATCH_SIZE = 5; int32_t count = 0; taosThreadMutexLock(&item->mtx); + httpMayDiscardMsg(http, item); while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BATCH_SIZE) { queue* h = QUEUE_HEAD(&item->qmsg); @@ -497,9 +519,10 @@ static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); } static void transHttpEnvInit() { httpRefMgt = taosOpenRef(1, transHttpDestroyHandle); - SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); + SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule)); http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + http->quit = 0; uv_loop_init(http->loop); @@ -526,6 +549,8 @@ void transHttpEnvDestroy() { return; } SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); + + atomic_store_8(&load->quit, 1); httpSendQuit(); taosThreadJoin(load->thread, NULL);