diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 2723a11709..ab4c8facfb 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -32,6 +32,7 @@ typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; TdThread thread; + int8_t quit; } SHttpModule; typedef struct SHttpMsg { @@ -162,7 +163,7 @@ _OVER: static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) { uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { - tError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr()); + tError("http-report failed to resolving domain names: %s", server); return -1; } char buf[128] = {0}; @@ -186,6 +187,27 @@ static void httpDestroyMsg(SHttpMsg* msg) { taosMemoryFree(msg->cont); taosMemoryFree(msg); } + +static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { + SHttpMsg *msg = NULL, *quitMsg = NULL; + if (atomic_load_8(&http->quit) == 0) { + return; + } + + while (!QUEUE_IS_EMPTY(&item->qmsg)) { + queue* h = QUEUE_HEAD(&item->qmsg); + QUEUE_REMOVE(h); + msg = QUEUE_DATA(h, SHttpMsg, q); + if (!msg->quit) { + httpDestroyMsg(msg); + } else { + quitMsg = msg; + } + } + if (quitMsg != NULL) { + QUEUE_PUSH(&item->qmsg, &quitMsg->q); + } +} static void httpAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SHttpModule* http = item->pThrd; @@ -194,6 +216,8 @@ static void httpAsyncCb(uv_async_t* handle) { queue wq; taosThreadMutexLock(&item->mtx); + httpMayDiscardMsg(http, item); + QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); @@ -440,6 +464,8 @@ static void transHttpEnvInit() { httpRefMgt = taosOpenRef(1, transHttpDestroyHandle); SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); + + http->quit = 0; http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); uv_loop_init(http->loop); @@ -466,6 +492,10 @@ void transHttpEnvDestroy() { return; } SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); + if (load == NULL) return; + + atomic_store_8(&load->quit, 1); + httpSendQuit(); taosThreadJoin(load->thread, NULL);