diff --git a/include/libs/transport/thttp.h b/include/libs/transport/thttp.h index f6f1f7f027..a2f6b5ac8b 100644 --- a/include/libs/transport/thttp.h +++ b/include/libs/transport/thttp.h @@ -28,6 +28,11 @@ typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); +int64_t taosInitHttpChan(); +int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId); +void taosDestroyHttpChan(int64_t chanId); + #ifdef __cplusplus } #endif diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 24f9d041fc..64b9ec7aeb 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -89,6 +89,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) // #define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) #define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024) +#define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index c4ca39c323..ba12774c18 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -19,7 +19,6 @@ #include "zlib.h" #include "thttp.h" #include "taoserror.h" -#include "tlog.h" #include "transComm.h" // clang-format on @@ -27,14 +26,18 @@ #define HTTP_RECV_BUF_SIZE 1024 static int32_t httpRefMgt = 0; -static int64_t httpRef = -1; static int32_t FAST_FAILURE_LIMIT = 1; + +static int64_t httpDefaultChanId = -1; + typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; TdThread thread; SHashObj* connStatusTable; + SHashObj* connPool; int8_t quit; + int16_t connNum; } SHttpModule; typedef struct SHttpMsg { @@ -46,7 +49,7 @@ typedef struct SHttpMsg { int32_t len; EHttpCompFlag flag; int8_t quit; - + int64_t chanId; } SHttpMsg; typedef struct SHttpClient { @@ -58,50 +61,73 @@ typedef struct SHttpClient { char* addr; uint16_t port; struct sockaddr_in dest; + int64_t chanId; } SHttpClient; +typedef struct SHttpConnList { + queue q; + +} SHttpConnList; + static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; static void transHttpEnvInit(); static void httpHandleReq(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg); -static int32_t httpSendQuit(); +static int32_t httpSendQuit(SHttpModule* http, int64_t chanId); + +static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg); +static void httpDestroyMsg(SHttpMsg* msg); static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ); static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); +static void httpModuleDestroy(SHttpModule* http); + +static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, + int32_t contLen, EHttpCompFlag flag, int64_t chanId); static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen, - EHttpCompFlag flag) { + + EHttpCompFlag flag) { + int32_t code = 0; + int32_t len = 0; if (flag == HTTP_FLAT) { - return snprintf(pHead, headLen, + len = snprintf(pHead, headLen, "POST %s HTTP/1.1\n" "Host: %s\n" "Content-Type: application/json\n" "Content-Length: %d\n\n", uri, server, contLen); + if (len < 0 || len >= headLen) { + code = TSDB_CODE_OUT_OF_RANGE; + } } else if (flag == HTTP_GZIP) { - return snprintf(pHead, headLen, + len = snprintf(pHead, headLen, "POST %s HTTP/1.1\n" "Host: %s\n" "Content-Type: application/json\n" "Content-Encoding: gzip\n" "Content-Length: %d\n\n", uri, server, contLen); + if (len < 0 || len >= headLen) { + code = TSDB_CODE_OUT_OF_RANGE; + } } else { - terrno = TSDB_CODE_INVALID_CFG; - return -1; + code = TSDB_CODE_INVALID_PARA; } + return code; } static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { - int32_t code = -1; + int32_t code = 0; int32_t destLen = srcLen; void* pDest = taosMemoryMalloc(destLen); if (pDest == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code= TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } @@ -110,7 +136,7 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { gzipStream.zfree = (free_func)0; gzipStream.opaque = (voidpf)0; if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } @@ -121,13 +147,13 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) { if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) { - terrno = TSDB_CODE_COMPRESS_ERROR; + code = TSDB_CODE_COMPRESS_ERROR; goto _OVER; } } if (gzipStream.avail_in != 0) { - terrno = TSDB_CODE_COMPRESS_ERROR; + code = TSDB_CODE_COMPRESS_ERROR; goto _OVER; } @@ -137,18 +163,18 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { break; } if (err != Z_OK) { - terrno = TSDB_CODE_COMPRESS_ERROR; + code = TSDB_CODE_COMPRESS_ERROR; goto _OVER; } } if (deflateEnd(&gzipStream) != Z_OK) { - terrno = TSDB_CODE_COMPRESS_ERROR; + code = TSDB_CODE_COMPRESS_ERROR; goto _OVER; } if (gzipStream.total_out >= srcLen) { - terrno = TSDB_CODE_COMPRESS_ERROR; + code = TSDB_CODE_COMPRESS_ERROR; goto _OVER; } @@ -158,8 +184,7 @@ _OVER: if (code == 0) { memcpy(pSrc, pDest, gzipStream.total_out); code = gzipStream.total_out; - } - + } taosMemoryFree(pDest); return code; } @@ -168,11 +193,15 @@ static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { tError("http-report failed to resolving domain names: %s", server); - return -1; + return TSDB_CODE_RPC_FQDN_ERROR; } - char buf[128] = {0}; + char buf[256] = {0}; tinet_ntoa(buf, ip); - uv_ip4_addr(buf, port, dest); + int ret = uv_ip4_addr(buf, port, dest); + if (ret != 0) { + tError("http-report failed to get addr %s", uv_err_name(ret)); + return TSDB_CODE_THIRDPARTY_ERROR; + } return 0; } @@ -183,6 +212,44 @@ static void* httpThread(void* arg) { return NULL; } +static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) { + if (server == NULL || uri == NULL) { + tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId); + *httpMsg = NULL; + return TSDB_CODE_INVALID_PARA; + } + + if (pCont == NULL || contLen == 0) { + tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId); + *httpMsg = NULL; + return TSDB_CODE_INVALID_PARA; + } + + SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); + if (msg == NULL) { + *httpMsg = NULL; + return TSDB_CODE_OUT_OF_MEMORY; + } + + msg->port = port; + msg->server = taosStrdup(server); + msg->uri = taosStrdup(uri); + msg->cont = taosMemoryMalloc(contLen); + if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) { + httpDestroyMsg(msg); + *httpMsg = NULL; + return TSDB_CODE_OUT_OF_MEMORY; + } + + memcpy(msg->cont, pCont, contLen); + msg->len = contLen; + msg->flag = flag; + msg->quit = 0; + msg->chanId = chanId; + *httpMsg = msg; + return 0; +} static void httpDestroyMsg(SHttpMsg* msg) { if (msg == NULL) return; @@ -191,15 +258,7 @@ static void httpDestroyMsg(SHttpMsg* msg) { taosMemoryFree(msg->cont); taosMemoryFree(msg); } -static void httpDestroyMsgWrapper(void* cont, void* param) { - httpDestroyMsg((SHttpMsg*)cont); - // if (msg == NULL) return; - - // taosMemoryFree(msg->server); - // taosMemoryFree(msg->uri); - // taosMemoryFree(msg->cont); - // taosMemoryFree(msg); -} +static void httpDestroyMsgWrapper(void* cont, void* param) { httpDestroyMsg((SHttpMsg*)cont); } static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { SHttpMsg *msg = NULL, *quitMsg = NULL; @@ -221,6 +280,7 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { QUEUE_PUSH(&item->qmsg, &quitMsg->q); } } + static void httpAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SHttpModule* http = item->pThrd; @@ -266,6 +326,14 @@ static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) { static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) { SHttpClient* cli = handle->data; + + int64_t chanId = cli->chanId; + SHttpModule* http = taosAcquireRef(httpRefMgt, cli->chanId); + if (http != NULL) { + http->connNum -= 1; + taosReleaseRef(httpRefMgt, chanId); + } + destroyHttpClient(cli); } @@ -278,7 +346,7 @@ static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { SHttpClient* cli = handle->data; if (nread < 0) { - tError("http-report recv error:%s", uv_err_name(nread)); + tError("http-report recv error:%s", uv_strerror(nread)); } else { tTrace("http-report succ to recv %d bytes", (int32_t)nread); } @@ -289,92 +357,73 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const static void clientSentCb(uv_write_t* req, int32_t status) { SHttpClient* cli = req->data; if (status != 0) { - tError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, + cli->port, cli->chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } return; } else { - tTrace("http-report succ to send data"); + tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId); } status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); if (status != 0) { - tError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, + cli->port, cli->chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } } } static void clientConnCb(uv_connect_t* req, int32_t status) { - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); SHttpClient* cli = req->data; + int64_t chanId = cli->chanId; + + SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (status != 0) { httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); - tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), + cli->addr, cli->port, chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); return; } + http->connNum += 1; + httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1); status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); if (0 != status) { - tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, + cli->port, chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } } - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); } -int32_t httpSendQuit() { - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); - if (http == NULL) return 0; - +int32_t httpSendQuit(SHttpModule* http, int64_t chanId) { SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg)); + if (msg == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + msg->quit = 1; + msg->chanId = chanId; + + int ret = transAsyncSend(http->asyncPool, &(msg->q)); + if (ret != 0) { + taosMemoryFree(msg); + return TSDB_CODE_THIRDPARTY_ERROR; + } - transAsyncSend(http->asyncPool, &(msg->q)); - taosReleaseRef(httpRefMgt, httpRef); return 0; } -static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag) { - if (server == NULL || uri == NULL) { - tError("http-report failed to report to invalid addr"); - return -1; - } - - if (pCont == NULL || contLen == 0) { - tError("http-report failed to report empty packet"); - return -1; - } - SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); - if (load == NULL) { - tError("http-report already released"); - return -1; - } - - SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); - - msg->server = taosStrdup(server); - msg->uri = taosStrdup(uri); - msg->port = port; - msg->cont = taosMemoryMalloc(contLen); - memcpy(msg->cont, pCont, contLen); - msg->len = contLen; - msg->flag = flag; - msg->quit = 0; - - int ret = transAsyncSend(load->asyncPool, &(msg->q)); - taosReleaseRef(httpRefMgt, httpRef); - return ret; -} - static void httpDestroyClientCb(uv_handle_t* handle) { SHttpClient* http = handle->data; destroyHttpClient(http); @@ -392,13 +441,14 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) { return; } static void httpHandleQuit(SHttpMsg* msg) { + int64_t chanId = msg->chanId; taosMemoryFree(msg); - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + tDebug("http-report receive quit, chanId:%" PRId64 "", chanId); + SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (http == NULL) return; - uv_walk(http->loop, httpWalkCb, NULL); - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); } static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) { @@ -431,9 +481,14 @@ static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, return; } static void httpHandleReq(SHttpMsg* msg) { - int32_t ignore = false; - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + int64_t chanId = msg->chanId; + int32_t ignore = false; + char* header = NULL; + int32_t code = 0; + + SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (http == NULL) { + code = terrno; goto END; } if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) { @@ -453,123 +508,268 @@ static void httpHandleReq(SHttpMsg* msg) { msg->flag = HTTP_FLAT; } if (dstLen < 0) { + code = dstLen; goto END; } } - int32_t len = 2048; - char* header = taosMemoryCalloc(1, len); - int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, len, msg->flag); + int32_t cap = 2048; + header = taosMemoryCalloc(1, cap); + if (header == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + + int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag); if (headLen < 0) { - taosMemoryFree(header); + code = headLen; goto END; } uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); + if (wb == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); + if (cli == NULL) { + taosMemoryFree(wb); + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + cli->conn.data = cli; cli->tcp.data = cli; cli->req.data = cli; - cli->wbuf = wb; - cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); + cli->dest = dest; + cli->chanId = chanId; cli->addr = msg->server; cli->port = msg->port; - cli->dest = dest; - taosMemoryFree(msg->uri); taosMemoryFree(msg); - uv_tcp_init(http->loop, &cli->tcp); + cli->wbuf = wb; + cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); + if (cli->rbuf == NULL) { + tError("http-report failed to alloc read buf, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, + chanId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + destroyHttpClient(cli); + taosReleaseRef(httpRefMgt, chanId); + return; + } + + int err = uv_tcp_init(http->loop, &cli->tcp); + if (err != 0) { + tError("http-report failed to init socket handle, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, + chanId, uv_strerror(err)); + destroyHttpClient(cli); + taosReleaseRef(httpRefMgt, chanId); + return; + } // set up timeout to avoid stuck; int32_t fd = taosCreateSocketWithTimeout(5000); if (fd < 0) { - tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port); + tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, chanId, + tstrerror(TAOS_SYSTEM_ERROR(errno))); destroyHttpClient(cli); - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); return; } + int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); if (ret != 0) { - tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); - taosReleaseRef(httpRefMgt, httpRef); + tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 ",reason:%s", uv_strerror(ret), + cli->addr, cli->port, chanId, uv_strerror(ret)); destroyHttpClient(cli); + taosReleaseRef(httpRefMgt, chanId); return; } ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb); if (ret != 0) { - tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, - cli->port); + tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ",reson:%s", cli->addr, cli->port, + chanId, uv_strerror(ret)); httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); destroyHttpClient(cli); } - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); return; END: if (ignore == false) { - tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); + tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ", reason:%s", msg->server, msg->port, chanId, + tstrerror(code)); } httpDestroyMsg(msg); - taosReleaseRef(httpRefMgt, httpRef); + taosMemoryFree(header); + taosReleaseRef(httpRefMgt, chanId); +} + +static void httpModuleDestroy(SHttpModule* http) { + if (http == NULL) return; + + if (http->asyncPool != NULL) { + TRANS_DESTROY_ASYNC_POOL_MSG(http->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL); + transAsyncPoolDestroy(http->asyncPool); + } + if (http->loop) { + uv_loop_close(http->loop); + taosMemoryFree(http->loop); + } + + taosHashCleanup(http->connStatusTable); + // not free http, http freeed by ref +} + +void httpModuleDestroy2(SHttpModule* http) { + if (http == NULL) return; + httpModuleDestroy(http); + taosMemoryFree(http); +} + +static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, + int32_t contLen, EHttpCompFlag flag, int64_t chanId) { + SHttpModule* load = NULL; + SHttpMsg *msg = NULL; + int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId,&msg); + if (code != 0) { + goto _ERROR; + } + + load = taosAcquireRef(httpRefMgt, chanId); + if (load == NULL) { + code = terrno; + goto _ERROR; + } + + if (atomic_load_8(&load->quit)) { + code = TSDB_CODE_HTTP_MODULE_QUIT; + goto _ERROR; + } + + code = transAsyncSend(load->asyncPool, &(msg->q)); + if (code != 0) { + code = TSDB_CODE_HTTP_MODULE_QUIT; + goto _ERROR; + } + msg = NULL; + +_ERROR: + httpDestroyMsg(msg); + if (load != NULL) taosReleaseRef(httpRefMgt, chanId); + return code; +} + +int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId) { + return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId); } int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { taosThreadOnce(&transHttpInit, transHttpEnvInit); - return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag); + return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId); } static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); } + +int64_t transInitHttpChanImpl(); + static void transHttpEnvInit() { - httpRefMgt = taosOpenRef(1, transHttpDestroyHandle); - - SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule)); - http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); - http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - http->quit = 0; - - uv_loop_init(http->loop); - - http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); - if (NULL == http->asyncPool) { - taosMemoryFree(http->loop); - taosMemoryFree(http); - http = NULL; - return; - } - - int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); - if (err != 0) { - taosMemoryFree(http->loop); - taosMemoryFree(http); - http = NULL; - } - httpRef = taosAddRef(httpRefMgt, http); + httpRefMgt = taosOpenRef(64, transHttpDestroyHandle); + httpDefaultChanId = transInitHttpChanImpl(); } void transHttpEnvDestroy() { - // remove http - if (httpRef == -1) { + // remove default chanId + taosDestroyHttpChan(httpDefaultChanId); + httpDefaultChanId = -1; +} + +int64_t transInitHttpChanImpl() { + int32_t code = 0; + SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule)); + if (http == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _ERROR; + } + + http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (http->connStatusTable == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _ERROR; + } + + http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); + if (http->loop == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _ERROR; + } + + int err = uv_loop_init(http->loop); + if (err != 0) { + tError("http-report failed init uv, reason:%s", uv_strerror(err)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto _ERROR; + } + + http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); + if (http->asyncPool == NULL) { + code = terrno; + goto _ERROR; + } + + http->quit = 0; + + err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); + if (err != 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _ERROR; + } + + int64_t ref = taosAddRef(httpRefMgt, http); + if (ref < 0) { + goto _ERROR; + } + return ref; + +_ERROR: + httpModuleDestroy2(http); + return code; +} +int64_t taosInitHttpChan() { + taosThreadOnce(&transHttpInit, transHttpEnvInit); + return transInitHttpChanImpl(); +} + +void taosDestroyHttpChan(int64_t chanId) { + tDebug("http-report send quit, chanId:%" PRId64 "", chanId); + + int ret = 0; + SHttpModule* load = taosAcquireRef(httpRefMgt, chanId); + if (load == NULL) { + tError("http-report failed to destroy chanId %" PRId64 ", reason:%s", chanId, tstrerror(terrno)); + ret = terrno; return; } - SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); atomic_store_8(&load->quit, 1); - httpSendQuit(); + ret = httpSendQuit(load, chanId); + if (ret != 0) { + tDebug("http-report already destroyed, chanId %" PRId64 ",reason:%s", chanId, tstrerror(ret)); + taosReleaseRef(httpRefMgt, chanId); + return; + } + taosThreadJoin(load->thread, NULL); - TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL); - transAsyncPoolDestroy(load->asyncPool); - uv_loop_close(load->loop); - taosMemoryFree(load->loop); + httpModuleDestroy(load); - taosHashCleanup(load->connStatusTable); - - taosReleaseRef(httpRefMgt, httpRef); - taosRemoveRef(httpRefMgt, httpRef); -} + taosReleaseRef(httpRefMgt, chanId); + taosRemoveRef(httpRefMgt, chanId); +} \ No newline at end of file diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 68ff5dc5e5..b9223e7b39 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -224,14 +224,28 @@ int transSetConnOption(uv_tcp_t* stream, int keepalive) { SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); + if (pool == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); + if (pool->asyncs == NULL) { + taosMemoryFree(pool); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } int i = 0, err = 0; for (i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem)); + if (item == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + break; + } item->pThrd = arg; QUEUE_INIT(&item->qmsg); taosThreadMutexInit(&item->mtx, NULL); @@ -240,6 +254,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) err = uv_async_init(loop, async, cb); if (err != 0) { tError("failed to init async, reason:%s", uv_err_name(err)); + terrno = TSDB_CODE_THIRDPARTY_ERROR; break; } } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c7fd6f65c5..0f5ad73f61 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -56,6 +56,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnod TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") +TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")