From 1c4f354adfa2652b7e4822545d48569dc8a9c23f Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 8 Jul 2024 02:32:55 +0000 Subject: [PATCH 01/13] add http interface --- include/libs/transport/thttp.h | 5 + include/util/taoserror.h | 2 + source/libs/transport/src/thttp.c | 418 +++++++++++++++++++++--------- source/util/src/terror.c | 3 + 4 files changed, 305 insertions(+), 123 deletions(-) diff --git a/include/libs/transport/thttp.h b/include/libs/transport/thttp.h index f6f1f7f027..c3f107028a 100644 --- a/include/libs/transport/thttp.h +++ b/include/libs/transport/thttp.h @@ -25,8 +25,13 @@ extern "C" { typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; +int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId); + int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); +int64_t taosInitHttpChan(); +void taosDestroyHttpChan(int64_t chanId); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 7ac22ac40f..108ae4223b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -87,6 +87,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) // #define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) #define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024) +#define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025) @@ -136,6 +137,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) #define TSDB_CODE_MSG_ENCODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x012D) #define TSDB_CODE_NO_ENOUGH_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012E) +#define TSDB_CODE_THIRDPARTY_ERROR TAOS_DEF_ERROR_CODE(0, 0x012F) #define TSDB_CODE_APP_IS_STARTING TAOS_DEF_ERROR_CODE(0, 0x0130) #define TSDB_CODE_APP_IS_STOPPING TAOS_DEF_ERROR_CODE(0, 0x0131) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index c4ca39c323..704c4b06e2 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -19,7 +19,6 @@ #include "zlib.h" #include "thttp.h" #include "taoserror.h" -#include "tlog.h" #include "transComm.h" // clang-format on @@ -27,13 +26,16 @@ #define HTTP_RECV_BUF_SIZE 1024 static int32_t httpRefMgt = 0; -static int64_t httpRef = -1; static int32_t FAST_FAILURE_LIMIT = 1; + +static int64_t httpDefaultChanId = -1; + typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; TdThread thread; SHashObj* connStatusTable; + SHashObj* connPool; int8_t quit; } SHttpModule; @@ -46,7 +48,7 @@ typedef struct SHttpMsg { int32_t len; EHttpCompFlag flag; int8_t quit; - + int64_t chanId; } SHttpMsg; typedef struct SHttpClient { @@ -58,19 +60,33 @@ typedef struct SHttpClient { char* addr; uint16_t port; struct sockaddr_in dest; + int64_t chanId; } SHttpClient; +typedef struct SHttpConnList { + queue q; + +} SHttpConnList; + static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; static void transHttpEnvInit(); static void httpHandleReq(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg); -static int32_t httpSendQuit(); +static int32_t httpSendQuit(int64_t chanId); + +static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId); +static void httpDestroyMsg(SHttpMsg* msg); static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ); static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); +static void httpModuleDestroy(SHttpModule* http); + +static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, + int32_t contLen, EHttpCompFlag flag, int64_t chanId); static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { @@ -91,7 +107,7 @@ static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t uri, server, contLen); } else { terrno = TSDB_CODE_INVALID_CFG; - return -1; + return terrno; } } @@ -158,6 +174,8 @@ _OVER: if (code == 0) { memcpy(pSrc, pDest, gzipStream.total_out); code = gzipStream.total_out; + } else { + code = terrno; } taosMemoryFree(pDest); @@ -168,11 +186,15 @@ static FORCE_INLINE int32_t taosBuildDstAddr(const char* server, uint16_t port, uint32_t ip = taosGetIpv4FromFqdn(server); if (ip == 0xffffffff) { tError("http-report failed to resolving domain names: %s", server); - return -1; + return TSDB_CODE_RPC_FQDN_ERROR; } - char buf[128] = {0}; + char buf[256] = {0}; tinet_ntoa(buf, ip); - uv_ip4_addr(buf, port, dest); + int ret = uv_ip4_addr(buf, port, dest); + if (ret != 0) { + tError("http-report failed to get addr %s", uv_err_name(ret)); + return TSDB_CODE_THIRDPARTY_ERROR; + } return 0; } @@ -183,6 +205,43 @@ static void* httpThread(void* arg) { return NULL; } +static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId) { + if (server == NULL || uri == NULL) { + tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + + if (pCont == NULL || contLen == 0) { + tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + + SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); + if (msg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + msg->port = port; + msg->server = taosStrdup(server); + msg->uri = taosStrdup(uri); + msg->cont = taosMemoryMalloc(contLen); + if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) { + httpDestroyMsg(msg); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + memcpy(msg->cont, pCont, contLen); + msg->len = contLen; + msg->flag = flag; + msg->quit = 0; + msg->chanId = chanId; + return msg; +} static void httpDestroyMsg(SHttpMsg* msg) { if (msg == NULL) return; @@ -191,15 +250,7 @@ static void httpDestroyMsg(SHttpMsg* msg) { taosMemoryFree(msg->cont); taosMemoryFree(msg); } -static void httpDestroyMsgWrapper(void* cont, void* param) { - httpDestroyMsg((SHttpMsg*)cont); - // if (msg == NULL) return; - - // taosMemoryFree(msg->server); - // taosMemoryFree(msg->uri); - // taosMemoryFree(msg->cont); - // taosMemoryFree(msg); -} +static void httpDestroyMsgWrapper(void* cont, void* param) { httpDestroyMsg((SHttpMsg*)cont); } static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { SHttpMsg *msg = NULL, *quitMsg = NULL; @@ -278,7 +329,7 @@ static FORCE_INLINE void clientAllocBuffCb(uv_handle_t* handle, size_t suggested static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { SHttpClient* cli = handle->data; if (nread < 0) { - tError("http-report recv error:%s", uv_err_name(nread)); + tError("http-report recv error:%s", uv_strerror(nread)); } else { tTrace("http-report succ to recv %d bytes", (int32_t)nread); } @@ -289,92 +340,77 @@ static FORCE_INLINE void clientRecvCb(uv_stream_t* handle, ssize_t nread, const static void clientSentCb(uv_write_t* req, int32_t status) { SHttpClient* cli = req->data; if (status != 0) { - tError("http-report failed to send data, reason: %s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to send data, reason: %s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, + cli->port, cli->chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } return; } else { - tTrace("http-report succ to send data"); + tTrace("http-report succ to send data, chanId:%" PRId64 "", cli->chanId); } status = uv_read_start((uv_stream_t*)&cli->tcp, clientAllocBuffCb, clientRecvCb); if (status != 0) { - tError("http-report failed to recv data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to recv data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, + cli->port, cli->chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } } } static void clientConnCb(uv_connect_t* req, int32_t status) { - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); SHttpClient* cli = req->data; + int64_t chanId = cli->chanId; + + SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (status != 0) { httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); - tError("http-report failed to conn to server, reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to conn to server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), + cli->addr, cli->port, chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); return; } httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1); status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); if (0 != status) { - tError("http-report failed to send data,reason:%s, dst:%s:%d", uv_strerror(status), cli->addr, cli->port); + tError("http-report failed to send data,reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(status), cli->addr, + cli->port, chanId); if (!uv_is_closing((uv_handle_t*)&cli->tcp)) { uv_close((uv_handle_t*)&cli->tcp, clientCloseCb); } } - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); } -int32_t httpSendQuit() { - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); - if (http == NULL) return 0; +int32_t httpSendQuit(int64_t chanId) { + SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); + if (http == NULL) return terrno; SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg)); + if (msg == NULL) { + taosReleaseRef(httpRefMgt, chanId); + return TSDB_CODE_OUT_OF_MEMORY; + } + msg->quit = 1; + msg->chanId = chanId; - transAsyncSend(http->asyncPool, &(msg->q)); - taosReleaseRef(httpRefMgt, httpRef); + int ret = transAsyncSend(http->asyncPool, &(msg->q)); + if (ret != 0) { + taosMemoryFree(msg); + taosReleaseRef(httpRefMgt, chanId); + return ret; + } + + taosReleaseRef(httpRefMgt, chanId); return 0; } -static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag) { - if (server == NULL || uri == NULL) { - tError("http-report failed to report to invalid addr"); - return -1; - } - - if (pCont == NULL || contLen == 0) { - tError("http-report failed to report empty packet"); - return -1; - } - SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); - if (load == NULL) { - tError("http-report already released"); - return -1; - } - - SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); - - msg->server = taosStrdup(server); - msg->uri = taosStrdup(uri); - msg->port = port; - msg->cont = taosMemoryMalloc(contLen); - memcpy(msg->cont, pCont, contLen); - msg->len = contLen; - msg->flag = flag; - msg->quit = 0; - - int ret = transAsyncSend(load->asyncPool, &(msg->q)); - taosReleaseRef(httpRefMgt, httpRef); - return ret; -} - static void httpDestroyClientCb(uv_handle_t* handle) { SHttpClient* http = handle->data; destroyHttpClient(http); @@ -392,13 +428,14 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) { return; } static void httpHandleQuit(SHttpMsg* msg) { + int64_t chanId = msg->chanId; taosMemoryFree(msg); - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + tDebug("http-report receive quit, chanId:%" PRId64 "", chanId); + SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (http == NULL) return; - uv_walk(http->loop, httpWalkCb, NULL); - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); } static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port) { @@ -431,8 +468,12 @@ static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, return; } static void httpHandleReq(SHttpMsg* msg) { - int32_t ignore = false; - SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + int64_t chanId = msg->chanId; + int32_t ignore = false; + char* header = NULL; + terrno = 0; + + SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (http == NULL) { goto END; } @@ -457,119 +498,250 @@ static void httpHandleReq(SHttpMsg* msg) { } } - int32_t len = 2048; - char* header = taosMemoryCalloc(1, len); - int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, len, msg->flag); - if (headLen < 0) { - taosMemoryFree(header); + int32_t cap = 2048; + header = taosMemoryCalloc(1, cap); + if (header == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + + int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag); + if (headLen < 0 || headLen >= cap) { goto END; } uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); + if (wb == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); + if (cli == NULL) { + taosMemoryFree(wb); + goto END; + } + cli->conn.data = cli; cli->tcp.data = cli; cli->req.data = cli; cli->wbuf = wb; cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); + if (cli->rbuf == NULL) { + taosMemoryFree(msg->uri); + taosMemoryFree(msg); + destroyHttpClient(cli); + taosReleaseRef(httpRefMgt, chanId); + return; + } + cli->addr = msg->server; cli->port = msg->port; cli->dest = dest; + cli->chanId = chanId; taosMemoryFree(msg->uri); taosMemoryFree(msg); - uv_tcp_init(http->loop, &cli->tcp); + int err = uv_tcp_init(http->loop, &cli->tcp); + if (err != 0) { + tError("http-report failed to init socket handle, dst:%s:%d,reason:%s chanId:%" PRId64 "", cli->addr, cli->port, + uv_strerror(err), chanId); + destroyHttpClient(cli); + taosReleaseRef(httpRefMgt, chanId); + } // set up timeout to avoid stuck; int32_t fd = taosCreateSocketWithTimeout(5000); if (fd < 0) { - tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port); + tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 "", cli->addr, cli->port, chanId); destroyHttpClient(cli); - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); return; } int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); if (ret != 0) { - tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); - taosReleaseRef(httpRefMgt, httpRef); + tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(ret), cli->addr, + cli->port, chanId); destroyHttpClient(cli); + taosReleaseRef(httpRefMgt, chanId); return; } ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb); if (ret != 0) { - tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, - cli->port); + tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(ret), + cli->addr, cli->port, chanId); httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); destroyHttpClient(cli); } - taosReleaseRef(httpRefMgt, httpRef); + taosReleaseRef(httpRefMgt, chanId); return; END: if (ignore == false) { - tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); + tError("http-report failed to report, reason: %s, addr: %s:%d, chanId:%" PRId64 "", terrstr(), msg->server, + msg->port, chanId); } httpDestroyMsg(msg); - taosReleaseRef(httpRefMgt, httpRef); + taosMemoryFree(header); + taosReleaseRef(httpRefMgt, chanId); +} + +static void httpModuleDestroy(SHttpModule* http) { + if (http->asyncPool != NULL) { + TRANS_DESTROY_ASYNC_POOL_MSG(http->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL); + transAsyncPoolDestroy(http->asyncPool); + } + if (http->loop) { + uv_loop_close(http->loop); + taosMemoryFree(http->loop); + } + taosHashCleanup(http->connStatusTable); + // not free http, http freeed by ref +} + +static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, + int32_t contLen, EHttpCompFlag flag, int64_t chanId) { + int32_t ret = 0; + terrno = 0; + SHttpMsg* msg = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId); + if (msg == NULL) { + return terrno; + } + + SHttpModule* load = taosAcquireRef(httpRefMgt, chanId); + if (load == NULL) { + httpDestroyMsg(msg); + return terrno; + } + + if (atomic_load_8(&load->quit)) { + httpDestroyMsg(msg); + taosReleaseRef(httpRefMgt, chanId); + terrno = TSDB_CODE_HTTP_MODULE_QUIT; + return terrno; + } + + ret = transAsyncSend(load->asyncPool, &(msg->q)); + if (ret < 0) { + httpDestroyMsg(msg); + taosReleaseRef(httpRefMgt, chanId); + terrno = TSDB_CODE_HTTP_MODULE_QUIT; + return terrno; + } + + taosReleaseRef(httpRefMgt, chanId); + return ret; +} + +int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId) { + return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, chanId); } int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { taosThreadOnce(&transHttpInit, transHttpEnvInit); - return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag); + return taosSendHttpReportImplByChan(server, uri, port, pCont, contLen, flag, httpDefaultChanId); } static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); } + +int64_t transInitHttpChanImpl(); + static void transHttpEnvInit() { - httpRefMgt = taosOpenRef(1, transHttpDestroyHandle); - - SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule)); - http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); - http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - http->quit = 0; - - uv_loop_init(http->loop); - - http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); - if (NULL == http->asyncPool) { - taosMemoryFree(http->loop); - taosMemoryFree(http); - http = NULL; - return; - } - - int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); - if (err != 0) { - taosMemoryFree(http->loop); - taosMemoryFree(http); - http = NULL; - } - httpRef = taosAddRef(httpRefMgt, http); + httpRefMgt = taosOpenRef(64, transHttpDestroyHandle); + httpDefaultChanId = transInitHttpChanImpl(); } void transHttpEnvDestroy() { - // remove http - if (httpRef == -1) { + // remove default chanId + taosDestroyHttpChan(httpDefaultChanId); + httpDefaultChanId = -1; +} + +int64_t transInitHttpChanImpl() { + SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule)); + if (http == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (http->connStatusTable == NULL) { + httpModuleDestroy(http); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); + if (http->loop == NULL) { + httpModuleDestroy(http); + taosMemoryFree(http); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + int err = uv_loop_init(http->loop); + if (err != 0) { + tError("http-report failed init uv, reason:%s", uv_strerror(err)); + httpModuleDestroy(http); + taosMemoryFree(http); + return TSDB_CODE_THIRDPARTY_ERROR; + } + + http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); + if (NULL == http->asyncPool) { + httpModuleDestroy(http); + taosMemoryFree(http); + return terrno; + } + http->quit = 0; + + err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); + if (err != 0) { + httpModuleDestroy(http); + taosMemoryFree(http); + terrno = TAOS_SYSTEM_ERROR(errno); + return terrno; + } + int64_t ref = taosAddRef(httpRefMgt, http); + if (ref < 0) { + return terrno; + } + return ref; +} +int64_t taosInitHttpChan() { + taosThreadOnce(&transHttpInit, transHttpEnvInit); + return transInitHttpChanImpl(); +} + +void taosDestroyHttpChan(int64_t chanId) { + tDebug("http-report send quit, chanId:%" PRId64 "", chanId); + + int ret = 0; + SHttpModule* load = taosAcquireRef(httpRefMgt, chanId); + if (load == NULL) { + tError("http-report failed destroy chanId %" PRId64 "", chanId); return; } - SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); - atomic_store_8(&load->quit, 1); - httpSendQuit(); + + ret = httpSendQuit(chanId); + if (ret != 0) { + tDebug("http-report already destroyed, chanId %" PRId64 "", chanId); + taosReleaseRef(httpRefMgt, chanId); + return; + } + taosThreadJoin(load->thread, NULL); - TRANS_DESTROY_ASYNC_POOL_MSG(load->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL); - transAsyncPoolDestroy(load->asyncPool); - uv_loop_close(load->loop); - taosMemoryFree(load->loop); + httpModuleDestroy(load); - taosHashCleanup(load->connStatusTable); - - taosReleaseRef(httpRefMgt, httpRef); - taosRemoveRef(httpRefMgt, httpRef); -} + taosReleaseRef(httpRefMgt, chanId); + taosReleaseRef(httpRefMgt, chanId); +} \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e9fa58e6e3..e46703b88c 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -54,6 +54,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnod TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") +TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") @@ -96,6 +97,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DISKSPACE, "No enough disk space") +TAOS_DEFINE_ERROR(TSDB_CODE_THIRDPARTY_ERROR, "third party error, please check the log") + TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STARTING, "Database is starting up") TAOS_DEFINE_ERROR(TSDB_CODE_APP_IS_STOPPING, "Database is closing down") From 3148463ab316bedd4bee2b1dc275bf433aaa97dc Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 8 Jul 2024 02:43:58 +0000 Subject: [PATCH 02/13] add http interface --- source/libs/transport/src/transComm.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 68ff5dc5e5..b9223e7b39 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -224,14 +224,28 @@ int transSetConnOption(uv_tcp_t* stream, int keepalive) { SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); + if (pool == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); + if (pool->asyncs == NULL) { + taosMemoryFree(pool); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } int i = 0, err = 0; for (i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem)); + if (item == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + break; + } item->pThrd = arg; QUEUE_INIT(&item->qmsg); taosThreadMutexInit(&item->mtx, NULL); @@ -240,6 +254,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) err = uv_async_init(loop, async, cb); if (err != 0) { tError("failed to init async, reason:%s", uv_err_name(err)); + terrno = TSDB_CODE_THIRDPARTY_ERROR; break; } } From 92281bb4a3e8dafbd1b9ec578a46496be0b94e15 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 8 Jul 2024 02:44:52 +0000 Subject: [PATCH 03/13] add http interface --- source/libs/transport/src/thttp.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 704c4b06e2..903facb208 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -691,7 +691,8 @@ int64_t transInitHttpChanImpl() { tError("http-report failed init uv, reason:%s", uv_strerror(err)); httpModuleDestroy(http); taosMemoryFree(http); - return TSDB_CODE_THIRDPARTY_ERROR; + terrno = TSDB_CODE_THIRDPARTY_ERROR; + return terrno; } http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); From 0e1677d397ff903b6b78b0080e9f8d405e47f298 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 8 Jul 2024 02:51:47 +0000 Subject: [PATCH 04/13] add http interface --- source/libs/transport/src/thttp.c | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 903facb208..50f3a8a135 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -73,7 +73,7 @@ static void transHttpEnvInit(); static void httpHandleReq(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg); -static int32_t httpSendQuit(int64_t chanId); +static int32_t httpSendQuit(SHttpModule* http, int64_t chanId); static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag, int64_t chanId); @@ -387,13 +387,9 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { taosReleaseRef(httpRefMgt, chanId); } -int32_t httpSendQuit(int64_t chanId) { - SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); - if (http == NULL) return terrno; - +int32_t httpSendQuit(SHttpModule* http, int64_t chanId) { SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg)); if (msg == NULL) { - taosReleaseRef(httpRefMgt, chanId); return TSDB_CODE_OUT_OF_MEMORY; } @@ -403,11 +399,9 @@ int32_t httpSendQuit(int64_t chanId) { int ret = transAsyncSend(http->asyncPool, &(msg->q)); if (ret != 0) { taosMemoryFree(msg); - taosReleaseRef(httpRefMgt, chanId); - return ret; + return TSDB_CODE_THIRDPARTY_ERROR; } - taosReleaseRef(httpRefMgt, chanId); return 0; } @@ -727,14 +721,14 @@ void taosDestroyHttpChan(int64_t chanId) { int ret = 0; SHttpModule* load = taosAcquireRef(httpRefMgt, chanId); if (load == NULL) { - tError("http-report failed destroy chanId %" PRId64 "", chanId); + tError("http-report failed to destroy chanId %" PRId64 ", reason:%s", chanId, tstrerror(terrno)); return; } - atomic_store_8(&load->quit, 1); - ret = httpSendQuit(chanId); + atomic_store_8(&load->quit, 1); + ret = httpSendQuit(load, chanId); if (ret != 0) { - tDebug("http-report already destroyed, chanId %" PRId64 "", chanId); + tDebug("http-report already destroyed, chanId %" PRId64 ",reason:%s", chanId, tstrerror(ret)); taosReleaseRef(httpRefMgt, chanId); return; } From 392e65c2ac6f754b71b6861917e3caf6aa77f943 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 8 Jul 2024 03:22:26 +0000 Subject: [PATCH 05/13] add http interface --- source/libs/transport/src/thttp.c | 44 +++++++++++++++++-------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 50f3a8a135..c023422427 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -516,50 +516,53 @@ static void httpHandleReq(SHttpMsg* msg) { SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); if (cli == NULL) { taosMemoryFree(wb); + terrno = TSDB_CODE_OUT_OF_MEMORY; goto END; } + cli->wbuf = wb; cli->conn.data = cli; cli->tcp.data = cli; cli->req.data = cli; - cli->wbuf = wb; + cli->addr = msg->server; + cli->port = msg->port; + cli->dest = dest; + cli->chanId = chanId; + taosMemoryFree(msg->uri); + taosMemoryFree(msg); + cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); if (cli->rbuf == NULL) { - taosMemoryFree(msg->uri); - taosMemoryFree(msg); + tError("http-report failed to alloc read buf, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, + chanId, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); destroyHttpClient(cli); taosReleaseRef(httpRefMgt, chanId); return; } - cli->addr = msg->server; - cli->port = msg->port; - cli->dest = dest; - cli->chanId = chanId; - - taosMemoryFree(msg->uri); - taosMemoryFree(msg); - int err = uv_tcp_init(http->loop, &cli->tcp); if (err != 0) { - tError("http-report failed to init socket handle, dst:%s:%d,reason:%s chanId:%" PRId64 "", cli->addr, cli->port, - uv_strerror(err), chanId); + tError("http-report failed to init socket handle, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, + chanId, uv_strerror(err)); destroyHttpClient(cli); taosReleaseRef(httpRefMgt, chanId); + return; } // set up timeout to avoid stuck; int32_t fd = taosCreateSocketWithTimeout(5000); if (fd < 0) { - tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 "", cli->addr, cli->port, chanId); + tError("http-report failed to open socket, dst:%s:%d, chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, chanId, + tstrerror(TAOS_SYSTEM_ERROR(errno))); destroyHttpClient(cli); taosReleaseRef(httpRefMgt, chanId); return; } + int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); if (ret != 0) { - tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(ret), cli->addr, - cli->port, chanId); + tError("http-report failed to open socket, reason:%s, dst:%s:%d, chanId:%" PRId64 ",reason:%s", uv_strerror(ret), + cli->addr, cli->port, chanId, uv_strerror(ret)); destroyHttpClient(cli); taosReleaseRef(httpRefMgt, chanId); return; @@ -567,8 +570,8 @@ static void httpHandleReq(SHttpMsg* msg) { ret = uv_tcp_connect(&cli->conn, &cli->tcp, (const struct sockaddr*)&cli->dest, clientConnCb); if (ret != 0) { - tError("http-report failed to connect to http-server, reason:%s, dst:%s:%d, chanId:%" PRId64 "", uv_strerror(ret), - cli->addr, cli->port, chanId); + tError("http-report failed to connect to http-server,dst:%s:%d, chanId:%" PRId64 ",reson:%s", cli->addr, cli->port, + chanId, uv_strerror(ret)); httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 0); destroyHttpClient(cli); } @@ -577,9 +580,10 @@ static void httpHandleReq(SHttpMsg* msg) { END: if (ignore == false) { - tError("http-report failed to report, reason: %s, addr: %s:%d, chanId:%" PRId64 "", terrstr(), msg->server, - msg->port, chanId); + tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ", reason:%s", msg->server, msg->port, chanId, + tstrerror(terrno)); } + terrno = 0; httpDestroyMsg(msg); taosMemoryFree(header); taosReleaseRef(httpRefMgt, chanId); From c3caa8475aeafb71da01b9b0526fd2afeaaf26a0 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 8 Jul 2024 03:25:41 +0000 Subject: [PATCH 06/13] add http interface --- source/libs/transport/src/thttp.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index c023422427..4370b3d899 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -509,7 +509,6 @@ static void httpHandleReq(SHttpMsg* msg) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto END; } - wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var @@ -520,17 +519,17 @@ static void httpHandleReq(SHttpMsg* msg) { goto END; } - cli->wbuf = wb; cli->conn.data = cli; cli->tcp.data = cli; cli->req.data = cli; - cli->addr = msg->server; - cli->port = msg->port; cli->dest = dest; cli->chanId = chanId; + cli->addr = msg->server; + cli->port = msg->port; taosMemoryFree(msg->uri); taosMemoryFree(msg); + cli->wbuf = wb; cli->rbuf = taosMemoryCalloc(1, HTTP_RECV_BUF_SIZE); if (cli->rbuf == NULL) { tError("http-report failed to alloc read buf, dst:%s:%d,chanId:%" PRId64 ", reason:%s", cli->addr, cli->port, From e02caa97575f8091f93be52258a676235737cfed Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 8 Jul 2024 05:41:38 +0000 Subject: [PATCH 07/13] add http interface --- source/libs/transport/src/thttp.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 4370b3d899..74325ae5ab 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -509,6 +509,7 @@ static void httpHandleReq(SHttpMsg* msg) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto END; } + wb[0] = uv_buf_init((char*)header, strlen(header)); // heap var wb[1] = uv_buf_init((char*)msg->cont, msg->len); // heap var From fe9aebc4fe420be791db5e62050c367456bd287c Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 8 Jul 2024 09:27:42 +0000 Subject: [PATCH 08/13] fix invalid read --- source/libs/transport/src/thttp.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 74325ae5ab..13c63db903 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -604,8 +604,7 @@ static void httpModuleDestroy(SHttpModule* http) { static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag, int64_t chanId) { - int32_t ret = 0; - terrno = 0; + int32_t ret = 0; SHttpMsg* msg = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId); if (msg == NULL) { return terrno; From b96e282aff681b01c38ce9560076bc6767375713 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 9 Jul 2024 03:28:27 +0000 Subject: [PATCH 09/13] refactor code --- include/libs/transport/thttp.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/libs/transport/thttp.h b/include/libs/transport/thttp.h index c3f107028a..a2f6b5ac8b 100644 --- a/include/libs/transport/thttp.h +++ b/include/libs/transport/thttp.h @@ -25,12 +25,12 @@ extern "C" { typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; -int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId); - int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); + int64_t taosInitHttpChan(); +int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId); void taosDestroyHttpChan(int64_t chanId); #ifdef __cplusplus From ddd3c23ff0b720d7b1b19f973ec9e5a60c88286b Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 9 Jul 2024 09:12:02 +0000 Subject: [PATCH 10/13] refactor code --- source/libs/transport/src/thttp.c | 48 +++++++++++++++---------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 13c63db903..6f7c2aae0a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -602,37 +602,41 @@ static void httpModuleDestroy(SHttpModule* http) { // not free http, http freeed by ref } +void httpModuleDestroy2(SHttpModule* http) { + httpModuleDestroy(http); + taosMemoryFree(http); +} + static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag, int64_t chanId) { - int32_t ret = 0; - SHttpMsg* msg = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId); + int32_t ret = 0; + SHttpModule* load = NULL; + SHttpMsg* msg = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId); if (msg == NULL) { - return terrno; + goto _ERROR; } - SHttpModule* load = taosAcquireRef(httpRefMgt, chanId); + load = taosAcquireRef(httpRefMgt, chanId); if (load == NULL) { - httpDestroyMsg(msg); - return terrno; + goto _ERROR; } if (atomic_load_8(&load->quit)) { - httpDestroyMsg(msg); - taosReleaseRef(httpRefMgt, chanId); terrno = TSDB_CODE_HTTP_MODULE_QUIT; - return terrno; + goto _ERROR; } ret = transAsyncSend(load->asyncPool, &(msg->q)); if (ret < 0) { - httpDestroyMsg(msg); - taosReleaseRef(httpRefMgt, chanId); terrno = TSDB_CODE_HTTP_MODULE_QUIT; - return terrno; + goto _ERROR; } + msg = NULL; - taosReleaseRef(httpRefMgt, chanId); - return ret; +_ERROR: + httpDestroyMsg(msg); + if (load != NULL) taosReleaseRef(httpRefMgt, chanId); + return ret = terrno; } int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, @@ -670,15 +674,14 @@ int64_t transInitHttpChanImpl() { http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (http->connStatusTable == NULL) { - httpModuleDestroy(http); + httpModuleDestroy2(http); terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); if (http->loop == NULL) { - httpModuleDestroy(http); - taosMemoryFree(http); + httpModuleDestroy2(http); terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -686,24 +689,21 @@ int64_t transInitHttpChanImpl() { int err = uv_loop_init(http->loop); if (err != 0) { tError("http-report failed init uv, reason:%s", uv_strerror(err)); - httpModuleDestroy(http); - taosMemoryFree(http); + httpModuleDestroy2(http); terrno = TSDB_CODE_THIRDPARTY_ERROR; return terrno; } http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); if (NULL == http->asyncPool) { - httpModuleDestroy(http); - taosMemoryFree(http); + httpModuleDestroy2(http); return terrno; } http->quit = 0; err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); if (err != 0) { - httpModuleDestroy(http); - taosMemoryFree(http); + httpModuleDestroy2(http); terrno = TAOS_SYSTEM_ERROR(errno); return terrno; } @@ -741,5 +741,5 @@ void taosDestroyHttpChan(int64_t chanId) { httpModuleDestroy(load); taosReleaseRef(httpRefMgt, chanId); - taosReleaseRef(httpRefMgt, chanId); + taosRemoveRef(httpRefMgt, chanId); } \ No newline at end of file From 1b052eefd048b1061d4842f7b03afc1424935272 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Tue, 9 Jul 2024 09:48:26 +0000 Subject: [PATCH 11/13] refactor code --- source/libs/transport/src/thttp.c | 43 +++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 6f7c2aae0a..32f3bcea36 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -37,6 +37,7 @@ typedef struct SHttpModule { SHashObj* connStatusTable; SHashObj* connPool; int8_t quit; + int16_t connNum; } SHttpModule; typedef struct SHttpMsg { @@ -272,6 +273,7 @@ static void httpMayDiscardMsg(SHttpModule* http, SAsyncItem* item) { QUEUE_PUSH(&item->qmsg, &quitMsg->q); } } + static void httpAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SHttpModule* http = item->pThrd; @@ -317,6 +319,14 @@ static FORCE_INLINE void destroyHttpClient(SHttpClient* cli) { static FORCE_INLINE void clientCloseCb(uv_handle_t* handle) { SHttpClient* cli = handle->data; + + int64_t chanId = cli->chanId; + SHttpModule* http = taosAcquireRef(httpRefMgt, cli->chanId); + if (http != NULL) { + http->connNum -= 1; + taosReleaseRef(httpRefMgt, chanId); + } + destroyHttpClient(cli); } @@ -374,6 +384,8 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { taosReleaseRef(httpRefMgt, chanId); return; } + http->connNum += 1; + httpFailFastMayUpdate(http->connStatusTable, cli->addr, cli->port, 1); status = uv_write(&cli->req, (uv_stream_t*)&cli->tcp, cli->wbuf, 2, clientSentCb); @@ -590,6 +602,8 @@ END: } static void httpModuleDestroy(SHttpModule* http) { + if (http == NULL) return; + if (http->asyncPool != NULL) { TRANS_DESTROY_ASYNC_POOL_MSG(http->asyncPool, SHttpMsg, httpDestroyMsgWrapper, NULL); transAsyncPoolDestroy(http->asyncPool); @@ -598,11 +612,13 @@ static void httpModuleDestroy(SHttpModule* http) { uv_loop_close(http->loop); taosMemoryFree(http->loop); } + taosHashCleanup(http->connStatusTable); // not free http, http freeed by ref } void httpModuleDestroy2(SHttpModule* http) { + if (http == NULL) return; httpModuleDestroy(http); taosMemoryFree(http); } @@ -669,49 +685,50 @@ int64_t transInitHttpChanImpl() { SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule)); if (http == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + goto _ERROR; } http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (http->connStatusTable == NULL) { - httpModuleDestroy2(http); terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + goto _ERROR; } http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); if (http->loop == NULL) { - httpModuleDestroy2(http); terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + goto _ERROR; } int err = uv_loop_init(http->loop); if (err != 0) { tError("http-report failed init uv, reason:%s", uv_strerror(err)); - httpModuleDestroy2(http); terrno = TSDB_CODE_THIRDPARTY_ERROR; - return terrno; + goto _ERROR; } http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); - if (NULL == http->asyncPool) { - httpModuleDestroy2(http); - return terrno; + if (http->asyncPool == NULL) { + goto _ERROR; } + http->quit = 0; err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); if (err != 0) { - httpModuleDestroy2(http); terrno = TAOS_SYSTEM_ERROR(errno); - return terrno; + goto _ERROR; } + int64_t ref = taosAddRef(httpRefMgt, http); if (ref < 0) { - return terrno; + goto _ERROR; } return ref; + +_ERROR: + httpModuleDestroy2(http); + return terrno; } int64_t taosInitHttpChan() { taosThreadOnce(&transHttpInit, transHttpEnvInit); From d5eb7202f3b933f29e373e5503b29bf6e38b67ea Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Fri, 12 Jul 2024 02:54:59 +0000 Subject: [PATCH 12/13] fix compile error --- source/libs/transport/src/thttp.c | 115 +++++++++++++++++------------- 1 file changed, 64 insertions(+), 51 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 32f3bcea36..05928804b3 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -76,8 +76,8 @@ static void httpHandleReq(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg); static int32_t httpSendQuit(SHttpModule* http, int64_t chanId); -static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId); +static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg); static void httpDestroyMsg(SHttpMsg* msg); static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); @@ -90,35 +90,44 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, int32_t contLen, EHttpCompFlag flag, int64_t chanId); static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen, - EHttpCompFlag flag) { + + EHttpCompFlag flag) { + int32_t code = 0; + int32_t len = 0; if (flag == HTTP_FLAT) { - return snprintf(pHead, headLen, + len = snprintf(pHead, headLen, "POST %s HTTP/1.1\n" "Host: %s\n" "Content-Type: application/json\n" "Content-Length: %d\n\n", uri, server, contLen); + if (len < 0 || len >= headLen) { + code = TSDB_CODE_OUT_OF_RANGE; + } } else if (flag == HTTP_GZIP) { - return snprintf(pHead, headLen, + len = snprintf(pHead, headLen, "POST %s HTTP/1.1\n" "Host: %s\n" "Content-Type: application/json\n" "Content-Encoding: gzip\n" "Content-Length: %d\n\n", uri, server, contLen); + if (len < 0 || len >= headLen) { + code = TSDB_CODE_OUT_OF_RANGE; + } } else { - terrno = TSDB_CODE_INVALID_CFG; - return terrno; + return TSDB_CODE_INVALID_PARA; } + return 0; } static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { - int32_t code = -1; + int32_t code = 0; int32_t destLen = srcLen; void* pDest = taosMemoryMalloc(destLen); if (pDest == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code= TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } @@ -127,7 +136,7 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { gzipStream.zfree = (free_func)0; gzipStream.opaque = (voidpf)0; if (deflateInit2(&gzipStream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } @@ -138,13 +147,13 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { while (gzipStream.avail_in != 0 && gzipStream.total_out < (uLong)(destLen)) { if (deflate(&gzipStream, Z_FULL_FLUSH) != Z_OK) { - terrno = TSDB_CODE_COMPRESS_ERROR; + code = TSDB_CODE_COMPRESS_ERROR; goto _OVER; } } if (gzipStream.avail_in != 0) { - terrno = TSDB_CODE_COMPRESS_ERROR; + code = TSDB_CODE_COMPRESS_ERROR; goto _OVER; } @@ -154,18 +163,18 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { break; } if (err != Z_OK) { - terrno = TSDB_CODE_COMPRESS_ERROR; + code = TSDB_CODE_COMPRESS_ERROR; goto _OVER; } } if (deflateEnd(&gzipStream) != Z_OK) { - terrno = TSDB_CODE_COMPRESS_ERROR; + code = TSDB_CODE_COMPRESS_ERROR; goto _OVER; } if (gzipStream.total_out >= srcLen) { - terrno = TSDB_CODE_COMPRESS_ERROR; + code = TSDB_CODE_COMPRESS_ERROR; goto _OVER; } @@ -175,10 +184,7 @@ _OVER: if (code == 0) { memcpy(pSrc, pDest, gzipStream.total_out); code = gzipStream.total_out; - } else { - code = terrno; - } - + } taosMemoryFree(pDest); return code; } @@ -206,24 +212,24 @@ static void* httpThread(void* arg) { return NULL; } -static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId) { +static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg) { if (server == NULL || uri == NULL) { tError("http-report failed to report to invalid addr, chanId:%" PRId64 "", chanId); - terrno = TSDB_CODE_INVALID_PARA; - return NULL; + *httpMsg = NULL; + return TSDB_CODE_INVALID_PARA; } if (pCont == NULL || contLen == 0) { tError("http-report failed to report empty packet, chanId:%" PRId64 "", chanId); - terrno = TSDB_CODE_INVALID_PARA; - return NULL; + *httpMsg = NULL; + return TSDB_CODE_INVALID_PARA; } SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); if (msg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + *httpMsg = NULL; + return TSDB_CODE_OUT_OF_MEMORY; } msg->port = port; @@ -232,8 +238,8 @@ static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t por msg->cont = taosMemoryMalloc(contLen); if (msg->server == NULL || msg->uri == NULL || msg->cont == NULL) { httpDestroyMsg(msg); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + *httpMsg = NULL; + return TSDB_CODE_OUT_OF_MEMORY; } memcpy(msg->cont, pCont, contLen); @@ -241,7 +247,8 @@ static SHttpMsg* httpCreateMsg(const char* server, const char* uri, uint16_t por msg->flag = flag; msg->quit = 0; msg->chanId = chanId; - return msg; + *httpMsg = msg; + return 0; } static void httpDestroyMsg(SHttpMsg* msg) { if (msg == NULL) return; @@ -477,10 +484,11 @@ static void httpHandleReq(SHttpMsg* msg) { int64_t chanId = msg->chanId; int32_t ignore = false; char* header = NULL; - terrno = 0; + int32_t code = 0; SHttpModule* http = taosAcquireRef(httpRefMgt, chanId); if (http == NULL) { + code = terrno; goto END; } if (httpFailFastShoudIgnoreMsg(http->connStatusTable, msg->server, msg->port)) { @@ -500,6 +508,7 @@ static void httpHandleReq(SHttpMsg* msg) { msg->flag = HTTP_FLAT; } if (dstLen < 0) { + code = dstLen; goto END; } } @@ -507,18 +516,19 @@ static void httpHandleReq(SHttpMsg* msg) { int32_t cap = 2048; header = taosMemoryCalloc(1, cap); if (header == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto END; } int32_t headLen = taosBuildHttpHeader(msg->server, msg->uri, msg->len, header, cap, msg->flag); - if (headLen < 0 || headLen >= cap) { + if (headLen < 0) { + code = headLen; goto END; } uv_buf_t* wb = taosMemoryCalloc(2, sizeof(uv_buf_t)); if (wb == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto END; } @@ -528,7 +538,7 @@ static void httpHandleReq(SHttpMsg* msg) { SHttpClient* cli = taosMemoryCalloc(1, sizeof(SHttpClient)); if (cli == NULL) { taosMemoryFree(wb); - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto END; } @@ -593,9 +603,8 @@ static void httpHandleReq(SHttpMsg* msg) { END: if (ignore == false) { tError("http-report failed to report to addr: %s:%d, chanId:%" PRId64 ", reason:%s", msg->server, msg->port, chanId, - tstrerror(terrno)); + tstrerror(code)); } - terrno = 0; httpDestroyMsg(msg); taosMemoryFree(header); taosReleaseRef(httpRefMgt, chanId); @@ -625,26 +634,27 @@ void httpModuleDestroy2(SHttpModule* http) { static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag, int64_t chanId) { - int32_t ret = 0; SHttpModule* load = NULL; - SHttpMsg* msg = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId); - if (msg == NULL) { + SHttpMsg *msg = NULL; + int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId,&msg); + if (code != 0) { goto _ERROR; } load = taosAcquireRef(httpRefMgt, chanId); if (load == NULL) { + code = terrno; goto _ERROR; } if (atomic_load_8(&load->quit)) { - terrno = TSDB_CODE_HTTP_MODULE_QUIT; + code = TSDB_CODE_HTTP_MODULE_QUIT; goto _ERROR; } - ret = transAsyncSend(load->asyncPool, &(msg->q)); - if (ret < 0) { - terrno = TSDB_CODE_HTTP_MODULE_QUIT; + code = transAsyncSend(load->asyncPool, &(msg->q)); + if (code != 0) { + code = TSDB_CODE_HTTP_MODULE_QUIT; goto _ERROR; } msg = NULL; @@ -652,7 +662,7 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, _ERROR: httpDestroyMsg(msg); if (load != NULL) taosReleaseRef(httpRefMgt, chanId); - return ret = terrno; + return code; } int32_t taosSendHttpReportByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, @@ -682,33 +692,35 @@ void transHttpEnvDestroy() { } int64_t transInitHttpChanImpl() { + int32_t code = 0; SHttpModule* http = taosMemoryCalloc(1, sizeof(SHttpModule)); if (http == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR; } http->connStatusTable = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (http->connStatusTable == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR; } http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); if (http->loop == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _ERROR; } int err = uv_loop_init(http->loop); if (err != 0) { tError("http-report failed init uv, reason:%s", uv_strerror(err)); - terrno = TSDB_CODE_THIRDPARTY_ERROR; + code = TSDB_CODE_THIRDPARTY_ERROR; goto _ERROR; } http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); if (http->asyncPool == NULL) { + code = terrno; goto _ERROR; } @@ -716,7 +728,7 @@ int64_t transInitHttpChanImpl() { err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); if (err != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); goto _ERROR; } @@ -728,7 +740,7 @@ int64_t transInitHttpChanImpl() { _ERROR: httpModuleDestroy2(http); - return terrno; + return code; } int64_t taosInitHttpChan() { taosThreadOnce(&transHttpInit, transHttpEnvInit); @@ -742,6 +754,7 @@ void taosDestroyHttpChan(int64_t chanId) { SHttpModule* load = taosAcquireRef(httpRefMgt, chanId); if (load == NULL) { tError("http-report failed to destroy chanId %" PRId64 ", reason:%s", chanId, tstrerror(terrno)); + ret = terrno; return; } From 0d7bd8e3af28c0480957592202355f70da199838 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Fri, 12 Jul 2024 03:01:19 +0000 Subject: [PATCH 13/13] fix compile error --- source/libs/transport/src/thttp.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 05928804b3..ba12774c18 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -116,9 +116,9 @@ static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t code = TSDB_CODE_OUT_OF_RANGE; } } else { - return TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; } - return 0; + return code; } static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) {