diff --git a/include/libs/transport/thttp.h b/include/libs/transport/thttp.h index f6f1f7f027..c3f107028a 100644 --- a/include/libs/transport/thttp.h +++ b/include/libs/transport/thttp.h @@ -25,8 +25,13 @@ extern "C" { typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; +int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId); + int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); +int64_t taosInitHttpChan(); +void taosDestroyHttpChan(int64_t chanId); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 7ac22ac40f..108ae4223b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -87,6 +87,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) // #define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) #define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024) +#define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025) @@ -136,6 +137,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) #define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D) #define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E) +#define TSDB_CODE_THIRDPARTY_ERROR TAOS_DEF_ERROR_CODE(0, 0x012F) #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index c4ca39c323..704c4b06e2 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -19,7 +19,6 @@ #include "zlib.h" #include "thttp.h" #include "taoserror.h" -#include "tlog.h" #include "transComm.h" // clang-format on @@ -27,13 +26,16 @@ #define HTTP_RECV_BUF_SIZE 1024 static int32_t httpRefMgt = 0; -static int64_t httpRef = -1; static int32_t FAST_FAILURE_LIMIT = 1; + +static int64_t httpDefaultChanId = -1; + typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; TdThread thread; SHashObj* connStatusTable; + SHashObj* connPool; int8_t quit; } SHttpModule; @@ -46,7 +48,7 @@ typedef struct SHttpMsg { int32_t len; EHttpCompFlag flag; int8_t quit; - + int64_t chanId; } SHttpMsg; typedef struct SHttpClient { @@ -58,19 +60,33 @@ typedef struct SHttpClient { char* addr; uint16_t port; struct sockaddr_in dest; + int64_t chanId; } SHttpClient; +typedef struct SHttpConnList { + queue q; + +} SHttpConnList; + static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; static void transHttpEnvInit(); static void httpHandleReq(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg); -static int32_t httpSendQuit(); +static int32_t httpSendQuit(int64_t chanId); + +static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId); +static void httpDestroyMsg(SHttpMsg* msg); static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ); static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); +static void httpModuleDestroy(SHttpModule* http); + +static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, + int32_t contLen, EHttpCompFlag flag, int64_t chanId); static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { @@ -91,7 +107,7 @@ static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t uri, server, contLen); } else { terrno = TSDB_CODE_INVALID_CFG; - return -1; + return terrno; } } @@ -158,6 +174,8 @@ _OVER: if (code == 0) { memcpy(pSrc, pDest, gzipStream.total_out); code = gzipStream.total_out; + } else { + code = terrno; } taosMemoryFree(pDest); @@ -168,11 +186,15 @@ static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { tError("http-report failed to resolving domain names: %s", server); - return -1; + return TSDB_CODE_RPC_FQDN_ERROR; } - char buf[128] = {0}; + char buf[256] = {0}; tinet_ntoa(buf, ip); - uv_ip4_addr(buf, port, dest); + int ret = uv_ip4_addr(buf, port, dest); + if (ret != 0) { + tError("http-report failed to get addr %s", uv_err_name(ret)); + return TSDB_CODE_THIRDPARTY_ERROR; + } return 0; } @@ -183,6 +205,43 @@ static void* httpThread(void* arg) { return NULL; } +static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId) { + if (server == NULL || uri == NULL) { + tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + + if (pCont == NULL || contLen == 0) { + tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + + SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); + if (msg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + msg->port = port; + msg->server = taosStrdup(server); + msg->uri = taosStrdup(uri); + msg->cont = taosMemoryMalloc(contLen); + if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) { + httpDestroyMsg(msg); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + memcpy(msg->cont, pCont, contLen); + msg->len = contLen; + msg->flag = flag; + msg->quit = 0; + msg->chanId = chanId; + return msg; +} static void httpDestroyMsg(SHttpMsg* msg) { if (msg == NULL) return; @@ -191,15 +250,7 @@ static void httpDestroyMsg(SHttpMsg* msg) { taosMemoryFree(msg->cont); taosMemoryFree(msg); } -static void httpDestroyMsgWrapper(void* cont, void* param) { - httpDestroyMsg((SHttpMsg*)cont); - // if (msg == NULL) return; - - // taosMemoryFree(msg->server); - // taosMemoryFree(msg->uri); - // taosMemoryFree(msg->cont); - // taosMemoryFree(msg); -} +static void httpDestroyMsgWrapper(void* cont, void* param) { httpDestroyMsg((SHttpMsg*)cont); } static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { SHttpMsg *msg = NULL, *quitMsg = NULL; @@ -278,7 +329,7 @@ static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { SHttpClient* cli = handle->data; if (nread < 0) { - tError("http-report recv error:%s", uv_err_name(nread)); + tError("http-report recv error:%s", uv_strerror(nread)); } else { tTrace("http-report succ to recv %d bytes", (int32_t)nread); } @@ -289,92 +340,77 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const static void clientSentCb(uv_write_t* req, int32_t status) { SHttpClient* cli = req->data; if (status != 0) { - tError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, + cli->port, cli->chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } return; } else { - tTrace("http-report succ to send data"); + tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId); } status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); if (status != 0) { - tError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, + cli->port, cli->chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } } } static void clientConnCb(uv_connect_t* req, int32_t status) { - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); SHttpClient* cli = req->data; + int64_t chanId = cli->chanId; + + SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (status != 0) { httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); - tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), + cli->addr, cli->port, chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); return; } httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1); status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); if (0 != status) { - tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, + cli->port, chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } } - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); } -int32_t httpSendQuit() { - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); - if (http == NULL) return 0; +int32_t httpSendQuit(int64_t chanId) { + SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); + if (http == NULL) return terrno; SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg)); + if (msg == NULL) { + taosReleaseRef(httpRefMgt, chanId); + return TSDB_CODE_OUT_OF_MEMORY; + } + msg->quit = 1; + msg->chanId = chanId; - transAsyncSend(http->asyncPool, &(msg->q)); - taosReleaseRef(httpRefMgt, httpRef); + int ret = transAsyncSend(http->asyncPool, &(msg->q)); + if (ret != 0) { + taosMemoryFree(msg); + taosReleaseRef(httpRefMgt, chanId); + return ret; + } + + taosReleaseRef(httpRefMgt, chanId); return 0; } -static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag) { - if (server == NULL || uri == NULL) { - tError("http-report failed to report to invalid addr"); - return -1; - } - - if (pCont == NULL || contLen == 0) { - tError("http-report failed to report empty packet"); - return -1; - } - SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); - if (load == NULL) { - tError("http-report already released"); - return -1; - } - - SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); - - msg->server = taosStrdup(server); - msg->uri = taosStrdup(uri); - msg->port = port; - msg->cont = taosMemoryMalloc(contLen); - memcpy(msg->cont, pCont, contLen); - msg->len = contLen; - msg->flag = flag; - msg->quit = 0; - - int ret = transAsyncSend(load->asyncPool, &(msg->q)); - taosReleaseRef(httpRefMgt, httpRef); - return ret; -} - static void httpDestroyClientCb(uv_handle_t* handle) { SHttpClient* http = handle->data; destroyHttpClient(http); @@ -392,13 +428,14 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) { return; } static void httpHandleQuit(SHttpMsg* msg) { + int64_t chanId = msg->chanId; taosMemoryFree(msg); - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + tDebug("http-report receive quit, chanId:%" PRId64 "", chanId); + SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (http == NULL) return; - uv_walk(http->loop, httpWalkCb, NULL); - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); } static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) { @@ -431,8 +468,12 @@ static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, return; } static void httpHandleReq(SHttpMsg* msg) { - int32_t ignore = false; - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + int64_t chanId = msg->chanId; + int32_t ignore = false; + char* header = NULL; + terrno = 0; + + SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (http == NULL) { goto END; } @@ -457,119 +498,250 @@ static void httpHandleReq(SHttpMsg* msg) { } } - int32_t len = 2048; - char* header = taosMemoryCalloc(1, len); - int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, len, msg->flag); - if (headLen < 0) { - taosMemoryFree(header); + int32_t cap = 2048; + header = taosMemoryCalloc(1, cap); + if (header == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + + int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag); + if (headLen < 0 || headLen >= cap) { goto END; } uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); + if (wb == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); + if (cli == NULL) { + taosMemoryFree(wb); + goto END; + } + cli->conn.data = cli; cli->tcp.data = cli; cli->req.data = cli; cli->wbuf = wb; cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); + if (cli->rbuf == NULL) { + taosMemoryFree(msg->uri); + taosMemoryFree(msg); + destroyHttpClient(cli); + taosReleaseRef(httpRefMgt, chanId); + return; + } + cli->addr = msg->server; cli->port = msg->port; cli->dest = dest; + cli->chanId = chanId; taosMemoryFree(msg->uri); taosMemoryFree(msg); - uv_tcp_init(http->loop, &cli->tcp); + int err = uv_tcp_init(http->loop, &cli->tcp); + if (err != 0) { + tError("http-report failed to init socket handle, dst:%s:%d,reason:%s chanId:%" PRId64 "", cli->addr, cli->port, + uv_strerror(err), chanId); + destroyHttpClient(cli); + taosReleaseRef(httpRefMgt, chanId); + } // set up timeout to avoid stuck; int32_t fd = taosCreateSocketWithTimeout(5000); if (fd < 0) { - tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port); + tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 "", cli->addr, cli->port, chanId); destroyHttpClient(cli); - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); return; } int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); if (ret != 0) { - tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); - taosReleaseRef(httpRefMgt, httpRef); + tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(ret), cli->addr, + cli->port, chanId); destroyHttpClient(cli); + taosReleaseRef(httpRefMgt, chanId); return; } ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb); if (ret != 0) { - tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, - cli->port); + tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(ret), + cli->addr, cli->port, chanId); httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); destroyHttpClient(cli); } - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); return; END: if (ignore == false) { - tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); + tError("http-report failed to report, reason: %s, addr: %s:%d, chanId:%" PRId64 "", terrstr(), msg->server, + msg->port, chanId); } httpDestroyMsg(msg); - taosReleaseRef(httpRefMgt, httpRef); + taosMemoryFree(header); + taosReleaseRef(httpRefMgt, chanId); +} + +static void httpModuleDestroy(SHttpModule* http) { + if (http->asyncPool != NULL) { + TRANS_DESTROY_ASYNC_POOL_MSG(http->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL); + transAsyncPoolDestroy(http->asyncPool); + } + if (http->loop) { + uv_loop_close(http->loop); + taosMemoryFree(http->loop); + } + taosHashCleanup(http->connStatusTable); + // not free http, http freeed by ref +} + +static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, + int32_t contLen, EHttpCompFlag flag, int64_t chanId) { + int32_t ret = 0; + terrno = 0; + SHttpMsg* msg = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId); + if (msg == NULL) { + return terrno; + } + + SHttpModule* load = taosAcquireRef(httpRefMgt, chanId); + if (load == NULL) { + httpDestroyMsg(msg); + return terrno; + } + + if (atomic_load_8(&load->quit)) { + httpDestroyMsg(msg); + taosReleaseRef(httpRefMgt, chanId); + terrno = TSDB_CODE_HTTP_MODULE_QUIT; + return terrno; + } + + ret = transAsyncSend(load->asyncPool, &(msg->q)); + if (ret < 0) { + httpDestroyMsg(msg); + taosReleaseRef(httpRefMgt, chanId); + terrno = TSDB_CODE_HTTP_MODULE_QUIT; + return terrno; + } + + taosReleaseRef(httpRefMgt, chanId); + return ret; +} + +int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId) { + return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId); } int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { taosThreadOnce(&transHttpInit, transHttpEnvInit); - return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag); + return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId); } static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); } + +int64_t transInitHttpChanImpl(); + static void transHttpEnvInit() { - httpRefMgt = taosOpenRef(1, transHttpDestroyHandle); - - SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule)); - http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); - http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - http->quit = 0; - - uv_loop_init(http->loop); - - http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); - if (NULL == http->asyncPool) { - taosMemoryFree(http->loop); - taosMemoryFree(http); - http = NULL; - return; - } - - int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); - if (err != 0) { - taosMemoryFree(http->loop); - taosMemoryFree(http); - http = NULL; - } - httpRef = taosAddRef(httpRefMgt, http); + httpRefMgt = taosOpenRef(64, transHttpDestroyHandle); + httpDefaultChanId = transInitHttpChanImpl(); } void transHttpEnvDestroy() { - // remove http - if (httpRef == -1) { + // remove default chanId + taosDestroyHttpChan(httpDefaultChanId); + httpDefaultChanId = -1; +} + +int64_t transInitHttpChanImpl() { + SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule)); + if (http == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (http->connStatusTable == NULL) { + httpModuleDestroy(http); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); + if (http->loop == NULL) { + httpModuleDestroy(http); + taosMemoryFree(http); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + int err = uv_loop_init(http->loop); + if (err != 0) { + tError("http-report failed init uv, reason:%s", uv_strerror(err)); + httpModuleDestroy(http); + taosMemoryFree(http); + return TSDB_CODE_THIRDPARTY_ERROR; + } + + http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); + if (NULL == http->asyncPool) { + httpModuleDestroy(http); + taosMemoryFree(http); + return terrno; + } + http->quit = 0; + + err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); + if (err != 0) { + httpModuleDestroy(http); + taosMemoryFree(http); + terrno = TAOS_SYSTEM_ERROR(errno); + return terrno; + } + int64_t ref = taosAddRef(httpRefMgt, http); + if (ref < 0) { + return terrno; + } + return ref; +} +int64_t taosInitHttpChan() { + taosThreadOnce(&transHttpInit, transHttpEnvInit); + return transInitHttpChanImpl(); +} + +void taosDestroyHttpChan(int64_t chanId) { + tDebug("http-report send quit, chanId:%" PRId64 "", chanId); + + int ret = 0; + SHttpModule* load = taosAcquireRef(httpRefMgt, chanId); + if (load == NULL) { + tError("http-report failed destroy chanId %" PRId64 "", chanId); return; } - SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); - atomic_store_8(&load->quit, 1); - httpSendQuit(); + + ret = httpSendQuit(chanId); + if (ret != 0) { + tDebug("http-report already destroyed, chanId %" PRId64 "", chanId); + taosReleaseRef(httpRefMgt, chanId); + return; + } + taosThreadJoin(load->thread, NULL); - TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL); - transAsyncPoolDestroy(load->asyncPool); - uv_loop_close(load->loop); - taosMemoryFree(load->loop); + httpModuleDestroy(load); - taosHashCleanup(load->connStatusTable); - - taosReleaseRef(httpRefMgt, httpRef); - taosRemoveRef(httpRefMgt, httpRef); -} + taosReleaseRef(httpRefMgt, chanId); + taosReleaseRef(httpRefMgt, chanId); +} \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e9fa58e6e3..e46703b88c 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -54,6 +54,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnod TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") +TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") @@ -96,6 +97,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space") +TAOS_DEFINE_ERROR(TSDB_CODE_THIRDPARTY_ERROR, "third party error, please check the log") + TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down")