add http interface

This commit is contained in:
Yihao Deng 2024-07-08 02:32:55 +00:00
parent 92f2d0f94f
commit 1c4f354adf
4 changed files with 305 additions and 123 deletions

View File

@ -25,8 +25,13 @@ extern "C" {
typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag;
int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId);
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag);
int64_t taosInitHttpChan();
void taosDestroyHttpChan(int64_t chanId);
#ifdef __cplusplus
}

View File

@ -87,6 +87,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) //
#define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023)
#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024)
#define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025)
@ -136,6 +137,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
#define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D)
#define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E)
#define TSDB_CODE_THIRDPARTY_ERROR TAOS_DEF_ERROR_CODE(0, 0x012F)
#define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130)
#define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131)

View File

@ -19,7 +19,6 @@
#include "zlib.h"
#include "thttp.h"
#include "taoserror.h"
#include "tlog.h"
#include "transComm.h"
// clang-format on
@ -27,13 +26,16 @@
#define HTTP_RECV_BUF_SIZE 1024
static int32_t httpRefMgt = 0;
static int64_t httpRef = -1;
static int32_t FAST_FAILURE_LIMIT = 1;
static int64_t httpDefaultChanId = -1;
typedef struct SHttpModule {
uv_loop_t* loop;
SAsyncPool* asyncPool;
TdThread thread;
SHashObj* connStatusTable;
SHashObj* connPool;
int8_t quit;
} SHttpModule;
@ -46,7 +48,7 @@ typedef struct SHttpMsg {
int32_t len;
EHttpCompFlag flag;
int8_t quit;
int64_t chanId;
} SHttpMsg;
typedef struct SHttpClient {
@ -58,19 +60,33 @@ typedef struct SHttpClient {
char* addr;
uint16_t port;
struct sockaddr_in dest;
int64_t chanId;
} SHttpClient;
typedef struct SHttpConnList {
queue q;
} SHttpConnList;
static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT;
static void transHttpEnvInit();
static void httpHandleReq(SHttpMsg* msg);
static void httpHandleQuit(SHttpMsg* msg);
static int32_t httpSendQuit();
static int32_t httpSendQuit(int64_t chanId);
static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId);
static void httpDestroyMsg(SHttpMsg* msg);
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port);
static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ);
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag);
static void httpModuleDestroy(SHttpModule* http);
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId);
static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen,
EHttpCompFlag flag) {
@ -91,7 +107,7 @@ static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t
uri, server, contLen);
} else {
terrno = TSDB_CODE_INVALID_CFG;
return -1;
return terrno;
}
}
@ -158,6 +174,8 @@ _OVER:
if (code == 0) {
memcpy(pSrc, pDest, gzipStream.total_out);
code = gzipStream.total_out;
} else {
code = terrno;
}
taosMemoryFree(pDest);
@ -168,11 +186,15 @@ static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port,
uint32_t ip = taosGetIpv4FromFqdn(server);
if (ip == 0xffffffff) {
tError("http-report failed to resolving domain names: %s", server);
return -1;
return TSDB_CODE_RPC_FQDN_ERROR;
}
char buf[128] = {0};
char buf[256] = {0};
tinet_ntoa(buf, ip);
uv_ip4_addr(buf, port, dest);
int ret = uv_ip4_addr(buf, port, dest);
if (ret != 0) {
tError("http-report failed to get addr %s", uv_err_name(ret));
return TSDB_CODE_THIRDPARTY_ERROR;
}
return 0;
}
@ -183,6 +205,43 @@ static void* httpThread(void* arg) {
return NULL;
}
static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId) {
if (server == NULL || uri == NULL) {
tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
if (pCont == NULL || contLen == 0) {
tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
if (msg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
msg->port = port;
msg->server = taosStrdup(server);
msg->uri = taosStrdup(uri);
msg->cont = taosMemoryMalloc(contLen);
if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) {
httpDestroyMsg(msg);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memcpy(msg->cont, pCont, contLen);
msg->len = contLen;
msg->flag = flag;
msg->quit = 0;
msg->chanId = chanId;
return msg;
}
static void httpDestroyMsg(SHttpMsg* msg) {
if (msg == NULL) return;
@ -191,15 +250,7 @@ static void httpDestroyMsg(SHttpMsg* msg) {
taosMemoryFree(msg->cont);
taosMemoryFree(msg);
}
static void httpDestroyMsgWrapper(void* cont, void* param) {
httpDestroyMsg((SHttpMsg*)cont);
// if (msg == NULL) return;
// taosMemoryFree(msg->server);
// taosMemoryFree(msg->uri);
// taosMemoryFree(msg->cont);
// taosMemoryFree(msg);
}
static void httpDestroyMsgWrapper(void* cont, void* param) { httpDestroyMsg((SHttpMsg*)cont); }
static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) {
SHttpMsg *msg = NULL, *quitMsg = NULL;
@ -278,7 +329,7 @@ static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested
static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
SHttpClient* cli = handle->data;
if (nread < 0) {
tError("http-report recv error:%s", uv_err_name(nread));
tError("http-report recv error:%s", uv_strerror(nread));
} else {
tTrace("http-report succ to recv %d bytes", (int32_t)nread);
}
@ -289,92 +340,77 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const
static void clientSentCb(uv_write_t* req, int32_t status) {
SHttpClient* cli = req->data;
if (status != 0) {
tError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr,
cli->port, cli->chanId);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
return;
} else {
tTrace("http-report succ to send data");
tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId);
}
status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb);
if (status != 0) {
tError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr,
cli->port, cli->chanId);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
}
}
static void clientConnCb(uv_connect_t* req, int32_t status) {
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
SHttpClient* cli = req->data;
int64_t chanId = cli->chanId;
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
if (status != 0) {
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status),
cli->addr, cli->port, chanId);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
taosReleaseRef(httpRefMgt, httpRef);
taosReleaseRef(httpRefMgt, chanId);
return;
}
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1);
status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb);
if (0 != status) {
tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port);
tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr,
cli->port, chanId);
if (!uv_is_closing((uv_handle_t*)&cli->tcp)) {
uv_close((uv_handle_t*)&cli->tcp, clientCloseCb);
}
}
taosReleaseRef(httpRefMgt, httpRef);
taosReleaseRef(httpRefMgt, chanId);
}
int32_t httpSendQuit() {
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
if (http == NULL) return 0;
int32_t httpSendQuit(int64_t chanId) {
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
if (http == NULL) return terrno;
SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg));
if (msg == NULL) {
taosReleaseRef(httpRefMgt, chanId);
return TSDB_CODE_OUT_OF_MEMORY;
}
msg->quit = 1;
msg->chanId = chanId;
transAsyncSend(http->asyncPool, &(msg->q));
taosReleaseRef(httpRefMgt, httpRef);
int ret = transAsyncSend(http->asyncPool, &(msg->q));
if (ret != 0) {
taosMemoryFree(msg);
taosReleaseRef(httpRefMgt, chanId);
return ret;
}
taosReleaseRef(httpRefMgt, chanId);
return 0;
}
static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag) {
if (server == NULL || uri == NULL) {
tError("http-report failed to report to invalid addr");
return -1;
}
if (pCont == NULL || contLen == 0) {
tError("http-report failed to report empty packet");
return -1;
}
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
if (load == NULL) {
tError("http-report already released");
return -1;
}
SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg));
msg->server = taosStrdup(server);
msg->uri = taosStrdup(uri);
msg->port = port;
msg->cont = taosMemoryMalloc(contLen);
memcpy(msg->cont, pCont, contLen);
msg->len = contLen;
msg->flag = flag;
msg->quit = 0;
int ret = transAsyncSend(load->asyncPool, &(msg->q));
taosReleaseRef(httpRefMgt, httpRef);
return ret;
}
static void httpDestroyClientCb(uv_handle_t* handle) {
SHttpClient* http = handle->data;
destroyHttpClient(http);
@ -392,13 +428,14 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) {
return;
}
static void httpHandleQuit(SHttpMsg* msg) {
int64_t chanId = msg->chanId;
taosMemoryFree(msg);
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
tDebug("http-report receive quit, chanId:%" PRId64 "", chanId);
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
if (http == NULL) return;
uv_walk(http->loop, httpWalkCb, NULL);
taosReleaseRef(httpRefMgt, httpRef);
taosReleaseRef(httpRefMgt, chanId);
}
static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) {
@ -431,8 +468,12 @@ static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port,
return;
}
static void httpHandleReq(SHttpMsg* msg) {
int32_t ignore = false;
SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef);
int64_t chanId = msg->chanId;
int32_t ignore = false;
char* header = NULL;
terrno = 0;
SHttpModule* http = taosAcquireRef(httpRefMgt, chanId);
if (http == NULL) {
goto END;
}
@ -457,119 +498,250 @@ static void httpHandleReq(SHttpMsg* msg) {
}
}
int32_t len = 2048;
char* header = taosMemoryCalloc(1, len);
int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, len, msg->flag);
if (headLen < 0) {
taosMemoryFree(header);
int32_t cap = 2048;
header = taosMemoryCalloc(1, cap);
if (header == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag);
if (headLen < 0 || headLen >= cap) {
goto END;
}
uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t));
if (wb == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var
wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var
SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient));
if (cli == NULL) {
taosMemoryFree(wb);
goto END;
}
cli->conn.data = cli;
cli->tcp.data = cli;
cli->req.data = cli;
cli->wbuf = wb;
cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE);
if (cli->rbuf == NULL) {
taosMemoryFree(msg->uri);
taosMemoryFree(msg);
destroyHttpClient(cli);
taosReleaseRef(httpRefMgt, chanId);
return;
}
cli->addr = msg->server;
cli->port = msg->port;
cli->dest = dest;
cli->chanId = chanId;
taosMemoryFree(msg->uri);
taosMemoryFree(msg);
uv_tcp_init(http->loop, &cli->tcp);
int err = uv_tcp_init(http->loop, &cli->tcp);
if (err != 0) {
tError("http-report failed to init socket handle, dst:%s:%d,reason:%s chanId:%" PRId64 "", cli->addr, cli->port,
uv_strerror(err), chanId);
destroyHttpClient(cli);
taosReleaseRef(httpRefMgt, chanId);
}
// set up timeout to avoid stuck;
int32_t fd = taosCreateSocketWithTimeout(5000);
if (fd < 0) {
tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port);
tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 "", cli->addr, cli->port, chanId);
destroyHttpClient(cli);
taosReleaseRef(httpRefMgt, httpRef);
taosReleaseRef(httpRefMgt, chanId);
return;
}
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
if (ret != 0) {
tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
taosReleaseRef(httpRefMgt, httpRef);
tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(ret), cli->addr,
cli->port, chanId);
destroyHttpClient(cli);
taosReleaseRef(httpRefMgt, chanId);
return;
}
ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb);
if (ret != 0) {
tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr,
cli->port);
tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(ret),
cli->addr, cli->port, chanId);
httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0);
destroyHttpClient(cli);
}
taosReleaseRef(httpRefMgt, httpRef);
taosReleaseRef(httpRefMgt, chanId);
return;
END:
if (ignore == false) {
tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port);
tError("http-report failed to report, reason: %s, addr: %s:%d, chanId:%" PRId64 "", terrstr(), msg->server,
msg->port, chanId);
}
httpDestroyMsg(msg);
taosReleaseRef(httpRefMgt, httpRef);
taosMemoryFree(header);
taosReleaseRef(httpRefMgt, chanId);
}
static void httpModuleDestroy(SHttpModule* http) {
if (http->asyncPool != NULL) {
TRANS_DESTROY_ASYNC_POOL_MSG(http->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL);
transAsyncPoolDestroy(http->asyncPool);
}
if (http->loop) {
uv_loop_close(http->loop);
taosMemoryFree(http->loop);
}
taosHashCleanup(http->connStatusTable);
// not free http, http freeed by ref
}
static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont,
int32_t contLen, EHttpCompFlag flag, int64_t chanId) {
int32_t ret = 0;
terrno = 0;
SHttpMsg* msg = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId);
if (msg == NULL) {
return terrno;
}
SHttpModule* load = taosAcquireRef(httpRefMgt, chanId);
if (load == NULL) {
httpDestroyMsg(msg);
return terrno;
}
if (atomic_load_8(&load->quit)) {
httpDestroyMsg(msg);
taosReleaseRef(httpRefMgt, chanId);
terrno = TSDB_CODE_HTTP_MODULE_QUIT;
return terrno;
}
ret = transAsyncSend(load->asyncPool, &(msg->q));
if (ret < 0) {
httpDestroyMsg(msg);
taosReleaseRef(httpRefMgt, chanId);
terrno = TSDB_CODE_HTTP_MODULE_QUIT;
return terrno;
}
taosReleaseRef(httpRefMgt, chanId);
return ret;
}
int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag, int64_t chanId) {
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId);
}
int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen,
EHttpCompFlag flag) {
taosThreadOnce(&transHttpInit, transHttpEnvInit);
return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag);
return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId);
}
static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); }
int64_t transInitHttpChanImpl();
static void transHttpEnvInit() {
httpRefMgt = taosOpenRef(1, transHttpDestroyHandle);
SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule));
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
http->quit = 0;
uv_loop_init(http->loop);
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
if (NULL == http->asyncPool) {
taosMemoryFree(http->loop);
taosMemoryFree(http);
http = NULL;
return;
}
int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
if (err != 0) {
taosMemoryFree(http->loop);
taosMemoryFree(http);
http = NULL;
}
httpRef = taosAddRef(httpRefMgt, http);
httpRefMgt = taosOpenRef(64, transHttpDestroyHandle);
httpDefaultChanId = transInitHttpChanImpl();
}
void transHttpEnvDestroy() {
// remove http
if (httpRef == -1) {
// remove default chanId
taosDestroyHttpChan(httpDefaultChanId);
httpDefaultChanId = -1;
}
int64_t transInitHttpChanImpl() {
SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule));
if (http == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (http->connStatusTable == NULL) {
httpModuleDestroy(http);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
http->loop = taosMemoryMalloc(sizeof(uv_loop_t));
if (http->loop == NULL) {
httpModuleDestroy(http);
taosMemoryFree(http);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
int err = uv_loop_init(http->loop);
if (err != 0) {
tError("http-report failed init uv, reason:%s", uv_strerror(err));
httpModuleDestroy(http);
taosMemoryFree(http);
return TSDB_CODE_THIRDPARTY_ERROR;
}
http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb);
if (NULL == http->asyncPool) {
httpModuleDestroy(http);
taosMemoryFree(http);
return terrno;
}
http->quit = 0;
err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http);
if (err != 0) {
httpModuleDestroy(http);
taosMemoryFree(http);
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
}
int64_t ref = taosAddRef(httpRefMgt, http);
if (ref < 0) {
return terrno;
}
return ref;
}
int64_t taosInitHttpChan() {
taosThreadOnce(&transHttpInit, transHttpEnvInit);
return transInitHttpChanImpl();
}
void taosDestroyHttpChan(int64_t chanId) {
tDebug("http-report send quit, chanId:%" PRId64 "", chanId);
int ret = 0;
SHttpModule* load = taosAcquireRef(httpRefMgt, chanId);
if (load == NULL) {
tError("http-report failed destroy chanId %" PRId64 "", chanId);
return;
}
SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef);
atomic_store_8(&load->quit, 1);
httpSendQuit();
ret = httpSendQuit(chanId);
if (ret != 0) {
tDebug("http-report already destroyed, chanId %" PRId64 "", chanId);
taosReleaseRef(httpRefMgt, chanId);
return;
}
taosThreadJoin(load->thread, NULL);
TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL);
transAsyncPoolDestroy(load->asyncPool);
uv_loop_close(load->loop);
taosMemoryFree(load->loop);
httpModuleDestroy(load);
taosHashCleanup(load->connStatusTable);
taosReleaseRef(httpRefMgt, httpRef);
taosRemoveRef(httpRefMgt, httpRef);
taosReleaseRef(httpRefMgt, chanId);
taosReleaseRef(httpRefMgt, chanId);
}

View File

@ -54,6 +54,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnod
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy")
TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit")
//common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")
@ -96,6 +97,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space")
TAOS_DEFINE_ERROR(TSDB_CODE_THIRDPARTY_ERROR, "third party error, please check the log")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up")
TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")