opt http module
This commit is contained in:
parent
8831f386e6
commit
06c06d290f
|
@ -428,6 +428,7 @@ void transDestoryExHandle(void* handle);
|
||||||
int32_t transGetRefMgt();
|
int32_t transGetRefMgt();
|
||||||
int32_t transGetInstMgt();
|
int32_t transGetInstMgt();
|
||||||
|
|
||||||
|
void transHttpEnvDestroy();
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -39,73 +39,11 @@ typedef struct SHttpMsg {
|
||||||
char* cont;
|
char* cont;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
EHttpCompFlag flag;
|
EHttpCompFlag flag;
|
||||||
|
int8_t quit;
|
||||||
|
SHttpModule* http;
|
||||||
|
|
||||||
} SHttpMsg;
|
} SHttpMsg;
|
||||||
|
|
||||||
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
|
|
||||||
static SHttpModule* http = NULL;
|
|
||||||
|
|
||||||
static void httpHandleReq(SHttpMsg* msg);
|
|
||||||
static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen,
|
|
||||||
EHttpCompFlag flag);
|
|
||||||
|
|
||||||
static void* httpThread(void* arg) {
|
|
||||||
SHttpModule* http = (SHttpModule*)arg;
|
|
||||||
setThreadName("http-cli-send-thread");
|
|
||||||
uv_run(http->loop, UV_RUN_DEFAULT);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void httpDestroyMsg(SHttpMsg* msg) {
|
|
||||||
if (msg == NULL) return;
|
|
||||||
|
|
||||||
taosMemoryFree(msg->server);
|
|
||||||
taosMemoryFree(msg->cont);
|
|
||||||
taosMemoryFree(msg);
|
|
||||||
}
|
|
||||||
static void httpAsyncCb(uv_async_t* handle) {
|
|
||||||
SAsyncItem* item = handle->data;
|
|
||||||
SHttpModule* http = item->pThrd;
|
|
||||||
|
|
||||||
SHttpMsg* msg = NULL;
|
|
||||||
|
|
||||||
queue wq;
|
|
||||||
taosThreadMutexLock(&item->mtx);
|
|
||||||
QUEUE_MOVE(&item->qmsg, &wq);
|
|
||||||
taosThreadMutexUnlock(&item->mtx);
|
|
||||||
|
|
||||||
int count = 0;
|
|
||||||
while (!QUEUE_IS_EMPTY(&wq)) {
|
|
||||||
queue* h = QUEUE_HEAD(&wq);
|
|
||||||
QUEUE_REMOVE(h);
|
|
||||||
msg = QUEUE_DATA(h, SHttpMsg, q);
|
|
||||||
httpHandleReq(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void transHttpEnvInit() {
|
|
||||||
http = taosMemoryMalloc(sizeof(SHttpModule));
|
|
||||||
|
|
||||||
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
|
||||||
uv_loop_init(http->loop);
|
|
||||||
|
|
||||||
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
|
|
||||||
|
|
||||||
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
|
|
||||||
if (err != 0) {
|
|
||||||
taosMemoryFree(http->loop);
|
|
||||||
taosMemoryFree(http);
|
|
||||||
http = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
static void transHttpEnvDestroy() {
|
|
||||||
if (http == NULL) return;
|
|
||||||
|
|
||||||
transAsyncPoolDestroy(http->asyncPool);
|
|
||||||
taosMemoryFree(http->loop);
|
|
||||||
taosMemoryFree(http);
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct SHttpClient {
|
typedef struct SHttpClient {
|
||||||
uv_connect_t conn;
|
uv_connect_t conn;
|
||||||
uv_tcp_t tcp;
|
uv_tcp_t tcp;
|
||||||
|
@ -117,6 +55,17 @@ typedef struct SHttpClient {
|
||||||
struct sockaddr_in dest;
|
struct sockaddr_in dest;
|
||||||
} SHttpClient;
|
} SHttpClient;
|
||||||
|
|
||||||
|
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
|
||||||
|
static SHttpModule* thttp = NULL;
|
||||||
|
static void transHttpEnvInit();
|
||||||
|
|
||||||
|
static void httpHandleReq(SHttpMsg* msg);
|
||||||
|
static void httpHandleQuit(SHttpMsg* msg);
|
||||||
|
static int32_t httpSendQuit();
|
||||||
|
|
||||||
|
static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen,
|
||||||
|
EHttpCompFlag flag);
|
||||||
|
|
||||||
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
|
static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen,
|
||||||
EHttpCompFlag flag) {
|
EHttpCompFlag flag) {
|
||||||
if (flag == HTTP_FLAT) {
|
if (flag == HTTP_FLAT) {
|
||||||
|
@ -135,6 +84,7 @@ static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pH
|
||||||
"Content-Length: %d\n\n",
|
"Content-Length: %d\n\n",
|
||||||
server, contLen);
|
server, contLen);
|
||||||
} else {
|
} else {
|
||||||
|
terrno = TSDB_CODE_INVALID_CFG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -208,6 +158,57 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
|
||||||
|
uint32_t ip = taosGetIpv4FromFqdn(server);
|
||||||
|
if (ip == 0xffffffff) {
|
||||||
|
tError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
char buf[128] = {0};
|
||||||
|
tinet_ntoa(buf, ip);
|
||||||
|
uv_ip4_addr(buf, port, dest);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void* httpThread(void* arg) {
|
||||||
|
SHttpModule* http = (SHttpModule*)arg;
|
||||||
|
setThreadName("http-cli-send-thread");
|
||||||
|
uv_run(http->loop, UV_RUN_DEFAULT);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void httpDestroyMsg(SHttpMsg* msg) {
|
||||||
|
if (msg == NULL) return;
|
||||||
|
|
||||||
|
taosMemoryFree(msg->server);
|
||||||
|
taosMemoryFree(msg->cont);
|
||||||
|
taosMemoryFree(msg);
|
||||||
|
}
|
||||||
|
static void httpAsyncCb(uv_async_t* handle) {
|
||||||
|
SAsyncItem* item = handle->data;
|
||||||
|
SHttpModule* http = item->pThrd;
|
||||||
|
|
||||||
|
SHttpMsg *msg = NULL, *quitMsg = NULL;
|
||||||
|
|
||||||
|
queue wq;
|
||||||
|
taosThreadMutexLock(&item->mtx);
|
||||||
|
QUEUE_MOVE(&item->qmsg, &wq);
|
||||||
|
taosThreadMutexUnlock(&item->mtx);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||||
|
queue* h = QUEUE_HEAD(&wq);
|
||||||
|
QUEUE_REMOVE(h);
|
||||||
|
msg = QUEUE_DATA(h, SHttpMsg, q);
|
||||||
|
if (msg->quit) {
|
||||||
|
quitMsg = msg;
|
||||||
|
} else {
|
||||||
|
httpHandleReq(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (quitMsg) httpHandleQuit(quitMsg);
|
||||||
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
|
static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
|
||||||
taosMemoryFree(cli->wbuf[0].base);
|
taosMemoryFree(cli->wbuf[0].base);
|
||||||
taosMemoryFree(cli->wbuf[1].base);
|
taosMemoryFree(cli->wbuf[1].base);
|
||||||
|
@ -216,88 +217,82 @@ static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
|
||||||
taosMemoryFree(cli->addr);
|
taosMemoryFree(cli->addr);
|
||||||
taosMemoryFree(cli);
|
taosMemoryFree(cli);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) {
|
static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) {
|
||||||
SHttpClient* cli = handle->data;
|
SHttpClient* cli = handle->data;
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
SHttpClient* cli = handle->data;
|
SHttpClient* cli = handle->data;
|
||||||
buf->base = cli->rbuf;
|
buf->base = cli->rbuf;
|
||||||
buf->len = HTTP_RECV_BUF_SIZE;
|
buf->len = HTTP_RECV_BUF_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
SHttpClient* cli = handle->data;
|
SHttpClient* cli = handle->data;
|
||||||
if (nread < 0) {
|
if (nread < 0) {
|
||||||
uError("http-report recv error:%s", uv_err_name(nread));
|
tError("http-report recv error:%s", uv_err_name(nread));
|
||||||
} else {
|
} else {
|
||||||
uTrace("http-report succ to recv %d bytes", (int32_t)nread);
|
tTrace("http-report succ to recv %d bytes", (int32_t)nread);
|
||||||
}
|
}
|
||||||
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
||||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||||
} else {
|
|
||||||
destroyHttpClient(cli);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void clientSentCb(uv_write_t* req, int32_t status) {
|
static void clientSentCb(uv_write_t* req, int32_t status) {
|
||||||
SHttpClient* cli = req->data;
|
SHttpClient* cli = req->data;
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
uError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
tError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
||||||
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
||||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||||
} else {
|
|
||||||
destroyHttpClient(cli);
|
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
uTrace("http-report succ to send data");
|
tTrace("http-report succ to send data");
|
||||||
}
|
}
|
||||||
status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
|
status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
uError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
tError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
||||||
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
||||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||||
} else {
|
|
||||||
destroyHttpClient(cli);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void clientConnCb(uv_connect_t* req, int32_t status) {
|
static void clientConnCb(uv_connect_t* req, int32_t status) {
|
||||||
SHttpClient* cli = req->data;
|
SHttpClient* cli = req->data;
|
||||||
if (status != 0) {
|
if (status != 0) {
|
||||||
uError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
||||||
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
||||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||||
} else {
|
|
||||||
destroyHttpClient(cli);
|
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
|
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
|
||||||
if (0 != status) {
|
if (0 != status) {
|
||||||
uError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
|
||||||
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
|
||||||
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
|
||||||
} else {
|
|
||||||
destroyHttpClient(cli);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, struct sockaddr_in* dest) {
|
int32_t httpSendQuit() {
|
||||||
uint32_t ip = taosGetIpv4FromFqdn(server);
|
SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg));
|
||||||
if (ip == 0xffffffff) {
|
msg->quit = 1;
|
||||||
uError("http-report failed to get http server:%s since %s", server, errno == 0 ? "invalid http server" : terrstr());
|
|
||||||
|
SHttpModule* load = atomic_load_ptr(&thttp);
|
||||||
|
if (load == NULL) {
|
||||||
|
httpDestroyMsg(msg);
|
||||||
|
tError("http-report already released");
|
||||||
return -1;
|
return -1;
|
||||||
|
} else {
|
||||||
|
msg->http = load;
|
||||||
}
|
}
|
||||||
char buf[128] = {0};
|
transAsyncSend(load->asyncPool, &(msg->q));
|
||||||
tinet_ntoa(buf, ip);
|
|
||||||
uv_ip4_addr(buf, port, dest);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
|
|
||||||
taosThreadOnce(&transHttpInit, transHttpEnvInit);
|
|
||||||
return taosSendHttpReportImpl(server, port, pCont, contLen, flag);
|
|
||||||
}
|
|
||||||
static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen,
|
static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen,
|
||||||
EHttpCompFlag flag) {
|
EHttpCompFlag flag) {
|
||||||
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
|
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
|
||||||
|
@ -307,15 +302,49 @@ static int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* p
|
||||||
memcpy(msg->cont, pCont, contLen);
|
memcpy(msg->cont, pCont, contLen);
|
||||||
msg->len = contLen;
|
msg->len = contLen;
|
||||||
msg->flag = flag;
|
msg->flag = flag;
|
||||||
|
msg->quit = 0;
|
||||||
|
|
||||||
|
SHttpModule* load = atomic_load_ptr(&thttp);
|
||||||
|
if (load == NULL) {
|
||||||
|
httpDestroyMsg(msg);
|
||||||
|
tError("http-report already released");
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
msg->http = load;
|
||||||
|
transAsyncSend(load->asyncPool, &(msg->q));
|
||||||
|
}
|
||||||
|
|
||||||
transAsyncSend(http->asyncPool, &(msg->q));
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void httpDestroyClientCb(uv_handle_t* handle) {
|
||||||
|
SHttpClient* http = handle->data;
|
||||||
|
destroyHttpClient(http);
|
||||||
|
}
|
||||||
|
static void httpWalkCb(uv_handle_t* handle, void* arg) {
|
||||||
|
// impl later
|
||||||
|
if (!uv_is_closing(handle)) {
|
||||||
|
uv_handle_type type = uv_handle_get_type(handle);
|
||||||
|
if (uv_handle_get_type(handle) == UV_TCP) {
|
||||||
|
uv_close(handle, httpDestroyClientCb);
|
||||||
|
} else {
|
||||||
|
uv_close(handle, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
static void httpHandleQuit(SHttpMsg* msg) {
|
||||||
|
SHttpModule* http = msg->http;
|
||||||
|
taosMemoryFree(msg);
|
||||||
|
|
||||||
|
uv_walk(http->loop, httpWalkCb, NULL);
|
||||||
|
}
|
||||||
static void httpHandleReq(SHttpMsg* msg) {
|
static void httpHandleReq(SHttpMsg* msg) {
|
||||||
|
SHttpModule* http = msg->http;
|
||||||
|
|
||||||
struct sockaddr_in dest = {0};
|
struct sockaddr_in dest = {0};
|
||||||
if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) {
|
if (taosBuildDstAddr(msg->server, msg->port + 1, &dest) < 0) {
|
||||||
httpDestroyMsg(msg);
|
goto END;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
if (msg->flag == HTTP_GZIP) {
|
if (msg->flag == HTTP_GZIP) {
|
||||||
int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len);
|
int32_t dstLen = taosCompressHttpRport(msg->cont, msg->len);
|
||||||
|
@ -324,11 +353,18 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
} else {
|
} else {
|
||||||
msg->flag = HTTP_FLAT;
|
msg->flag = HTTP_FLAT;
|
||||||
}
|
}
|
||||||
|
if (dstLen < 0) {
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = 2048;
|
int32_t len = 2048;
|
||||||
char* header = taosMemoryCalloc(1, len);
|
char* header = taosMemoryCalloc(1, len);
|
||||||
int32_t headLen = taosBuildHttpHeader(msg->server, msg->len, header, len, msg->flag);
|
int32_t headLen = taosBuildHttpHeader(msg->server, msg->len, header, len, msg->flag);
|
||||||
|
if (headLen < 0) {
|
||||||
|
taosMemoryFree(header);
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
|
uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
|
||||||
wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var
|
wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var
|
||||||
|
@ -347,20 +383,64 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
taosMemoryFree(msg);
|
taosMemoryFree(msg);
|
||||||
|
|
||||||
uv_tcp_init(http->loop, &cli->tcp);
|
uv_tcp_init(http->loop, &cli->tcp);
|
||||||
|
|
||||||
// set up timeout to avoid stuck;
|
// set up timeout to avoid stuck;
|
||||||
int32_t fd = taosCreateSocketWithTimeout(5);
|
int32_t fd = taosCreateSocketWithTimeout(5);
|
||||||
|
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
|
||||||
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
|
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
uError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
|
tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb);
|
ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
uError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr,
|
tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr,
|
||||||
cli->port);
|
cli->port);
|
||||||
destroyHttpClient(cli);
|
destroyHttpClient(cli);
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
|
|
||||||
|
END:
|
||||||
|
tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
|
||||||
|
httpDestroyMsg(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
|
||||||
|
taosThreadOnce(&transHttpInit, transHttpEnvInit);
|
||||||
|
return taosSendHttpReportImpl(server, port, pCont, contLen, flag);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void transHttpEnvInit() {
|
||||||
|
SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule));
|
||||||
|
|
||||||
|
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
||||||
|
uv_loop_init(http->loop);
|
||||||
|
|
||||||
|
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
|
||||||
|
|
||||||
|
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
|
||||||
|
if (err != 0) {
|
||||||
|
taosMemoryFree(http->loop);
|
||||||
|
taosMemoryFree(http);
|
||||||
|
http = NULL;
|
||||||
|
}
|
||||||
|
atomic_store_ptr(&thttp, http);
|
||||||
|
}
|
||||||
|
|
||||||
|
void transHttpEnvDestroy() {
|
||||||
|
SHttpModule* load = atomic_load_ptr(&thttp);
|
||||||
|
if (load == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
httpSendQuit();
|
||||||
|
taosThreadJoin(load->thread, NULL);
|
||||||
|
|
||||||
|
TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsg);
|
||||||
|
transAsyncPoolDestroy(load->asyncPool);
|
||||||
|
uv_loop_close(load->loop);
|
||||||
|
taosMemoryFree(load->loop);
|
||||||
|
taosMemoryFree(load);
|
||||||
|
|
||||||
|
atomic_store_ptr(&thttp, NULL);
|
||||||
}
|
}
|
||||||
|
|
|
@ -172,6 +172,8 @@ int32_t rpcInit() {
|
||||||
}
|
}
|
||||||
void rpcCleanup(void) {
|
void rpcCleanup(void) {
|
||||||
transCleanup();
|
transCleanup();
|
||||||
|
transHttpEnvDestroy();
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue