opt http module
This commit is contained in:
parent
d058475fac
commit
4fdc98d3fb
|
@ -20,11 +20,80 @@
|
||||||
#include "thttp.h"
|
#include "thttp.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
#include "transComm.h"
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
#define HTTP_RECV_BUF_SIZE 1024
|
#define HTTP_RECV_BUF_SIZE 1024
|
||||||
|
|
||||||
|
typedef struct SHttpModule {
|
||||||
|
uv_loop_t* loop;
|
||||||
|
SAsyncPool* asyncPool;
|
||||||
|
TdThread thread;
|
||||||
|
} SHttpModule;
|
||||||
|
|
||||||
|
typedef struct SHttpMsg {
|
||||||
|
queue q;
|
||||||
|
char* server;
|
||||||
|
int32_t port;
|
||||||
|
char* cont;
|
||||||
|
int32_t len;
|
||||||
|
EHttpCompFlag flag;
|
||||||
|
|
||||||
|
} SHttpMsg;
|
||||||
|
|
||||||
|
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
|
||||||
|
static SHttpModule* http = NULL;
|
||||||
|
|
||||||
|
static void* httpThread(void* arg) {
|
||||||
|
SHttpModule* http = (SHttpModule*)arg;
|
||||||
|
setThreadName("http-cli-send-thread");
|
||||||
|
uv_run(http->loop, UV_RUN_DEFAULT);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void httpAsyncCb(uv_async_t* handle) {
|
||||||
|
SAsyncItem* item = handle->data;
|
||||||
|
SHttpModule* http = item->pThrd;
|
||||||
|
|
||||||
|
SHttpMsg* msg = NULL;
|
||||||
|
|
||||||
|
queue wq;
|
||||||
|
taosThreadMutexLock(&item->mtx);
|
||||||
|
QUEUE_MOVE(&item->qmsg, &wq);
|
||||||
|
taosThreadMutexUnlock(&item->mtx);
|
||||||
|
|
||||||
|
int count = 0;
|
||||||
|
while (!QUEUE_IS_EMPTY(&wq)) {
|
||||||
|
queue* h = QUEUE_HEAD(&wq);
|
||||||
|
QUEUE_REMOVE(h);
|
||||||
|
msg = QUEUE_DATA(h, SHttpMsg, q);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void transHttpEnvInit() {
|
||||||
|
http = taosMemoryMalloc(sizeof(SHttpModule));
|
||||||
|
|
||||||
|
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
|
||||||
|
uv_loop_init(http->loop);
|
||||||
|
|
||||||
|
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
|
||||||
|
|
||||||
|
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
|
||||||
|
if (err != 0) {
|
||||||
|
taosMemoryFree(http->loop);
|
||||||
|
taosMemoryFree(http);
|
||||||
|
http = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
static void transHttpEnvDestroy() {
|
||||||
|
if (http == NULL) return;
|
||||||
|
|
||||||
|
transAsyncPoolDestroy(http->asyncPool);
|
||||||
|
taosMemoryFree(http->loop);
|
||||||
|
taosMemoryFree(http);
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct SHttpClient {
|
typedef struct SHttpClient {
|
||||||
uv_connect_t conn;
|
uv_connect_t conn;
|
||||||
uv_tcp_t tcp;
|
uv_tcp_t tcp;
|
||||||
|
@ -127,6 +196,8 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
|
static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) {
|
||||||
|
taosMemoryFree(cli->wbuf[0].base);
|
||||||
|
taosMemoryFree(cli->wbuf[1].base);
|
||||||
taosMemoryFree(cli->wbuf);
|
taosMemoryFree(cli->wbuf);
|
||||||
taosMemoryFree(cli->rbuf);
|
taosMemoryFree(cli->rbuf);
|
||||||
taosMemoryFree(cli->addr);
|
taosMemoryFree(cli->addr);
|
||||||
|
@ -230,11 +301,12 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
|
||||||
}
|
}
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
char header[2048] = {0};
|
int32_t len = 2048;
|
||||||
int32_t headLen = taosBuildHttpHeader(server, contLen, header, sizeof(header), flag);
|
char* header = taosMemoryCalloc(1, len);
|
||||||
|
int32_t headLen = taosBuildHttpHeader(server, contLen, header, len, flag);
|
||||||
|
|
||||||
uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
|
uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
|
||||||
wb[0] = uv_buf_init((char*)header, headLen); // stack var
|
wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var
|
||||||
wb[1] = uv_buf_init((char*)pCont, contLen); // heap var
|
wb[1] = uv_buf_init((char*)pCont, contLen); // heap var
|
||||||
|
|
||||||
SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
|
SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
|
||||||
|
@ -281,3 +353,15 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
|
||||||
taosMemoryFree(loop);
|
taosMemoryFree(loop);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
int32_t taosSendHttpReportImpl(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) {
|
||||||
|
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
|
||||||
|
|
||||||
|
msg->server = strdup(server);
|
||||||
|
msg->port = port;
|
||||||
|
msg->cont = taosMemoryMalloc(contLen);
|
||||||
|
memcpy(msg->cont, pCont, contLen);
|
||||||
|
msg->flag = flag;
|
||||||
|
|
||||||
|
transAsyncSend(http->asyncPool, &(msg->q));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -187,17 +187,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
||||||
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
|
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CONN_HOST_THREAD_IDX1(idx, exh, refId, pThrd) \
|
|
||||||
do { \
|
|
||||||
if (exh == NULL) { \
|
|
||||||
idx = -1; \
|
|
||||||
} else { \
|
|
||||||
ASYNC_CHECK_HANDLE((exh), refId); \
|
|
||||||
pThrd = (SCliThrd*)(exh)->pThrd; \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
|
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
|
||||||
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
|
||||||
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
|
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
|
||||||
|
|
||||||
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
|
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
|
||||||
|
@ -217,6 +207,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
||||||
tDebug("msg found, %" PRIu64 "", ahandle); \
|
tDebug("msg found, %" PRIu64 "", ahandle); \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CONN_GET_NEXT_SENDMSG(conn) \
|
#define CONN_GET_NEXT_SENDMSG(conn) \
|
||||||
do { \
|
do { \
|
||||||
int i = 0; \
|
int i = 0; \
|
||||||
|
@ -231,21 +222,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CONN_HANDLE_THREAD_QUIT(thrd) \
|
|
||||||
do { \
|
|
||||||
if (thrd->quit) { \
|
|
||||||
return; \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
#define CONN_HANDLE_BROKEN(conn) \
|
|
||||||
do { \
|
|
||||||
if (conn->broken) { \
|
|
||||||
cliHandleExcept(conn); \
|
|
||||||
return; \
|
|
||||||
} \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
#define CONN_SET_PERSIST_BY_APP(conn) \
|
#define CONN_SET_PERSIST_BY_APP(conn) \
|
||||||
do { \
|
do { \
|
||||||
if (conn->status == ConnNormal) { \
|
if (conn->status == ConnNormal) { \
|
||||||
|
|
Loading…
Reference in New Issue