Merge pull request #23956 from taosdata/fix/fixHttpFastQuit
Fix/fix http fast quit
This commit is contained in:
commit
0bfbc359ed
|
@ -34,6 +34,7 @@ typedef struct SHttpModule {
|
||||||
SAsyncPool* asyncPool;
|
SAsyncPool* asyncPool;
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
SHashObj* connStatusTable;
|
SHashObj* connStatusTable;
|
||||||
|
int8_t quit;
|
||||||
} SHttpModule;
|
} SHttpModule;
|
||||||
|
|
||||||
typedef struct SHttpMsg {
|
typedef struct SHttpMsg {
|
||||||
|
@ -166,7 +167,7 @@ _OVER:
|
||||||
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
|
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
|
||||||
uint32_t ip = taosGetIpv4FromFqdn(server);
|
uint32_t ip = taosGetIpv4FromFqdn(server);
|
||||||
if (ip == 0xffffffff) {
|
if (ip == 0xffffffff) {
|
||||||
tError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr());
|
tError("http-report failed to resolving domain names: %s", server);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
|
@ -190,19 +191,40 @@ static void httpDestroyMsg(SHttpMsg* msg) {
|
||||||
taosMemoryFree(msg->cont);
|
taosMemoryFree(msg->cont);
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
|
||||||
|
SHttpMsg *msg = NULL, *quitMsg = NULL;
|
||||||
|
if (atomic_load_8(&http->quit) == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!QUEUE_IS_EMPTY(&item->qmsg)) {
|
||||||
|
queue* h = QUEUE_HEAD(&item->qmsg);
|
||||||
|
QUEUE_REMOVE(h);
|
||||||
|
msg = QUEUE_DATA(h, SHttpMsg, q);
|
||||||
|
if (!msg->quit) {
|
||||||
|
httpDestroyMsg(msg);
|
||||||
|
} else {
|
||||||
|
quitMsg = msg;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (quitMsg != NULL) {
|
||||||
|
QUEUE_PUSH(&item->qmsg, &quitMsg->q);
|
||||||
|
}
|
||||||
|
}
|
||||||
static void httpAsyncCb(uv_async_t* handle) {
|
static void httpAsyncCb(uv_async_t* handle) {
|
||||||
SAsyncItem* item = handle->data;
|
SAsyncItem* item = handle->data;
|
||||||
SHttpModule* http = item->pThrd;
|
SHttpModule* http = item->pThrd;
|
||||||
|
|
||||||
SHttpMsg *msg = NULL, *quitMsg = NULL;
|
SHttpMsg *msg = NULL, *quitMsg = NULL;
|
||||||
|
queue wq;
|
||||||
queue wq;
|
|
||||||
QUEUE_INIT(&wq);
|
QUEUE_INIT(&wq);
|
||||||
|
|
||||||
static int32_t BATCH_SIZE = 5;
|
static int32_t BATCH_SIZE = 5;
|
||||||
int32_t count = 0;
|
int32_t count = 0;
|
||||||
|
|
||||||
taosThreadMutexLock(&item->mtx);
|
taosThreadMutexLock(&item->mtx);
|
||||||
|
httpMayDiscardMsg(http, item);
|
||||||
|
|
||||||
while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BATCH_SIZE) {
|
while (!QUEUE_IS_EMPTY(&item->qmsg) && count++ < BATCH_SIZE) {
|
||||||
queue* h = QUEUE_HEAD(&item->qmsg);
|
queue* h = QUEUE_HEAD(&item->qmsg);
|
||||||
|
@ -497,9 +519,10 @@ static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
|
||||||
static void transHttpEnvInit() {
|
static void transHttpEnvInit() {
|
||||||
httpRefMgt = taosOpenRef(1, transHttpDestroyHandle);
|
httpRefMgt = taosOpenRef(1, transHttpDestroyHandle);
|
||||||
|
|
||||||
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
|
SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule));
|
||||||
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
||||||
http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
|
http->quit = 0;
|
||||||
|
|
||||||
uv_loop_init(http->loop);
|
uv_loop_init(http->loop);
|
||||||
|
|
||||||
|
@ -526,6 +549,8 @@ void transHttpEnvDestroy() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
|
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
|
||||||
|
|
||||||
|
atomic_store_8(&load->quit, 1);
|
||||||
httpSendQuit();
|
httpSendQuit();
|
||||||
taosThreadJoin(load->thread, NULL);
|
taosThreadJoin(load->thread, NULL);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue