From 0993813213efe3e18a0fcb5ab26538b811451798 Mon Sep 17 00:00:00 2001 From: xleili Date: Thu, 23 Feb 2023 09:55:54 +0800 Subject: [PATCH 01/42] build: release ver-3.0.2.6 --- cmake/cmake.version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/cmake.version b/cmake/cmake.version index a30618157b..d0d455c73d 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.0.2.5") + SET(TD_VER_NUMBER "3.0.2.6") ENDIF () IF (DEFINED VERCOMPATIBLE) From 2013ba0d8f9b97b4f304925cc4bed0c8276db309 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Feb 2023 17:54:51 +0800 Subject: [PATCH 02/42] fix: fix asan problem --- source/dnode/mgmt/node_util/inc/dmUtil.h | 12 ++++----- source/dnode/mnode/impl/inc/mndInt.h | 14 +++++----- source/dnode/vnode/src/meta/metaTable.c | 4 +-- source/libs/index/src/indexFstFile.c | 18 +++++++++---- source/libs/transport/src/transCli.c | 34 +++++++++++++++--------- 5 files changed, 50 insertions(+), 32 deletions(-) diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index c2f403dfbb..6fec5b4551 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -53,12 +53,12 @@ extern "C" { #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} -#define dGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);} -#define dGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);} -#define dGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn (param ", gtid:%s", __VA_ARGS__, buf);} -#define dGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo (param ", gtid:%s", __VA_ARGS__, buf);} -#define dGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);} -#define dGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);} +#define dGFatal(param, ...) {if (dDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGError(param, ...) {if (dDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGWarn(param, ...) {if (dDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGInfo(param, ...) {if (dDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGDebug(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGTrace(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}} // clang-format on diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 785ecc2bf5..880c1c063d 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -41,12 +41,12 @@ extern "C" { #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} -#define mGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", gtid:%s", __VA_ARGS__, buf);} -#define mGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", gtid:%s", __VA_ARGS__, buf);} -#define mGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", gtid:%s", __VA_ARGS__, buf);} -#define mGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", gtid:%s", __VA_ARGS__, buf);} -#define mGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", gtid:%s", __VA_ARGS__, buf);} -#define mGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);} +#define mGFatal(param, ...) { if (mDebugFlag & DEBUG_FATAL){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", gtid:%s", __VA_ARGS__, buf);}} +#define mGError(param, ...) { if (mDebugFlag & DEBUG_ERROR){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", gtid:%s", __VA_ARGS__, buf);}} +#define mGWarn(param, ...) { if (mDebugFlag & DEBUG_WARN){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", gtid:%s", __VA_ARGS__, buf);}} +#define mGInfo(param, ...) { if (mDebugFlag & DEBUG_INFO){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", gtid:%s", __VA_ARGS__, buf);}} +#define mGDebug(param, ...) { if (mDebugFlag & DEBUG_DEBUG){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", gtid:%s", __VA_ARGS__, buf);}} +#define mGTrace(param, ...) { if (mDebugFlag & DEBUG_TRACE){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);}} // clang-format on #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) @@ -80,7 +80,7 @@ typedef struct { typedef struct { TdThreadMutex lock; - char email[TSDB_FQDN_LEN]; + char email[TSDB_FQDN_LEN]; } STelemMgmt; typedef struct { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 94aa464354..3325f4055c 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -1921,10 +1921,10 @@ int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_ // refactor if (IS_VAR_DATA_TYPE(type)) { memcpy((*ppTagIdxKey)->data, (uint16_t *)&nTagData, VARSTR_HEADER_SIZE); - memcpy((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE, pTagData, nTagData); + if (pTagData != NULL) memcpy((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE, pTagData, nTagData); *(tb_uid_t *)((*ppTagIdxKey)->data + VARSTR_HEADER_SIZE + nTagData) = uid; } else { - memcpy((*ppTagIdxKey)->data, pTagData, nTagData); + if (pTagData != NULL) memcpy((*ppTagIdxKey)->data, pTagData, nTagData); *(tb_uid_t *)((*ppTagIdxKey)->data + nTagData) = uid; } diff --git a/source/libs/index/src/indexFstFile.c b/source/libs/index/src/indexFstFile.c index 5a9c8dfe3d..40c50ed9cb 100644 --- a/source/libs/index/src/indexFstFile.c +++ b/source/libs/index/src/indexFstFile.c @@ -18,6 +18,7 @@ #include "indexInt.h" #include "indexUtil.h" #include "os.h" +#include "osDef.h" #include "tutil.h" static int32_t kBlockSize = 4096; @@ -172,7 +173,8 @@ static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) { int32_t nw = taosWriteFile(ctx->file.pFile, ctx->file.wBuf, ctx->file.wBufOffset); ctx->file.wBufOffset = 0; } - taosFsyncFile(ctx->file.pFile); + int ret = taosFsyncFile(ctx->file.pFile); + UNUSED(ret); } else { // do nothing } @@ -180,11 +182,11 @@ static FORCE_INLINE int idxFileCtxDoFlush(IFileCtx* ctx) { } IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity) { + int code = 0; IFileCtx* ctx = taosMemoryCalloc(1, sizeof(IFileCtx)); if (ctx == NULL) { return NULL; } - ctx->type = type; if (ctx->type == TFILE) { // ugly code, refactor later @@ -192,15 +194,21 @@ IFileCtx* idxFileCtxCreate(WriterType type, const char* path, bool readOnly, int memcpy(ctx->file.buf, path, strlen(path)); if (readOnly == false) { ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); - taosFtruncateFile(ctx->file.pFile, 0); - taosStatFile(path, &ctx->file.size, NULL); + + code = taosFtruncateFile(ctx->file.pFile, 0); + UNUSED(code); + + code = taosStatFile(path, &ctx->file.size, NULL); + UNUSED(code); ctx->file.wBufOffset = 0; ctx->file.wBufCap = kBlockSize * 4; ctx->file.wBuf = taosMemoryCalloc(1, ctx->file.wBufCap); } else { ctx->file.pFile = taosOpenFile(path, TD_FILE_READ); - taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL); + code = taosFStatFile(ctx->file.pFile, &ctx->file.size, NULL); + UNUSED(code); + ctx->file.wBufOffset = 0; #ifdef USE_MMAP diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7e1aeafaad..90daf296de 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -12,7 +12,9 @@ * along with this program. If not, see . */ +// #include "osMemory.h" #include "transComm.h" +#include "tutil.h" typedef struct SConnList { queue conns; @@ -224,9 +226,14 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); } while (0); // snprintf may cause performance problem -#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \ - do { \ - snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ +#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \ + do { \ + char* t = key; \ + int16_t len = strlen(ip); \ + memcpy(t, ip, len); \ + t += len; \ + t[len] = ':'; \ + titoa(port, 10, &t[len + 1]); \ } while (0) #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) @@ -330,12 +337,8 @@ bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { SCliMsg* pCliMsg = NULL; CONN_GET_NEXT_SENDMSG(conn); - if (pCliMsg == NULL) - return false; - else { - cliSend(conn); - return true; - } + cliSend(conn); + return true; } return false; _RETURN: @@ -359,6 +362,7 @@ void cliHandleResp(SCliConn* conn) { int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead); if (msgLen <= 0) { + taosMemoryFree(pHead); tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); return; } @@ -1705,17 +1709,23 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < cli->numOfThreads; i++) { SCliThrd* pThrd = createThrdObj(shandle); if (pThrd == NULL) { - return NULL; + goto _err; } int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); - if (err == 0) { + if (err != 0) { + goto _err; + } else { tDebug("success to create tranport-cli thread:%d", i); } cli->pThreadObj[i] = pThrd; } - return cli; + +_err: + taosMemoryFree(cli->pThreadObj); + taosMemoryFree(cli); + return NULL; } static FORCE_INLINE void destroyUserdata(STransMsg* userdata) { From 2e25f7e90de8c99b2ab6457176729c4531d1461b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Feb 2023 18:07:48 +0800 Subject: [PATCH 03/42] fix: fix asan problem --- source/dnode/mgmt/node_util/inc/dmUtil.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 6fec5b4551..55ee6d6973 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -58,7 +58,7 @@ extern "C" { #define dGWarn(param, ...) {if (dDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn(param ", gtid:%s", __VA_ARGS__, buf);}} #define dGInfo(param, ...) {if (dDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo(param ", gtid:%s", __VA_ARGS__, buf);}} #define dGDebug(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);}} -#define dGTrace(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}} +#define dGTrace(param, ...) {if (dDebugFlag & DEBUG_TRACE) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);}} // clang-format on From de4a0047c5e43fc24b593edf229d83806b64e4b9 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 23 Feb 2023 18:00:38 +0800 Subject: [PATCH 04/42] fix coverity issue CID:399957 --- include/libs/nodes/querynodes.h | 8 ++++---- include/util/tjson.h | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 2 +- source/libs/nodes/src/nodesMsgFuncs.c | 4 ++-- source/util/src/tjson.c | 9 ++++++++- 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 1a9700907e..20b8bf5c46 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -38,10 +38,10 @@ typedef struct SRawExprNode { } SRawExprNode; typedef struct SDataType { - uint8_t type; - uint8_t precision; - uint8_t scale; - int32_t bytes; + uint16_t type; + uint8_t precision; + uint8_t scale; + int32_t bytes; } SDataType; typedef struct SExprNode { diff --git a/include/util/tjson.h b/include/util/tjson.h index 6922930c13..24bdc31bad 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -76,6 +76,7 @@ int32_t tjsonGetSmallIntValue(const SJson* pJson, const char* pName, int16_t* pV int32_t tjsonGetTinyIntValue(const SJson* pJson, const char* pName, int8_t* pVal); int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pVal); int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal); +int32_t tjsonGetUSmallIntValue(const SJson* pJson, const char* pName, uint16_t* pVal); int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal); int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal); int32_t tjsonGetDoubleValue(const SJson* pJson, const char* pName, double* pVal); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 099cd0d3b3..2051da1181 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2972,7 +2972,7 @@ static int32_t dataTypeToJson(const void* pObj, SJson* pJson) { static int32_t jsonToDataType(const SJson* pJson, void* pObj) { SDataType* pNode = (SDataType*)pObj; - int32_t code = tjsonGetUTinyIntValue(pJson, jkDataTypeType, &pNode->type); + int32_t code = tjsonGetUSmallIntValue(pJson, jkDataTypeType, &pNode->type); if (TSDB_CODE_SUCCESS == code) { code = tjsonGetUTinyIntValue(pJson, jkDataTypePrecision, &pNode->precision); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index ad80508c64..fadce124af 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -619,7 +619,7 @@ static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t msgToDataTypeInline(STlvDecoder* pDecoder, void* pObj) { SDataType* pNode = (SDataType*)pObj; - int32_t code = tlvDecodeValueI8(pDecoder, &pNode->type); + int32_t code = tlvDecodeValueU16(pDecoder, &pNode->type); if (TSDB_CODE_SUCCESS == code) { code = tlvDecodeValueU8(pDecoder, &pNode->precision); } @@ -641,7 +641,7 @@ static int32_t msgToDataType(STlvDecoder* pDecoder, void* pObj) { tlvForEach(pDecoder, pTlv, code) { switch (pTlv->type) { case DATA_TYPE_CODE_TYPE: - code = tlvDecodeI8(pTlv, &pNode->type); + code = tlvDecodeU16(pTlv, &pNode->type); break; case DATA_TYPE_CODE_PRECISION: code = tlvDecodeU8(pTlv, &pNode->precision); diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 27d14d05b1..6741c3038f 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -250,6 +250,13 @@ int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal) return code; } +int32_t tjsonGetUSmallIntValue(const SJson* pJson, const char* pName, uint16_t* pVal) { + uint64_t val = 0; + int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); + *pVal = val; + return code; +} + int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal) { uint64_t val = 0; int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); @@ -375,4 +382,4 @@ bool tjsonValidateJson(const char* jIn) { return true; } -const char* tjsonGetError() { return cJSON_GetErrorPtr(); } \ No newline at end of file +const char* tjsonGetError() { return cJSON_GetErrorPtr(); } From f294d60681ae186f1511ba1013f47ad9c42c05c5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 23 Feb 2023 20:40:29 +0800 Subject: [PATCH 05/42] fix: fix asan problem --- include/libs/transport/thttp.h | 4 +- source/libs/transport/src/thttp.c | 71 ++++++++++++++++------------ source/libs/transport/src/transCli.c | 11 ++--- source/libs/transport/src/transSvr.c | 2 + 4 files changed, 50 insertions(+), 38 deletions(-) diff --git a/include/libs/transport/thttp.h b/include/libs/transport/thttp.h index 9a6aee4187..f6f1f7f027 100644 --- a/include/libs/transport/thttp.h +++ b/include/libs/transport/thttp.h @@ -17,6 +17,7 @@ #define _TD_UTIL_HTTP_H_ #include "os.h" +#include "tref.h" #ifdef __cplusplus extern "C" { @@ -24,7 +25,8 @@ extern "C" { typedef enum { HTTP_GZIP, HTTP_FLAT } EHttpCompFlag; -int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag); +int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag); #ifdef __cplusplus } diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 8e5f79137f..9ad50c1466 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -26,6 +26,8 @@ #define HTTP_RECV_BUF_SIZE 1024 +static int32_t httpRefMgt = 0; +static int64_t httpRef = -1; typedef struct SHttpModule { uv_loop_t* loop; SAsyncPool* asyncPool; @@ -41,7 +43,6 @@ typedef struct SHttpMsg { int32_t len; EHttpCompFlag flag; int8_t quit; - SHttpModule* http; } SHttpMsg; @@ -57,7 +58,6 @@ typedef struct SHttpClient { } SHttpClient; static TdThreadOnce transHttpInit = PTHREAD_ONCE_INIT; -static SHttpModule* thttp = NULL; static void transHttpEnvInit(); static void httpHandleReq(SHttpMsg* msg); @@ -280,26 +280,28 @@ static void clientConnCb(uv_connect_t* req, int32_t status) { } int32_t httpSendQuit() { + SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + if (http == NULL) return 0; + SHttpMsg* msg = taosMemoryCalloc(1, sizeof(SHttpMsg)); msg->quit = 1; - SHttpModule* load = atomic_load_ptr(&thttp); - if (load == NULL) { - httpDestroyMsg(msg); - tError("http-report already released"); - return -1; - } else { - msg->http = load; - } - transAsyncSend(load->asyncPool, &(msg->q)); + transAsyncSend(http->asyncPool, &(msg->q)); + taosReleaseRef(httpRefMgt, httpRef); return 0; } static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { + SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); + if (load == NULL) { + tError("http-report already released"); + return -1; + } + SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); msg->server = strdup(server); - msg->uri = strdup(uri); + msg->uri = strdup(uri); msg->port = port; msg->cont = taosMemoryMalloc(contLen); memcpy(msg->cont, pCont, contLen); @@ -307,15 +309,9 @@ static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint1 msg->flag = flag; msg->quit = 0; - SHttpModule* load = atomic_load_ptr(&thttp); - if (load == NULL) { - httpDestroyMsg(msg); - tError("http-report already released"); - return -1; - } - - msg->http = load; - return transAsyncSend(load->asyncPool, &(msg->q)); + int ret = transAsyncSend(load->asyncPool, &(msg->q)); + taosReleaseRef(httpRefMgt, httpRef); + return ret; } static void httpDestroyClientCb(uv_handle_t* handle) { @@ -335,13 +331,19 @@ static void httpWalkCb(uv_handle_t* handle, void* arg) { return; } static void httpHandleQuit(SHttpMsg* msg) { - SHttpModule* http = msg->http; taosMemoryFree(msg); + SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + if (http == NULL) return; + uv_walk(http->loop, httpWalkCb, NULL); + taosReleaseRef(httpRefMgt, httpRef); } static void httpHandleReq(SHttpMsg* msg) { - SHttpModule* http = msg->http; + SHttpModule* http = taosAcquireRef(httpRefMgt, httpRef); + if (http == NULL) { + goto END; + } struct sockaddr_in dest = {0}; if (taosBuildDstAddr(msg->server, msg->port, &dest) < 0) { @@ -391,6 +393,7 @@ static void httpHandleReq(SHttpMsg* msg) { int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd); if (ret != 0) { tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port); + taosReleaseRef(httpRefMgt, httpRef); destroyHttpClient(cli); return; } @@ -401,21 +404,26 @@ static void httpHandleReq(SHttpMsg* msg) { cli->port); destroyHttpClient(cli); } + taosReleaseRef(httpRefMgt, httpRef); return; END: tError("http-report failed to report, reason: %s, addr: %s:%d", terrstr(), msg->server, msg->port); httpDestroyMsg(msg); + taosReleaseRef(httpRefMgt, httpRef); } -int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { +int32_t taosSendHttpReport(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag) { taosThreadOnce(&transHttpInit, transHttpEnvInit); return taosSendHttpReportImpl(server, uri, port, pCont, contLen, flag); } +static void transHttpDestroyHandle(void* handle) { taosMemoryFree(handle); } static void transHttpEnvInit() { - SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); + httpRefMgt = taosOpenRef(1, transHttpDestroyHandle); + SHttpModule* http = taosMemoryMalloc(sizeof(SHttpModule)); http->loop = taosMemoryMalloc(sizeof(uv_loop_t)); uv_loop_init(http->loop); @@ -426,21 +434,22 @@ static void transHttpEnvInit() { http = NULL; return; } - + int err = taosThreadCreate(&http->thread, NULL, httpThread, (void*)http); if (err != 0) { taosMemoryFree(http->loop); taosMemoryFree(http); http = NULL; } - atomic_store_ptr(&thttp, http); + httpRef = taosAddRef(httpRefMgt, http); } void transHttpEnvDestroy() { - SHttpModule* load = atomic_load_ptr(&thttp); - if (load == NULL) { + // remove http + if (httpRef == -1 || transHttpInit == PTHREAD_ONCE_INIT) { return; } + SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); httpSendQuit(); taosThreadJoin(load->thread, NULL); @@ -448,7 +457,7 @@ void transHttpEnvDestroy() { transAsyncPoolDestroy(load->asyncPool); uv_loop_close(load->loop); taosMemoryFree(load->loop); - taosMemoryFree(load); - atomic_store_ptr(&thttp, NULL); + taosReleaseRef(httpRefMgt, httpRef); + taosRemoveRef(httpRefMgt, httpRef); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 90daf296de..19ee4fe690 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -67,15 +67,13 @@ typedef struct SCliConn { SCliBatch* pBatch; - int64_t refId; - char* ip; - SDelayTask* task; - // debug and log info - char src[32]; - char dst[32]; + char* ip; + char src[32]; + char dst[32]; + int64_t refId; } SCliConn; typedef struct SCliMsg { @@ -134,6 +132,7 @@ typedef struct { int32_t threshold; int64_t interval; } SFailFastItem; + // conn pool // add expire timeout and capacity limit static void* createConnPool(int size); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 04e094ae9a..822969132b 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -14,6 +14,8 @@ #include "transComm.h" +static int32_t httpRefMgt = 0; + static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static char* notify = "a"; From d34c70d172ea4e8efd58d3f52eb1769d157adaec Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 24 Feb 2023 09:57:57 +0800 Subject: [PATCH 06/42] Revert "fix coverity issue CID:399957" This reverts commit de4a0047c5e43fc24b593edf229d83806b64e4b9. --- include/libs/nodes/querynodes.h | 8 ++++---- include/util/tjson.h | 1 - source/libs/nodes/src/nodesCodeFuncs.c | 2 +- source/libs/nodes/src/nodesMsgFuncs.c | 4 ++-- source/util/src/tjson.c | 9 +-------- 5 files changed, 8 insertions(+), 16 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 20b8bf5c46..1a9700907e 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -38,10 +38,10 @@ typedef struct SRawExprNode { } SRawExprNode; typedef struct SDataType { - uint16_t type; - uint8_t precision; - uint8_t scale; - int32_t bytes; + uint8_t type; + uint8_t precision; + uint8_t scale; + int32_t bytes; } SDataType; typedef struct SExprNode { diff --git a/include/util/tjson.h b/include/util/tjson.h index 24bdc31bad..6922930c13 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -76,7 +76,6 @@ int32_t tjsonGetSmallIntValue(const SJson* pJson, const char* pName, int16_t* pV int32_t tjsonGetTinyIntValue(const SJson* pJson, const char* pName, int8_t* pVal); int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pVal); int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal); -int32_t tjsonGetUSmallIntValue(const SJson* pJson, const char* pName, uint16_t* pVal); int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal); int32_t tjsonGetBoolValue(const SJson* pJson, const char* pName, bool* pVal); int32_t tjsonGetDoubleValue(const SJson* pJson, const char* pName, double* pVal); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 2051da1181..099cd0d3b3 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2972,7 +2972,7 @@ static int32_t dataTypeToJson(const void* pObj, SJson* pJson) { static int32_t jsonToDataType(const SJson* pJson, void* pObj) { SDataType* pNode = (SDataType*)pObj; - int32_t code = tjsonGetUSmallIntValue(pJson, jkDataTypeType, &pNode->type); + int32_t code = tjsonGetUTinyIntValue(pJson, jkDataTypeType, &pNode->type); if (TSDB_CODE_SUCCESS == code) { code = tjsonGetUTinyIntValue(pJson, jkDataTypePrecision, &pNode->precision); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index fadce124af..ad80508c64 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -619,7 +619,7 @@ static int32_t dataTypeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t msgToDataTypeInline(STlvDecoder* pDecoder, void* pObj) { SDataType* pNode = (SDataType*)pObj; - int32_t code = tlvDecodeValueU16(pDecoder, &pNode->type); + int32_t code = tlvDecodeValueI8(pDecoder, &pNode->type); if (TSDB_CODE_SUCCESS == code) { code = tlvDecodeValueU8(pDecoder, &pNode->precision); } @@ -641,7 +641,7 @@ static int32_t msgToDataType(STlvDecoder* pDecoder, void* pObj) { tlvForEach(pDecoder, pTlv, code) { switch (pTlv->type) { case DATA_TYPE_CODE_TYPE: - code = tlvDecodeU16(pTlv, &pNode->type); + code = tlvDecodeI8(pTlv, &pNode->type); break; case DATA_TYPE_CODE_PRECISION: code = tlvDecodeU8(pTlv, &pNode->precision); diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 6741c3038f..27d14d05b1 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -250,13 +250,6 @@ int32_t tjsonGetUIntValue(const SJson* pJson, const char* pName, uint32_t* pVal) return code; } -int32_t tjsonGetUSmallIntValue(const SJson* pJson, const char* pName, uint16_t* pVal) { - uint64_t val = 0; - int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); - *pVal = val; - return code; -} - int32_t tjsonGetUTinyIntValue(const SJson* pJson, const char* pName, uint8_t* pVal) { uint64_t val = 0; int32_t code = tjsonGetUBigIntValue(pJson, pName, &val); @@ -382,4 +375,4 @@ bool tjsonValidateJson(const char* jIn) { return true; } -const char* tjsonGetError() { return cJSON_GetErrorPtr(); } +const char* tjsonGetError() { return cJSON_GetErrorPtr(); } \ No newline at end of file From 6ff76e57aee9d3d9af6f8a5b1e4e38874b8ae046 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 09:59:32 +0800 Subject: [PATCH 07/42] fix: opt trans debug info --- source/libs/transport/src/transSvr.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 822969132b..04e094ae9a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -14,8 +14,6 @@ #include "transComm.h" -static int32_t httpRefMgt = 0; - static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static char* notify = "a"; From 49fc02c52e95c8fdcc4d2271f21c6b7b21e58895 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 24 Feb 2023 10:08:53 +0800 Subject: [PATCH 08/42] fix coverity issue --- include/common/ttypes.h | 4 ---- source/common/src/tvariant.c | 33 --------------------------------- source/libs/scalar/src/filter.c | 18 +----------------- 3 files changed, 1 insertion(+), 54 deletions(-) diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 97ae151b7a..f8a85ee1b0 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -53,10 +53,6 @@ typedef struct { #define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0])) #define varDataNetTLen(v) (sizeof(VarDataLenT) + varDataNetLen(v)) -// this data type is internally used only in 'in' query to hold the values -#define TSDB_DATA_TYPE_POINTER_ARRAY (1000) -#define TSDB_DATA_TYPE_VALUE_ARRAY (1001) - #define GET_TYPED_DATA(_v, _finalType, _type, _data) \ do { \ switch (_type) { \ diff --git a/source/common/src/tvariant.c b/source/common/src/tvariant.c index de225581a6..db69fe9d48 100644 --- a/source/common/src/tvariant.c +++ b/source/common/src/tvariant.c @@ -145,19 +145,6 @@ void taosVariantDestroy(SVariant *pVar) { pVar->nLen = 0; } - // NOTE: this is only for string array - if (pVar->nType == TSDB_DATA_TYPE_POINTER_ARRAY) { - size_t num = taosArrayGetSize(pVar->arr); - for (size_t i = 0; i < num; i++) { - void *p = taosArrayGetP(pVar->arr, i); - taosMemoryFree(p); - } - taosArrayDestroy(pVar->arr); - pVar->arr = NULL; - } else if (pVar->nType == TSDB_DATA_TYPE_VALUE_ARRAY) { - taosArrayDestroy(pVar->arr); - pVar->arr = NULL; - } } void taosVariantAssign(SVariant *pDst, const SVariant *pSrc) { @@ -180,28 +167,8 @@ void taosVariantAssign(SVariant *pDst, const SVariant *pSrc) { if (IS_NUMERIC_TYPE(pSrc->nType) || (pSrc->nType == TSDB_DATA_TYPE_BOOL)) { pDst->i = pSrc->i; - } else if (pSrc->nType == TSDB_DATA_TYPE_POINTER_ARRAY) { // this is only for string array - size_t num = taosArrayGetSize(pSrc->arr); - pDst->arr = taosArrayInit(num, sizeof(char *)); - for (size_t i = 0; i < num; i++) { - char *p = (char *)taosArrayGetP(pSrc->arr, i); - char *n = strdup(p); - taosArrayPush(pDst->arr, &n); - } - } else if (pSrc->nType == TSDB_DATA_TYPE_VALUE_ARRAY) { - size_t num = taosArrayGetSize(pSrc->arr); - pDst->arr = taosArrayInit(num, sizeof(int64_t)); - pDst->nLen = pSrc->nLen; - ASSERT(pSrc->nLen == num); - for (size_t i = 0; i < num; i++) { - int64_t *p = taosArrayGet(pSrc->arr, i); - taosArrayPush(pDst->arr, p); - } } - if (pDst->nType != TSDB_DATA_TYPE_POINTER_ARRAY && pDst->nType != TSDB_DATA_TYPE_VALUE_ARRAY) { - pDst->nLen = tDataTypes[pDst->nType].bytes; - } } int32_t taosVariantCompare(const SVariant *p1, const SVariant *p2) { diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 25e65d2588..c0c8072d81 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -1651,12 +1651,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options) SValueNode *var = (SValueNode *)field->desc; SDataType *dType = &var->node.resType; - // if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { - // qDebug("VAL%d => [type:TS][val:[%" PRIi64 "] - [%" PRId64 "]]", i, *(int64_t *)field->data, - // *(((int64_t *)field->data) + 1)); - // } else { qDebug("VAL%d => [type:%d][val:%" PRIx64 "]", i, dType->type, var->datum.i); // TODO - //} } else if (field->data) { qDebug("VAL%d => [type:NIL][val:NIL]", i); // TODO } @@ -1976,18 +1971,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) { fi->data = taosMemoryCalloc(1, bytes); } else { - if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { // TIME RANGE - /* - fi->data = taosMemoryCalloc(dType->bytes, tDataTypes[type].bytes); - for (int32_t a = 0; a < dType->bytes; ++a) { - int64_t *v = taosArrayGet(var->arr, a); - assignVal((char *)fi->data + a * tDataTypes[type].bytes, (char *)v, 0, type); - } - */ - continue; - } else { - fi->data = taosMemoryCalloc(1, sizeof(int64_t)); - } + fi->data = taosMemoryCalloc(1, sizeof(int64_t)); } if (dType->type == type) { From 5025f1a38a36aa9d37e8c92e6e533b7b4c411dff Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 10:13:28 +0800 Subject: [PATCH 09/42] fix: opt trans debug info --- source/libs/transport/src/transCli.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 19ee4fe690..c4c6931f64 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -229,8 +229,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); do { \ char* t = key; \ int16_t len = strlen(ip); \ - memcpy(t, ip, len); \ - t += len; \ + if (ip ! = NULL) memcpy(t, ip, len); \ t[len] = ':'; \ titoa(port, 10, &t[len + 1]); \ } while (0) @@ -2312,6 +2311,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); pCtx->epSet = *pEpSet; + pCtx->origEpSet = *pEpSet; pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; From b5d4709374644874641ca6892a47ce33a69f53b0 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 11:07:07 +0800 Subject: [PATCH 10/42] fix: coverity scan proble --- source/libs/transport/src/transCli.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c4c6931f64..8aebcbd576 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -229,7 +229,7 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); do { \ char* t = key; \ int16_t len = strlen(ip); \ - if (ip ! = NULL) memcpy(t, ip, len); \ + if (ip != NULL) memcpy(t, ip, len); \ t[len] = ':'; \ titoa(port, 10, &t[len + 1]); \ } while (0) From 0bbd72999599cd7af069048861ace7519b787957 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 16:33:28 +0800 Subject: [PATCH 11/42] fix: limit session num --- source/libs/transport/src/transCli.c | 158 ++++++++++++++++----------- 1 file changed, 96 insertions(+), 62 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5d6751a260..de5f9c26e0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -76,6 +76,10 @@ typedef struct SCliConn { } SCliConn; +typedef struct { + int32_t numOfConn; + queue msgQ; +} SMsgList; typedef struct SCliMsg { STransConnCtx* ctx; STransMsg msg; @@ -136,7 +140,7 @@ typedef struct { // add expire timeout and capacity limit static void* createConnPool(int size); static void* destroyConnPool(void* pool); -static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); +static SCliConn* getConnFromPool(void* pool, char* addr); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); @@ -176,7 +180,8 @@ static void cliSend(SCliConn* pConn); static void cliSendBatch(SCliConn* pConn); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); -static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port); +static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr); +static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg); // cli util func static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); @@ -556,10 +561,7 @@ void* destroyConnPool(void* pool) { return NULL; } -static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - +static SCliConn* getConnFromPool(void* pool, char* key) { SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); if (plist == NULL) { SConnList list = {0}; @@ -607,6 +609,20 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->status = ConnInPool; + SMsgList** msglist = taosHashGet(thrd->connLimitCache, conn->ip, strlen(conn->ip)); + if (msglist != NULL && *msglist != NULL) { + if (!QUEUE_IS_EMPTY(&(*msglist)->msgQ)) { + queue* h = QUEUE_HEAD(&(*msglist)->msgQ); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); + transQueuePush(&conn->cliMsgs, pMsg); + cliSend(conn); + return; + } + } + if (conn->list == NULL) { tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip)); @@ -774,9 +790,10 @@ static void cliDestroy(uv_handle_t* handle) { conn->timer->data = NULL; conn->timer = NULL; } - int32_t* oVal = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); - int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1; - taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal)); + SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); + if (list != NULL && *list != NULL) { + (*list)->numOfConn--; + } atomic_sub_fetch_32(&pThrd->connCount, 1); @@ -1009,9 +1026,12 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; SCliBatchList* pList = pBatch->pList; - SCliConn* conn = getConnFromPool(pThrd->pool, pList->ip, pList->port); + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port); - if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pList->ip, pList->port)) { + SCliConn* conn = getConnFromPool(pThrd->pool, key); + + if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, key)) { tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen, pBatch->batchSize); cliDestroyBatch(pBatch); @@ -1067,6 +1087,14 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { cliHandleFastFail(conn, -1); return; } + + SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); + if (list == NULL || *list == NULL) { + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); + nList->numOfConn++; + QUEUE_INIT(&nList->msgQ); + taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*)); + } uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); return; } @@ -1173,10 +1201,6 @@ void cliConnCb(uv_connect_t* req, int status) { return; } - int32_t* oVal = taosHashGet(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip)); - int32_t nVal = oVal == NULL ? 0 : (*oVal) + 1; - taosHashPut(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal)); - struct sockaddr peername, sockname; int addrlen = sizeof(peername); uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); @@ -1236,7 +1260,7 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); } -SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { +SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { STransConnCtx* pCtx = pMsg->ctx; SCliConn* conn = NULL; @@ -1250,7 +1274,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { } else { conn = exh->handle; if (conn == NULL) { - conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); + conn = getConnFromPool(pThrd->pool, addr); if (conn != NULL) specifyConnRef(conn, true, refId); } transReleaseExHandle(transGetRefMgt(), refId); @@ -1258,7 +1282,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { return conn; }; - conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); + conn = getConnFromPool(pThrd->pool, addr); if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); } else { @@ -1316,20 +1340,30 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { return; } -static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port) { +static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr) { STrans* pTransInst = pThrd->pTransInst; - // STransConnCtx* pCtx = pMsg->ctx; - // char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - // int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr)); + if (list == NULL || *list == NULL) { + return 0; + } - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); + if ((*list)->numOfConn >= pTransInst->connLimitNum) { + return -1; + } + return 0; +} - int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key)); - if (val == NULL) return 0; +static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg) { + STrans* pTransInst = pThrd->pTransInst; - if (*val >= pTransInst->connLimitNum) { + SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr)); + if (list == NULL || *list == NULL) { + return 0; + } + + if ((*list)->numOfConn >= pTransInst->connLimitNum) { + QUEUE_PUSH(&(*list)->msgQ, &pMsg->q); return -1; } return 0; @@ -1337,36 +1371,22 @@ static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port) void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; STransConnCtx* pCtx = pMsg->ctx; + STraceId* trace = &pMsg->msg.info.traceId; cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); - STraceId* trace = &pMsg->msg.info.traceId; - char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - if (!EPSET_IS_VALID(&pCtx->epSet)) { tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); destroyCmsg(pMsg); return; } - if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - - SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); - if (item != NULL) { - int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp); - if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) { - tGTrace("%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, - TMSG_INFO(pMsg->msg.msgType), ip, port, item->count, elapse); - destroyCmsg(pMsg); - return; - } - } - } + char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char addr[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); bool ignore = false; - SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); + SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore, addr); if (ignore == true) { // persist conn already release by server STransMsg resp; @@ -1377,11 +1397,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); return; } - - if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, ip, port)) { - tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType), - tstrerror(TSDB_CODE_RPC_MAX_SESSIONS)); - destroyCmsg(pMsg); + if (conn == NULL && cliPreCheckSessionLimitForMsg(pThrd, addr, pMsg) != 0) { return; } @@ -1398,13 +1414,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); - char key[TSDB_FQDN_LEN + 64] = {0}; - char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - CONN_CONSTRUCT_HASH_KEY(key, fqdn, port); - - conn->ip = strdup(key); - + conn->ip = strdup(addr); uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); if (ipaddr == 0xffffffff) { uv_timer_stop(conn->timer); @@ -1453,6 +1463,15 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { cliHandleFastFail(conn, ret); return; } + + SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); + if (list == NULL || *list == NULL) { + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); + nList->numOfConn++; + QUEUE_INIT(&nList->msgQ); + taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*)); + } + uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } tGTrace("%s conn %p ready", pTransInst->label, conn); @@ -1833,8 +1852,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, - pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK); + pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -1865,7 +1883,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->failFastCache); - taosHashCleanup(pThrd->connLimitCache); void** pIter = taosHashIterate(pThrd->batchCache, NULL); while (pIter != NULL) { @@ -1884,6 +1901,23 @@ static void destroyThrdObj(SCliThrd* pThrd) { pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache); + + pIter = taosHashIterate(pThrd->connLimitCache, NULL); + while (pIter != NULL) { + SMsgList* list = (SMsgList*)(*pIter); + while (!QUEUE_IS_EMPTY(&list->msgQ)) { + queue* h = QUEUE_HEAD(&list->msgQ); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + destroyCmsg(pMsg); + } + taosMemoryFree(list); + + pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter); + } + taosHashCleanup(pThrd->connLimitCache); + taosMemoryFree(pThrd); } From f9f4066f3c9600be9bf95d51b021f9c4ab131ae7 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 24 Feb 2023 17:17:55 +0800 Subject: [PATCH 12/42] fix(query): fix apply limit error for doMultiwayMerge operator if datablocks within same group --- source/libs/executor/src/sortoperator.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index dc3ab79afb..4095fd9443 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -671,7 +671,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData } bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); - if (limitReached) { + // if limit is reached within a group, do not clear limiInfo otherwise the next block + // will be processed. + if (newgroup && limitReached) { resetLimitInfoForNextGroup(&pInfo->limitInfo); } From eac27fc0fd1454aa057b8e82a61c6a6f46b13f1e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 24 Feb 2023 18:39:17 +0800 Subject: [PATCH 13/42] fix: empty ts range query issue --- source/libs/parser/src/parTranslater.c | 3 +++ tests/parallel_test/cases.task | 1 + tests/script/tsim/query/emptyTsRange.sim | 20 ++++++++++++++++++++ 3 files changed, 24 insertions(+) create mode 100644 tests/script/tsim/query/emptyTsRange.sim diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 7b6f795ecf..7e9ca9554b 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3342,6 +3342,9 @@ static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) { if (TSDB_CODE_SUCCESS == code) { code = getQueryTimeRange(pCxt, pSelect->pWhere, &pSelect->timeRange); } + if (TSDB_CODE_SUCCESS == code && pSelect->timeRange.skey > pSelect->timeRange.ekey) { + pSelect->isEmptyResult = true; + } return code; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 63760d6ae4..867f16d71a 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -181,6 +181,7 @@ ,,y,script,./test.sh -f tsim/query/groupby.sim ,,y,script,./test.sh -f tsim/query/event.sim ,,y,script,./test.sh -f tsim/query/forceFill.sim +,,y,script,./test.sh -f tsim/query/emptyTsRange.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim diff --git a/tests/script/tsim/query/emptyTsRange.sim b/tests/script/tsim/query/emptyTsRange.sim new file mode 100644 index 0000000000..ca3daf2bbb --- /dev/null +++ b/tests/script/tsim/query/emptyTsRange.sim @@ -0,0 +1,20 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +sql drop database if exists db1; +sql create database if not exists db1; +sql use db1; +sql create stable sta (ts timestamp, f1 double, f2 binary(200)) tags(t1 int); +sql create table tba1 using sta tags(1); +sql insert into tba1 values ('2022-04-26 15:15:01', 1.0, "a"); +sql insert into tba1 values ('2022-04-26 15:15:02', 2.0, "b"); +sql insert into tba1 values ('2022-04-26 15:15:04', 4.0, "b"); +sql insert into tba1 values ('2022-04-26 15:15:05', 5.0, "b"); +sql select last_row(*) from sta where ts >= 1678901803783 and ts <= 1678901803783 and _c0 <= 1678901803782 interval(10d,8d) fill(linear) order by _wstart desc; +if $rows != 0 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From dd2e9697b58f018d59669a05bc2e570b7a1d1863 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 20:39:25 +0800 Subject: [PATCH 14/42] fix: limit session num --- source/client/src/clientEnv.c | 60 ++++++++++++----------- source/client/src/clientImpl.c | 5 ++ source/common/src/tglobal.c | 28 ++++++++++- source/libs/transport/inc/transComm.h | 3 +- source/libs/transport/src/transCli.c | 70 ++++++++++++++++++++++++++- 5 files changed, 134 insertions(+), 32 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 06ef7b7c9c..6122a1d465 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -23,12 +23,12 @@ #include "scheduler.h" #include "tcache.h" #include "tglobal.h" +#include "thttp.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" #include "tsched.h" #include "ttime.h" -#include "thttp.h" #define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_RELEASED 0 @@ -80,17 +80,18 @@ static void deregisterRequest(SRequestObj *pRequest) { pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); if (pRequest->pQuery && pRequest->pQuery->pRoot) { - if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->pQuery->pRoot->type && (0 == ((SVnodeModifOpStmt*)pRequest->pQuery->pRoot)->sqlNodeType)) { + if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->pQuery->pRoot->type && + (0 == ((SVnodeModifOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) { tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 - "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", - duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, - pRequest->metric.planCostUs, pRequest->metric.execCostUs); + "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", + duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, + pRequest->metric.planCostUs, pRequest->metric.execCostUs); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64 - "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", - duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, - pRequest->metric.planCostUs, pRequest->metric.execCostUs); + "us, planCost:%" PRId64 "us, exec:%" PRId64 "us", + duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs, + pRequest->metric.planCostUs, pRequest->metric.execCostUs); atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); } @@ -154,6 +155,11 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); + connLimitNum = TMAX(connLimitNum, 10); + connLimitNum = TMIN(connLimitNum, 500); + rpcInit.connLimitNum = connLimitNum; + void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { tscError("failed to init connection to server"); @@ -369,9 +375,9 @@ void doDestroyRequest(void *p) { } if (pRequest->syncQuery) { - if (pRequest->body.param){ - tsem_destroy(&((SSyncQueryParam*)pRequest->body.param)->sem); - } + if (pRequest->body.param) { + tsem_destroy(&((SSyncQueryParam *)pRequest->body.param)->sem); + } taosMemoryFree(pRequest->body.param); } @@ -398,20 +404,20 @@ static void *tscCrashReportThreadFp(void *param) { setThreadName("client-crashReport"); char filepath[PATH_MAX] = {0}; snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP); - char *pMsg = NULL; - int64_t msgLen = 0; + char *pMsg = NULL; + int64_t msgLen = 0; TdFilePtr pFile = NULL; - bool truncateFile = false; - int32_t sleepTime = 200; - int32_t reportPeriodNum = 3600 * 1000 / sleepTime; - int32_t loopTimes = reportPeriodNum; + bool truncateFile = false; + int32_t sleepTime = 200; + int32_t reportPeriodNum = 3600 * 1000 / sleepTime; + int32_t loopTimes = reportPeriodNum; #ifdef WINDOWS if (taosCheckCurrentInDll()) { atexit(crashReportThreadFuncUnexpectedStopped); } #endif - + while (1) { if (clientStop) break; if (loopTimes++ < reportPeriodNum) { @@ -441,12 +447,12 @@ static void *tscCrashReportThreadFp(void *param) { pMsg = NULL; continue; } - + if (pFile) { taosReleaseCrashLogFile(pFile, truncateFile); truncateFile = false; } - + taosMsleep(sleepTime); loopTimes = 0; } @@ -459,11 +465,11 @@ int32_t tscCrashReportInit() { if (!tsEnableCrashReport) { return 0; } - + TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - TdThread crashReportThread; + TdThread crashReportThread; if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) { tscError("failed to create crashReport thread since %s", strerror(errno)); return -1; @@ -488,26 +494,24 @@ void tscStopCrashReport() { } } - void tscWriteCrashInfo(int signum, void *sigInfo, void *context) { - char *pMsg = NULL; + char *pMsg = NULL; const char *flags = "UTL FATAL "; ELogLevel level = DEBUG_FATAL; int32_t dflag = 255; - int64_t msgLen= -1; + int64_t msgLen = -1; if (tsEnableCrashReport) { if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) { taosPrintLog(flags, level, dflag, "failed to generate crash json msg"); } else { - msgLen = strlen(pMsg); + msgLen = strlen(pMsg); } } taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo); } - void taos_init_imp(void) { // In the APIs of other program language, taos_cleanup is not available yet. // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning. @@ -561,7 +565,7 @@ void taos_init_imp(void) { taosThreadMutexInit(&appInfo.mutex, NULL); tscCrashReportInit(); - + tscDebug("client is initialized successfully"); } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 079edcd667..c85e761c0b 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2008,6 +2008,11 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de rpcInit.compressSize = tsCompressMsgSize; rpcInit.user = "_dnd"; + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); + connLimitNum = TMAX(connLimitNum, 10); + connLimitNum = TMIN(connLimitNum, 500); + rpcInit.connLimitNum = connLimitNum; + clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { tscError("failed to init server status client"); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index e3f08e912a..727663ba65 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -42,6 +42,7 @@ bool tsPrintAuth = false; // queue & threads int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcSessions = 2000; +int32_t tsTimeToGetAvailableConn = 1000; int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4; @@ -326,6 +327,12 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1; + tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 100000); + if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1; + + tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 10000000); + if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, 0) != 0) return -1; + tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); if (tsNumOfTaskQueueThreads >= 10) { @@ -395,6 +402,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000); if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1; + tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000); + if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsNumOfRpcSessions, 20, 1000000, 0) != 0) return -1; + tsNumOfCommitThreads = tsNumOfCores / 2; tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1; @@ -515,6 +525,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } + pItem = cfgGetItem(tsCfg, "timeToGetAvailableConn"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsTimeToGetAvailableConn = 1000; + tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000); + pItem->i32 = tsTimeToGetAvailableConn; + pItem->stype = stype; + } + pItem = cfgGetItem(tsCfg, "numOfCommitThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { tsNumOfCommitThreads = numOfCores / 2; @@ -696,6 +714,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; + + tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; + + tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; return 0; } @@ -732,7 +754,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32; - tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; + tsNumOfRpcSessions = cfgGetItem(pCfg, "numofrpcsessions")->i32; + tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; + tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; @@ -740,7 +764,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; - // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; + // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchTereads")->i32; tsNumOfSnodeStreamThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a41cc0068c..843ca4515e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -147,7 +147,8 @@ typedef struct { int8_t epsetRetryCnt; int32_t retryCode; - int hThrdIdx; + void* task; + int hThrdIdx; } STransConnCtx; #pragma pack(push, 1) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index de5f9c26e0..128d9b8162 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -105,6 +105,7 @@ typedef struct SCliThrd { TdThreadMutex msgMtx; SDelayQueue* delayQueue; SDelayQueue* timeoutQueue; + SDelayQueue* waitConnQueue; uint64_t nextTimeout; // next timeout void* pTransInst; // @@ -614,8 +615,9 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (!QUEUE_IS_EMPTY(&(*msglist)->msgQ)) { queue* h = QUEUE_HEAD(&(*msglist)->msgQ); QUEUE_REMOVE(h); - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + + transDQCancel(thrd->waitConnQueue, pMsg->ctx->task); transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); cliSend(conn); @@ -1218,15 +1220,53 @@ void cliConnCb(uv_connect_t* req, int status) { } } +static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) { + STransConnCtx* pCtx = pMsg->ctx; + STrans* pTransInst = pThrd->pTransInst; + + STransMsg transMsg = {0}; + transMsg.contLen = 0; + transMsg.pCont = NULL; + transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS; + transMsg.msgType = pMsg->msg.msgType + 1; + transMsg.info.ahandle = pMsg->ctx->ahandle; + transMsg.info.traceId = pMsg->msg.info.traceId; + transMsg.info.hasEpSet = false; + if (pCtx->pSem != NULL) { + if (pCtx->pRsp == NULL) { + } else { + memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); + } + } else { + pTransInst->cfp(pTransInst->parent, &transMsg, NULL); + } + + destroyCmsg(pMsg); +} static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) { pThrd->stopMsg = pMsg; return; } + void** pIter = taosHashIterate(pThrd->connLimitCache, NULL); + while (pIter != NULL) { + SMsgList* list = (SMsgList*)(*pIter); + while (!QUEUE_IS_EMPTY(&list->msgQ)) { + queue* h = QUEUE_HEAD(&list->msgQ); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); + + doNotifyApp(pMsg, pThrd); + } + pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter); + } pThrd->stopMsg = NULL; pThrd->quit = true; tDebug("cli work thread %p start to quit", pThrd); destroyCmsg(pMsg); + destroyConnPool(pThrd->pool); uv_walk(pThrd->loop, cliWalkCb, NULL); } @@ -1353,7 +1393,19 @@ static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr) { } return 0; } +static void doFreeTimeoutMsg(void* param) { + STaskArg* arg = param; + SCliMsg* pMsg = arg->param1; + SCliThrd* pThrd = arg->param2; + STrans* pTransInst = pThrd->pTransInst; + QUEUE_REMOVE(&pMsg->q); + + STraceId* trace = &pMsg->msg.info.traceId; + tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); + doNotifyApp(pMsg, pThrd); + taosMemoryFree(arg); +} static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg) { STrans* pTransInst = pThrd->pTransInst; @@ -1363,6 +1415,15 @@ static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMs } if ((*list)->numOfConn >= pTransInst->connLimitNum) { + STraceId* trace = &pMsg->msg.info.traceId; + + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + + arg->param1 = pMsg; + arg->param2 = pThrd; + + pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, 200); + tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); QUEUE_PUSH(&(*list)->msgQ, &pMsg->q); return -1; } @@ -1846,6 +1907,8 @@ static SCliThrd* createThrdObj(void* trans) { transDQCreate(pThrd->loop, &pThrd->timeoutQueue); + transDQCreate(pThrd->loop, &pThrd->waitConnQueue); + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); pThrd->pTransInst = trans; @@ -1872,6 +1935,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle); transDQDestroy(pThrd->timeoutQueue, NULL); + transDQDestroy(pThrd->waitConnQueue, NULL); tDebug("thread destroy %" PRId64, pThrd->pid); for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { @@ -1910,6 +1974,10 @@ static void destroyThrdObj(SCliThrd* pThrd) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + + if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) { + pThrd->destroyAhandleFp(pMsg->ctx->ahandle); + } destroyCmsg(pMsg); } taosMemoryFree(list); From 84706fe58680045ff4bb24ca1fa407831865dd2c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 21:20:15 +0800 Subject: [PATCH 15/42] fix: limit session num --- include/common/tglobal.h | 1 + include/libs/transport/trpc.h | 2 +- source/client/src/clientEnv.c | 1 + source/client/src/clientImpl.c | 1 + source/common/src/tglobal.c | 4 ++-- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 1 + source/libs/transport/inc/transportInt.h | 10 +++++----- source/libs/transport/src/trans.c | 2 +- source/libs/transport/src/transCli.c | 2 +- tools/shell/src/shellNettest.c | 3 ++- 10 files changed, 16 insertions(+), 11 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index e92afc2222..26bd6fa163 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -50,6 +50,7 @@ extern int32_t tsTagFilterResCacheSize; // queue & threads extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcSessions; +extern int32_t tsTimeToGetAvailableConn; extern int32_t tsNumOfCommitThreads; extern int32_t tsNumOfTaskQueueThreads; extern int32_t tsNumOfMnodeQueryThreads; diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 0cc0ab64ef..c73e5c127a 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -114,7 +114,7 @@ typedef struct SRpcInit { int32_t connLimitNum; int32_t connLimitLock; - + int32_t timeToGetConn; int8_t supportBatch; // 0: no batch, 1. batch int32_t batchSize; void *parent; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 6122a1d465..53fe2c7ff3 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -159,6 +159,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c85e761c0b..e8751e5b1d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2012,6 +2012,7 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 727663ba65..adf2c246c4 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -41,8 +41,8 @@ bool tsPrintAuth = false; // queue & threads int32_t tsNumOfRpcThreads = 1; -int32_t tsNumOfRpcSessions = 2000; -int32_t tsTimeToGetAvailableConn = 1000; +int32_t tsNumOfRpcSessions = 10000; +int32_t tsTimeToGetAvailableConn = 10000; int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 3a1ca161a9..0245847b20 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -292,6 +292,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 1f3c98ad72..8ea0064d44 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -64,11 +64,11 @@ typedef struct { void (*destroyFp)(void* ahandle); bool (*failFastFp)(tmsg_t msgType); - int32_t connLimitNum; - int8_t connLimitLock; // 0: no lock. 1. lock - int8_t supportBatch; // 0: no batch, 1: support batch - int32_t batchSize; - + int32_t connLimitNum; + int8_t connLimitLock; // 0: no lock. 1. lock + int8_t supportBatch; // 0: no batch, 1: support batch + int32_t batchSize; + int32_t timeToGetConn; int index; void* parent; void* tcphandle; // returned handle from TCP initialization diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index f5f3b52f50..35b48fea6b 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -90,7 +90,7 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->user) { tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user)); } - + pRpc->timeToGetConn = pInit->timeToGetConn; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 128d9b8162..d002366a12 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1422,7 +1422,7 @@ static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMs arg->param1 = pMsg; arg->param2 = pThrd; - pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, 200); + pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); QUEUE_PUSH(&(*list)->msgQ, &pMsg->q); return -1; diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index 52ce37b22c..1a6ac3489d 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -21,7 +21,7 @@ static void shellWorkAsClient() { SRpcInit rpcInit = {0}; SEpSet epSet = {.inUse = 0, .numOfEps = 1}; SRpcMsg rpcRsp = {0}; - void * clientRpc = NULL; + void *clientRpc = NULL; char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)("_pwd"), strlen("_pwd"), pass); @@ -31,6 +31,7 @@ static void shellWorkAsClient() { rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.user = "_dnd"; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { From 9b77e2dfe84a0ce14bfa4ad1a8aa1088ad62d3ea Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 22:44:03 +0800 Subject: [PATCH 16/42] fix: limit session num --- source/libs/transport/src/transCli.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d002366a12..0e8b87938c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1096,6 +1096,8 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { nList->numOfConn++; QUEUE_INIT(&nList->msgQ); taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*)); + } else { + (*list)->numOfConn++; } uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); return; @@ -1400,7 +1402,6 @@ static void doFreeTimeoutMsg(void* param) { STrans* pTransInst = pThrd->pTransInst; QUEUE_REMOVE(&pMsg->q); - STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); doNotifyApp(pMsg, pThrd); @@ -1531,6 +1532,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { nList->numOfConn++; QUEUE_INIT(&nList->msgQ); taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*)); + } else { + (*list)->numOfConn++; } uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); From 0712198e2122e3e9fc915ca9a4c413674effef03 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 22:44:39 +0800 Subject: [PATCH 17/42] fix: limit session num --- source/client/src/clientEnv.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 53fe2c7ff3..8a0d9e0405 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -140,7 +140,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = "TSC"; - rpcInit.numOfThreads = numOfThread; + rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.cfp = processMsgFromServer; rpcInit.rfp = clientRpcRfp; rpcInit.sessions = 1024; From 28db4c8a8c3cdf7f474471eab58dc2548a55344b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 23:19:46 +0800 Subject: [PATCH 18/42] fix: limit session num --- source/libs/transport/src/transCli.c | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0e8b87938c..76c4ff46f3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -336,12 +336,8 @@ bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { SCliMsg* pCliMsg = NULL; CONN_GET_NEXT_SENDMSG(conn); - if (pCliMsg == NULL) - return false; - else { - cliSend(conn); - return true; - } + cliSend(conn); + return true; } return false; _RETURN: @@ -616,7 +612,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { queue* h = QUEUE_HEAD(&(*msglist)->msgQ); QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - + conn->status = ConnNormal; transDQCancel(thrd->waitConnQueue, pMsg->ctx->task); transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); From 66322311110981913ac09c2fcf341c9712b24120 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 24 Feb 2023 23:42:57 +0800 Subject: [PATCH 19/42] fix: limit session num --- source/libs/transport/src/thttp.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index f7da9cba25..04b546b36a 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -302,7 +302,7 @@ static int32_t taosSendHttpReportImpl(const char* server, const char* uri, uint1 SHttpMsg* msg = taosMemoryMalloc(sizeof(SHttpMsg)); msg->server = taosStrdup(server); - msg->uri = taosStrdup(uri); + msg->uri = taosStrdup(uri); msg->port = port; msg->cont = taosMemoryMalloc(contLen); memcpy(msg->cont, pCont, contLen); @@ -447,7 +447,7 @@ static void transHttpEnvInit() { void transHttpEnvDestroy() { // remove http - if (httpRef == -1 || transHttpInit == PTHREAD_ONCE_INIT) { + if (httpRef == -1) { return; } SHttpModule* load = taosAcquireRef(httpRefMgt, httpRef); From f933f604e466f312b262dd072a15b40c86d5897c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 25 Feb 2023 11:09:24 +0800 Subject: [PATCH 20/42] fix: undefine the strdup by default. --- include/os/osMemory.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 3f57c72933..c35fd782fb 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -29,9 +29,12 @@ extern "C" { #define calloc CALLOC_FUNC_TAOS_FORBID #define realloc REALLOC_FUNC_TAOS_FORBID #define free FREE_FUNC_TAOS_FORBID +#ifdef strdup +#undef strdup #define strdup STRDUP_FUNC_TAOS_FORBID -#endif // ifndef ALLOW_FORBID_FUNC +#endif +#endif // ifndef ALLOW_FORBID_FUNC #endif // if !defined(WINDOWS) int32_t taosMemoryDbgInit(); From 3e9cc93b9f44b0f5dc9a85930574a785280756a5 Mon Sep 17 00:00:00 2001 From: huolibo Date: Sat, 25 Feb 2023 12:54:24 +0800 Subject: [PATCH 21/42] enh(driver): tmq async commit callback (#20114) --- .../jni/com_taosdata_jdbc_tmq_TMQConnector.h | 3 + source/client/src/clientJniConnector.c | 1 + source/client/src/clientTmqConnector.c | 66 +++++++++++++++++++ 3 files changed, 70 insertions(+) diff --git a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h index 197cd78006..c035b6598c 100644 --- a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h +++ b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h @@ -99,6 +99,9 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI */ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *, jobject, jlong, jlong, jobject); +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *, jobject, jlong, jlong, + jobject); + /* * Class: com_taosdata_jdbc_tmq_TMQConnector * Method: tmqUnsubscribeImp diff --git a/source/client/src/clientJniConnector.c b/source/client/src/clientJniConnector.c index 750ba684f4..cfa6f84bd2 100644 --- a/source/client/src/clientJniConnector.c +++ b/source/client/src/clientJniConnector.c @@ -56,6 +56,7 @@ jmethodID g_createConsumerErrorCallback; jmethodID g_topicListCallback; jclass g_consumerClass; +// deprecated jmethodID g_commitCallback; void jniGetGlobalMethod(JNIEnv *env) { diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c index ccfc4980bc..a8c9f2279d 100644 --- a/source/client/src/clientTmqConnector.c +++ b/source/client/src/clientTmqConnector.c @@ -17,6 +17,36 @@ #include "jniCommon.h" #include "taos.h" +int __init_tmq = 0; +jmethodID g_offsetCallback; + +void tmqGlobalMethod(JNIEnv *env) { + // make sure init function executed once + switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) { + case 0: + break; + case 1: + do { + taosMsleep(0); + } while (atomic_load_32(&__init_tmq) == 1); + case 2: + return; + } + + if (g_vm == NULL) { + (*env)->GetJavaVM(env, &g_vm); + } + + jclass offset = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/OffsetWaitCallback"); + jclass g_offsetCallbackClass = (*env)->NewGlobalRef(env, offset); + g_offsetCallback = (*env)->GetMethodID(env, g_offsetCallbackClass, "commitCallbackHandler", "(I)V"); + (*env)->DeleteLocalRef(env, offset); + + atomic_store_32(&__init_tmq, 2); + jniDebug("tmq method register finished"); +} + +// deprecated void commit_cb(tmq_t *tmq, int32_t code, void *param) { JNIEnv *env = NULL; int status = (*g_vm)->GetEnv(g_vm, (void **)&env, JNI_VERSION_1_6); @@ -40,6 +70,28 @@ void commit_cb(tmq_t *tmq, int32_t code, void *param) { env = NULL; } +void consumer_callback(tmq_t *tmq, int32_t code, void *param) { + JNIEnv *env = NULL; + int status = (*g_vm)->GetEnv(g_vm, (void **)&env, JNI_VERSION_1_6); + bool needDetach = false; + if (status < 0) { + if ((*g_vm)->AttachCurrentThread(g_vm, (void **)&env, NULL) != 0) { + return; + } + needDetach = true; + } + + jobject obj = (jobject)param; + (*env)->CallVoidMethod(env, obj, g_offsetCallback, code); + (*env)->DeleteGlobalRef(env, obj); + param = NULL; + + if (needDetach) { + (*g_vm)->DetachCurrentThread(g_vm); + } + env = NULL; +} + JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConfNewImp(JNIEnv *env, jobject jobj) { tmq_conf_t *conf = tmq_conf_new(); jniGetGlobalMethod(env); @@ -201,6 +253,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitSync(JNI return tmq_commit_sync(tmq, res); } +// deprecated JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq, jlong jres, jobject consumer) { tmq_t *tmq = (tmq_t *)jtmq; @@ -213,6 +266,19 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN tmq_commit_async(tmq, res, commit_cb, consumer); } +JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq, + jlong jres, jobject offset) { + tmqGlobalMethod(env); + tmq_t *tmq = (tmq_t *)jtmq; + if (tmq == NULL) { + jniError("jobj:%p, tmq is closed", jobj); + return; + } + TAOS_RES *res = (TAOS_RES *)jres; + offset = (*env)->NewGlobalRef(env, offset); + tmq_commit_async(tmq, res, consumer_callback, offset); +} + JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(JNIEnv *env, jobject jobj, jlong jtmq) { tmq_t *tmq = (tmq_t *)jtmq; From 45a012bdf5b7e7ac3bf6c97a3b73dd7d0da387aa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 25 Feb 2023 13:18:53 +0800 Subject: [PATCH 22/42] fix: limit session num --- source/libs/transport/src/transCli.c | 296 ++++++++++++++------------- 1 file changed, 150 insertions(+), 146 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 76c4ff46f3..9f8637aa05 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -14,9 +14,16 @@ #include "transComm.h" +typedef struct { + int32_t numOfConn; + queue msgQ; +} SMsgList; + typedef struct SConnList { - queue conns; - int32_t size; + queue conns; + int32_t size; + SMsgList* list; + void* pThrd; } SConnList; typedef struct { @@ -76,10 +83,6 @@ typedef struct SCliConn { } SCliConn; -typedef struct { - int32_t numOfConn; - queue msgQ; -} SMsgList; typedef struct SCliMsg { STransConnCtx* ctx; STransMsg msg; @@ -115,7 +118,6 @@ typedef struct SCliThrd { SCvtAddr cvtAddr; SHashObj* failFastCache; - SHashObj* connLimitCache; SHashObj* batchCache; SCliMsg* stopMsg; @@ -141,7 +143,7 @@ typedef struct { // add expire timeout and capacity limit static void* createConnPool(int size); static void* destroyConnPool(void* pool); -static SCliConn* getConnFromPool(void* pool, char* addr); +static SCliConn* getConnFromPool(SCliThrd* thread, char* key); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); @@ -181,8 +183,8 @@ static void cliSend(SCliConn* pConn); static void cliSendBatch(SCliConn* pConn); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); -static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr); -static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg); +static void doFreeTimeoutMsg(void* param); +static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg); // cli util func static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); @@ -200,6 +202,7 @@ static void cliHandleExcept(SCliConn* conn); static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliHandleFastFail(SCliConn* pConn, int status); +static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); @@ -546,20 +549,38 @@ void* createConnPool(int size) { } void* destroyConnPool(void* pool) { SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); + SCliThrd* pThrd = connList->pThrd; while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conns)) { queue* h = QUEUE_HEAD(&connList->conns); SCliConn* c = QUEUE_DATA(h, SCliConn, q); cliDestroyConn(c, true); } + + SMsgList* msglist = connList->list; + while (!QUEUE_IS_EMPTY(&msglist->msgQ)) { + queue* h = QUEUE_HEAD(&msglist->msgQ); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + + transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); + pMsg->ctx->task = NULL; + + doNotifyApp(pMsg, pThrd); + } + taosMemoryFree(msglist); + connList = taosHashIterate((SHashObj*)pool, connList); } taosHashCleanup(pool); return NULL; } -static SCliConn* getConnFromPool(void* pool, char* key) { +static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key) { + void* pool = pThrd->pool; SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + STrans* pTranInst = pThrd->pTransInst; if (plist == NULL) { SConnList list = {0}; taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); @@ -568,7 +589,76 @@ static SCliConn* getConnFromPool(void* pool, char* key) { QUEUE_INIT(&plist->conns); } + SMsgList* msglist = plist->list; + if (QUEUE_IS_EMPTY(&plist->conns) && msglist->numOfConn >= pTranInst->connLimitNum) { + return NULL; + } + + plist->size -= 1; + queue* h = QUEUE_HEAD(&plist->conns); + SCliConn* conn = QUEUE_DATA(h, SCliConn, q); + conn->status = ConnNormal; + QUEUE_REMOVE(&conn->q); + QUEUE_INIT(&conn->q); + + if (conn->task != NULL) { + transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + conn->task = NULL; + } + return conn; +} + +static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { + void* pool = pThrd->pool; + STrans* pTransInst = pThrd->pTransInst; + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + if (plist == NULL) { + SConnList list = {0}; + + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); + nList->numOfConn++; + QUEUE_INIT(&nList->msgQ); + list.list = nList; + + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + + plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + QUEUE_INIT(&plist->conns); + } + + SMsgList* list = plist->list; + // no avaliable conn in pool if (QUEUE_IS_EMPTY(&plist->conns)) { + if ((list)->numOfConn >= pTransInst->connLimitNum) { + STraceId* trace = &(*pMsg)->msg.info.traceId; + + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = *pMsg; + arg->param2 = pThrd; + (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + + tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); + + QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); + *pMsg = NULL; + } else { + // send msg in delay queue + if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = *pMsg; + arg->param2 = pThrd; + (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + + QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); + queue* h = QUEUE_HEAD(&(list)->msgQ); + QUEUE_REMOVE(h); + SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q); + + *pMsg = ans; + transDQCancel(pThrd->waitConnQueue, ans->ctx->task); + } + list->numOfConn++; + } return NULL; } @@ -604,29 +694,30 @@ static void addConnToPool(void* pool, SCliConn* conn) { cliDestroyConnMsgs(conn, false); - conn->status = ConnInPool; - - SMsgList** msglist = taosHashGet(thrd->connLimitCache, conn->ip, strlen(conn->ip)); - if (msglist != NULL && *msglist != NULL) { - if (!QUEUE_IS_EMPTY(&(*msglist)->msgQ)) { - queue* h = QUEUE_HEAD(&(*msglist)->msgQ); - QUEUE_REMOVE(h); - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - conn->status = ConnNormal; - transDQCancel(thrd->waitConnQueue, pMsg->ctx->task); - transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); - transQueuePush(&conn->cliMsgs, pMsg); - cliSend(conn); - return; - } - } - if (conn->list == NULL) { - tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip)); - } else { - tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); } + + SConnList* pList = conn->list; + SMsgList* msgList = pList->list; + if (!QUEUE_IS_EMPTY(&msgList->msgQ)) { + queue* h = QUEUE_HEAD(&(msgList)->msgQ); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + + transDQCancel(thrd->waitConnQueue, pMsg->ctx->task); + pMsg->ctx->task = NULL; + + transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); + transQueuePush(&conn->cliMsgs, pMsg); + + conn->status = ConnNormal; + cliSend(conn); + return; + } + + conn->status = ConnInPool; QUEUE_PUSH(&conn->list->conns, &conn->q); conn->list->size += 1; @@ -752,8 +843,19 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { static void cliDestroyConn(SCliConn* conn, bool clear) { SCliThrd* pThrd = conn->hostThrd; tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); + QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); + + if (conn->list != NULL) { + SConnList* connList = conn->list; + connList->list->numOfConn--; + } else { + SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip)); + connList->list->numOfConn--; + } + conn->list = NULL; + transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; @@ -788,10 +890,6 @@ static void cliDestroy(uv_handle_t* handle) { conn->timer->data = NULL; conn->timer = NULL; } - SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); - if (list != NULL && *list != NULL) { - (*list)->numOfConn--; - } atomic_sub_fetch_32(&pThrd->connCount, 1); @@ -1027,9 +1125,9 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port); - SCliConn* conn = getConnFromPool(pThrd->pool, key); + SCliConn* conn = getConnFromPool(pThrd, key); - if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, key)) { + if (conn == NULL) { tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen, pBatch->batchSize); cliDestroyBatch(pBatch); @@ -1085,16 +1183,6 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { cliHandleFastFail(conn, -1); return; } - - SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); - if (list == NULL || *list == NULL) { - SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); - nList->numOfConn++; - QUEUE_INIT(&nList->msgQ); - taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*)); - } else { - (*list)->numOfConn++; - } uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); return; } @@ -1246,20 +1334,6 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { pThrd->stopMsg = pMsg; return; } - void** pIter = taosHashIterate(pThrd->connLimitCache, NULL); - while (pIter != NULL) { - SMsgList* list = (SMsgList*)(*pIter); - while (!QUEUE_IS_EMPTY(&list->msgQ)) { - queue* h = QUEUE_HEAD(&list->msgQ); - QUEUE_REMOVE(h); - - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); - - doNotifyApp(pMsg, pThrd); - } - pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter); - } pThrd->stopMsg = NULL; pThrd->quit = true; tDebug("cli work thread %p start to quit", pThrd); @@ -1298,11 +1372,11 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); } -SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { - STransConnCtx* pCtx = pMsg->ctx; +SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { + STransConnCtx* pCtx = (*pMsg)->ctx; SCliConn* conn = NULL; - int64_t refId = (int64_t)(pMsg->msg.info.handle); + int64_t refId = (int64_t)((*pMsg)->msg.info.handle); if (refId != 0) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); if (exh == NULL) { @@ -1312,7 +1386,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { } else { conn = exh->handle; if (conn == NULL) { - conn = getConnFromPool(pThrd->pool, addr); + conn = getConnFromPool2(pThrd, addr, pMsg); if (conn != NULL) specifyConnRef(conn, true, refId); } transReleaseExHandle(transGetRefMgt(), refId); @@ -1320,7 +1394,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { return conn; }; - conn = getConnFromPool(pThrd->pool, addr); + conn = getConnFromPool2(pThrd, addr, pMsg); if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); } else { @@ -1378,19 +1452,6 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { return; } -static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr) { - STrans* pTransInst = pThrd->pTransInst; - - SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr)); - if (list == NULL || *list == NULL) { - return 0; - } - - if ((*list)->numOfConn >= pTransInst->connLimitNum) { - return -1; - } - return 0; -} static void doFreeTimeoutMsg(void* param) { STaskArg* arg = param; SCliMsg* pMsg = arg->param1; @@ -1403,48 +1464,24 @@ static void doFreeTimeoutMsg(void* param) { doNotifyApp(pMsg, pThrd); taosMemoryFree(arg); } -static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg) { - STrans* pTransInst = pThrd->pTransInst; - - SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr)); - if (list == NULL || *list == NULL) { - return 0; - } - - if ((*list)->numOfConn >= pTransInst->connLimitNum) { - STraceId* trace = &pMsg->msg.info.traceId; - - STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); - - arg->param1 = pMsg; - arg->param2 = pThrd; - - pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); - tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); - QUEUE_PUSH(&(*list)->msgQ, &pMsg->q); - return -1; - } - return 0; -} void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { - STrans* pTransInst = pThrd->pTransInst; - STransConnCtx* pCtx = pMsg->ctx; - STraceId* trace = &pMsg->msg.info.traceId; + STrans* pTransInst = pThrd->pTransInst; + STraceId* trace = &pMsg->msg.info.traceId; - cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); - if (!EPSET_IS_VALID(&pCtx->epSet)) { + cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); + if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); destroyCmsg(pMsg); return; } - char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet); char addr[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); bool ignore = false; - SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore, addr); + SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr); if (ignore == true) { // persist conn already release by server STransMsg resp; @@ -1455,12 +1492,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); return; } - if (conn == NULL && cliPreCheckSessionLimitForMsg(pThrd, addr, pMsg) != 0) { + if (conn == NULL && pMsg == NULL) { return; } if (conn != NULL) { - transCtxMerge(&conn->ctx, &pCtx->appCtx); + transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); cliSend(conn); } else { @@ -1469,7 +1506,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { int64_t refId = (int64_t)pMsg->msg.info.handle; if (refId != 0) specifyConnRef(conn, true, refId); - transCtxMerge(&conn->ctx, &pCtx->appCtx); + transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); conn->ip = strdup(addr); @@ -1521,17 +1558,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { cliHandleFastFail(conn, ret); return; } - - SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); - if (list == NULL || *list == NULL) { - SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); - nList->numOfConn++; - QUEUE_INIT(&nList->msgQ); - taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*)); - } else { - (*list)->numOfConn++; - } - uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } tGTrace("%s conn %p ready", pTransInst->label, conn); @@ -1914,7 +1940,6 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -1964,27 +1989,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache); - - pIter = taosHashIterate(pThrd->connLimitCache, NULL); - while (pIter != NULL) { - SMsgList* list = (SMsgList*)(*pIter); - while (!QUEUE_IS_EMPTY(&list->msgQ)) { - queue* h = QUEUE_HEAD(&list->msgQ); - QUEUE_REMOVE(h); - - SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - - if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) { - pThrd->destroyAhandleFp(pMsg->ctx->ahandle); - } - destroyCmsg(pMsg); - } - taosMemoryFree(list); - - pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter); - } - taosHashCleanup(pThrd->connLimitCache); - taosMemoryFree(pThrd); } From c7e15291f3ba7b6228c17c5d81c6f3e0485d94b9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 25 Feb 2023 13:50:27 +0800 Subject: [PATCH 23/42] fix: limit session num --- source/libs/transport/src/transCli.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9f8637aa05..564058978f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -624,6 +624,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { plist = taosHashGet((SHashObj*)pool, key, strlen(key)); QUEUE_INIT(&plist->conns); + return NULL; } SMsgList* list = plist->list; From d7dc176ebbbdcf06970b3a5bba06e7ccf9375c4f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 25 Feb 2023 14:01:52 +0800 Subject: [PATCH 24/42] fix: limit session num --- source/libs/transport/src/transCli.c | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 564058978f..393360b4e9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -582,11 +582,16 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key) { SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); STrans* pTranInst = pThrd->pTransInst; if (plist == NULL) { + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); + QUEUE_INIT(&nList->msgQ); + nList->numOfConn++; + SConnList list = {0}; + QUEUE_INIT(&list.conns); + list.list = nList; + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); plist = taosHashGet((SHashObj*)pool, key, strlen(key)); - if (plist == NULL) return NULL; - QUEUE_INIT(&plist->conns); } SMsgList* msglist = plist->list; @@ -613,17 +618,17 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { STrans* pTransInst = pThrd->pTransInst; SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); if (plist == NULL) { - SConnList list = {0}; - SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); - nList->numOfConn++; QUEUE_INIT(&nList->msgQ); + nList->numOfConn++; + + SConnList list = {0}; + QUEUE_INIT(&list.conns); list.list = nList; taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet((SHashObj*)pool, key, strlen(key)); - QUEUE_INIT(&plist->conns); + return NULL; } From be17aa822b8ef88bfb32f252448c8a35850fa6d3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 25 Feb 2023 14:21:15 +0800 Subject: [PATCH 25/42] fix: limit session num --- source/libs/transport/src/transCli.c | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 393360b4e9..182d2b59b8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -143,7 +143,7 @@ typedef struct { // add expire timeout and capacity limit static void* createConnPool(int size); static void* destroyConnPool(void* pool); -static SCliConn* getConnFromPool(SCliThrd* thread, char* key); +static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); @@ -577,7 +577,7 @@ void* destroyConnPool(void* pool) { return NULL; } -static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key) { +static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { void* pool = pThrd->pool; SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); STrans* pTranInst = pThrd->pTransInst; @@ -591,11 +591,12 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key) { list.list = nList; taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + return NULL; } SMsgList* msglist = plist->list; if (QUEUE_IS_EMPTY(&plist->conns) && msglist->numOfConn >= pTranInst->connLimitNum) { + *exceed = true; return NULL; } @@ -1131,11 +1132,12 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port); - SCliConn* conn = getConnFromPool(pThrd, key); + bool exceed = false; + SCliConn* conn = getConnFromPool(pThrd, key, &exceed); - if (conn == NULL) { - tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen, - pBatch->batchSize); + if (conn == NULL && exceed) { + tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen, + pBatch->batchSize, pTransInst->connLimitNum); cliDestroyBatch(pBatch); return; } @@ -1471,12 +1473,10 @@ static void doFreeTimeoutMsg(void* param) { taosMemoryFree(arg); } void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { - STrans* pTransInst = pThrd->pTransInst; - STraceId* trace = &pMsg->msg.info.traceId; + STrans* pTransInst = pThrd->pTransInst; cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { - tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); destroyCmsg(pMsg); return; } @@ -1501,6 +1501,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { if (conn == NULL && pMsg == NULL) { return; } + STraceId* trace = &pMsg->msg.info.traceId; if (conn != NULL) { transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); From 0e613ea3f381d3a3c349873b02b5e0f9a1857ec2 Mon Sep 17 00:00:00 2001 From: huolibo Date: Sat, 25 Feb 2023 14:48:25 +0800 Subject: [PATCH 26/42] enh(driver): add spring + mybatis type:byte[] example (#20050) * enh(driver): add spring + mybatis type:byte[] example * doc: add init description * docs: add byte[] description --- examples/JDBC/springbootdemo/readme.md | 7 +++++ .../springbootdemo/dao/WeatherMapper.xml | 10 +++++-- .../springbootdemo/domain/Weather.java | 30 +++++++++++++++++++ .../service/WeatherService.java | 2 ++ 4 files changed, 47 insertions(+), 2 deletions(-) diff --git a/examples/JDBC/springbootdemo/readme.md b/examples/JDBC/springbootdemo/readme.md index a3942a6a51..a89e21c009 100644 --- a/examples/JDBC/springbootdemo/readme.md +++ b/examples/JDBC/springbootdemo/readme.md @@ -1,6 +1,13 @@ ## TDengine SpringBoot + Mybatis Demo ## 需要提前创建 test 数据库 + +``` +$ taos -s 'create database if not exists test' + +$ curl http://localhost:8080/weather/init +``` + ### 配置 application.properties ```properties # datasource config diff --git a/examples/JDBC/springbootdemo/src/main/java/com/taosdata/example/springbootdemo/dao/WeatherMapper.xml b/examples/JDBC/springbootdemo/src/main/java/com/taosdata/example/springbootdemo/dao/WeatherMapper.xml index 99d5893ec1..4899ec4654 100644 --- a/examples/JDBC/springbootdemo/src/main/java/com/taosdata/example/springbootdemo/dao/WeatherMapper.xml +++ b/examples/JDBC/springbootdemo/src/main/java/com/taosdata/example/springbootdemo/dao/WeatherMapper.xml @@ -7,6 +7,7 @@ + - insert into test.t#{groupId} (ts, temperature, humidity, note) - values (#{ts}, ${temperature}, ${humidity}, #{note}) + insert into test.t#{groupId} (ts, temperature, humidity, note, bytes) + values (#{ts}, ${temperature}, ${humidity}, #{note}, #{bytes})