diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 2723a11709..52c763372f 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -32,6 +32,7 @@ typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; TdThread thread; + int8_t quit; } SHttpModule; typedef struct SHttpMsg { @@ -186,6 +187,27 @@ static void httpDestroyMsg(SHttpMsg* msg) { taosMemoryFree(msg->cont); taosMemoryFree(msg); } + +static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { + SHttpMsg *msg = NULL, *quitMsg = NULL; + if (atomic_load_8(&http->quit) == 0) { + return; + } + + while (!QUEUE_IS_EMPTY(&item->qmsg)) { + queue* h = QUEUE_HEAD(&item->qmsg); + QUEUE_REMOVE(h); + msg = QUEUE_DATA(h, SHttpMsg, q); + if (!msg->quit) { + httpDestroyMsg(msg); + } else { + quitMsg = msg; + } + } + if (quitMsg != NULL) { + QUEUE_PUSH(&item->qmsg, &quitMsg->q); + } +} static void httpAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SHttpModule* http = item->pThrd; @@ -194,6 +216,8 @@ static void httpAsyncCb(uv_async_t* handle) { queue wq; taosThreadMutexLock(&item->mtx); + httpMayDiscardMsg(http, item); + QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); @@ -466,6 +490,10 @@ void transHttpEnvDestroy() { return; } SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); + if (load == NULL) return; + + atomic_store_8(&load->quit, 1); + httpSendQuit(); taosThreadJoin(load->thread, NULL);