From 8250a513177a557d8238abf2924b4ecae606a6cd Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 13 Aug 2024 11:18:29 +0800 Subject: [PATCH 1/4] change queue limit --- include/common/tglobal.h | 2 +- source/common/src/tglobal.c | 14 +++---- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/dnode/mgmt/test/sut/src/sut.cpp | 2 +- source/util/src/terror.c | 2 +- source/util/src/tqueue.c | 40 ++++++++----------- 7 files changed, 28 insertions(+), 36 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index bb5271a45f..7ecdd2a1b7 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -86,7 +86,7 @@ extern int32_t tsNumOfQnodeQueryThreads; extern int32_t tsNumOfQnodeFetchThreads; extern int32_t tsNumOfSnodeStreamThreads; extern int32_t tsNumOfSnodeWriteThreads; -extern int64_t tsRpcQueueMemoryAllowed; +extern int64_t tsQueueMemoryAllowed; extern int32_t tsRetentionSpeedLimitMB; // sync raft diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ad069356c4..3fa4efdaaa 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -650,8 +650,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfSnodeWriteThreads = tsNumOfCores / 4; tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4); - tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; - tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.3; + tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); // clang-format off TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE)); @@ -681,7 +681,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER, 
CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE)); - TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); @@ -916,9 +916,9 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem = cfgGetItem(pCfg, "rpcQueueMemoryAllowed"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsRpcQueueMemoryAllowed = totalMemoryKB * 1024 * 0.1; - tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); - pItem->i64 = tsRpcQueueMemoryAllowed; + tsQueueMemoryAllowed = totalMemoryKB * 1024 * 0.3; + tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + pItem->i64 = tsQueueMemoryAllowed; pItem->stype = stype; } @@ -1315,7 +1315,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfSnodeWriteThreads = pItem->i32; TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "rpcQueueMemoryAllowed"); - tsRpcQueueMemoryAllowed = pItem->i64; + tsQueueMemoryAllowed = pItem->i64; TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "simdEnable"); tsSIMDEnable = (bool)pItem->bval; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 374bb1a673..70f258a362 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -124,7 +124,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { 
req.numOfSupportVnodes = tsNumOfSupportVnodes; req.numOfDiskCfg = tsDiskCfgNum; req.memTotal = tsTotalMemoryKB * 1024; - req.memAvail = req.memTotal - tsRpcQueueMemoryAllowed - 16 * 1024 * 1024; + req.memAvail = req.memTotal - tsQueueMemoryAllowed - 16 * 1024 * 1024; tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN); tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 8b289a5a57..986bbc4ac8 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -218,7 +218,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { pRpc->info.wrapper = pWrapper; - EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsRpcQueueMemoryUsed + EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsQueueMemoryUsed code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg); if (code) goto _OVER; diff --git a/source/dnode/mgmt/test/sut/src/sut.cpp b/source/dnode/mgmt/test/sut/src/sut.cpp index f074a015d2..25ea92a533 100644 --- a/source/dnode/mgmt/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/test/sut/src/sut.cpp @@ -36,7 +36,7 @@ void Testbase::InitLog(const char* path) { tstrncpy(tsLogDir, path, PATH_MAX); taosGetSystemInfo(); - tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 0.1; + tsQueueMemoryAllowed = tsTotalMemoryKB * 0.3; if (taosInitLog("taosdlog", 1, false) != 0) { printf("failed to init log file\n"); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 2f4414cd03..a7d817b695 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -92,7 +92,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found") TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization") TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash") 
TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed") -TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue") +TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in queue") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_ENCODE_ERROR, "Msg encode error") diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 87d5680aa0..4c80540bed 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -19,8 +19,8 @@ #include "tlog.h" #include "tutil.h" -int64_t tsRpcQueueMemoryAllowed = 0; -int64_t tsRpcQueueMemoryUsed = 0; +int64_t tsQueueMemoryAllowed = 0; +int64_t tsQueueMemoryUsed = 0; struct STaosQueue { STaosQnode *head; @@ -148,10 +148,20 @@ int64_t taosQueueMemorySize(STaosQueue *queue) { } int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) { - *item = NULL; + int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize); + if (itype == RPC_QITEM) { + if (alloced > tsQueueMemoryAllowed) { + uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, + tsQueueMemoryAllowed); + (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize); + return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE); + } + } + *item = NULL; STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); if (pNode == NULL) { + (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize); return terrno = TSDB_CODE_OUT_OF_MEMORY; } @@ -159,21 +169,7 @@ int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void ** pNode->size = size; pNode->itype = itype; pNode->timestamp = taosGetTimestampUs(); - - if (itype == RPC_QITEM) { - int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); - if (alloced > tsRpcQueueMemoryAllowed) { - uError("failed to alloc qitem, 
size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, - tsRpcQueueMemoryAllowed); - (void)atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); - taosMemoryFree(pNode); - return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE); - } - uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced); - } else { - uTrace("item:%p, node:%p is allocated", pNode->item, pNode); - } - + uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced); *item = pNode->item; return 0; } @@ -182,12 +178,8 @@ void taosFreeQitem(void *pItem) { if (pItem == NULL) return; STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); - if (pNode->itype == RPC_QITEM) { - int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size + pNode->dataSize); - uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced); - } else { - uTrace("item:%p, node:%p is freed", pItem, pNode); - } + int64_t alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize); + uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced); taosMemoryFree(pNode); } From 7b4f785a44ce12cf0fea8f4a2eb3231596068b55 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 13 Aug 2024 11:43:51 +0800 Subject: [PATCH 2/4] change queue limit --- source/common/src/tglobal.c | 4 ++-- source/dnode/mgmt/test/sut/src/sut.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3fa4efdaaa..2083c7b896 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -650,7 +650,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfSnodeWriteThreads = tsNumOfCores / 4; tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4); - tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.3; + tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.2; tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, 
TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); // clang-format off @@ -916,7 +916,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem = cfgGetItem(pCfg, "rpcQueueMemoryAllowed"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsQueueMemoryAllowed = totalMemoryKB * 1024 * 0.3; + tsQueueMemoryAllowed = totalMemoryKB * 1024 * 0.2; tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); pItem->i64 = tsQueueMemoryAllowed; pItem->stype = stype; diff --git a/source/dnode/mgmt/test/sut/src/sut.cpp b/source/dnode/mgmt/test/sut/src/sut.cpp index 25ea92a533..15485c6f87 100644 --- a/source/dnode/mgmt/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/test/sut/src/sut.cpp @@ -36,7 +36,7 @@ void Testbase::InitLog(const char* path) { tstrncpy(tsLogDir, path, PATH_MAX); taosGetSystemInfo(); - tsQueueMemoryAllowed = tsTotalMemoryKB * 0.3; + tsQueueMemoryAllowed = tsTotalMemoryKB * 0.2; if (taosInitLog("taosdlog", 1, false) != 0) { printf("failed to init log file\n"); } From 7942a9bfd185aa6d50b9b4c13e480f0d464b6988 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 13 Aug 2024 14:11:10 +0800 Subject: [PATCH 3/4] change queue limit --- source/util/src/tqueue.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 4c80540bed..9ba88b4451 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -149,8 +149,8 @@ int64_t taosQueueMemorySize(STaosQueue *queue) { int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) { int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize); - if (itype == RPC_QITEM) { - if (alloced > tsQueueMemoryAllowed) { + if (alloced > tsQueueMemoryAllowed) { + if (itype == RPC_QITEM) { uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, tsQueueMemoryAllowed); 
(void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize); From 8ecc327cbdd5752b07b463db41584870e9fe707b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 13 Aug 2024 16:58:53 +0800 Subject: [PATCH 4/4] change default parameter --- source/common/src/tglobal.c | 4 ++-- source/dnode/mgmt/test/sut/src/sut.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 2083c7b896..db3b6b5c64 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -650,7 +650,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfSnodeWriteThreads = tsNumOfCores / 4; tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4); - tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.2; + tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); // clang-format off @@ -916,7 +916,7 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem = cfgGetItem(pCfg, "rpcQueueMemoryAllowed"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsQueueMemoryAllowed = totalMemoryKB * 1024 * 0.2; + tsQueueMemoryAllowed = totalMemoryKB * 1024 * 0.1; tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); pItem->i64 = tsQueueMemoryAllowed; pItem->stype = stype; diff --git a/source/dnode/mgmt/test/sut/src/sut.cpp b/source/dnode/mgmt/test/sut/src/sut.cpp index 15485c6f87..13c8c73f44 100644 --- a/source/dnode/mgmt/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/test/sut/src/sut.cpp @@ -36,7 +36,7 @@ void Testbase::InitLog(const char* path) { tstrncpy(tsLogDir, path, PATH_MAX); taosGetSystemInfo(); - tsQueueMemoryAllowed = tsTotalMemoryKB * 0.2; + tsQueueMemoryAllowed = tsTotalMemoryKB * 0.1; if (taosInitLog("taosdlog", 1, false) != 0) { printf("failed to init log file\n"); }