From a4e88660db1b28487f85fbb1192a74e2e0af73bb Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Tue, 21 Jan 2025 11:33:08 +0800 Subject: [PATCH] fix(rpc):use tsApplyMemoryAllowed to control memory alloc while apply msg. --- include/common/tglobal.h | 4 +++ include/util/tqueue.h | 1 + source/common/src/tglobal.c | 25 +++++++++++++----- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- source/dnode/mgmt/test/sut/src/sut.cpp | 3 ++- source/libs/sync/src/syncPipeline.c | 6 ++++- source/util/src/tqueue.c | 29 ++++++++++++++++++--- 8 files changed, 57 insertions(+), 15 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 6beb7c8860..4e9a9bd801 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -34,6 +34,9 @@ extern "C" { #define GLOBAL_CONFIG_FILE_VERSION 1 #define LOCAL_CONFIG_FILE_VERSION 1 +#define RPC_MEMORY_USAGE_RATIO 0.1 +#define QUEUE_MEMORY_USAGE_RATIO 0.6 + typedef enum { DND_CA_SM4 = 1, } EEncryptAlgor; @@ -110,6 +113,7 @@ extern int32_t tsNumOfQnodeFetchThreads; extern int32_t tsNumOfSnodeStreamThreads; extern int32_t tsNumOfSnodeWriteThreads; extern int64_t tsQueueMemoryAllowed; +extern int64_t tsApplyMemoryAllowed; extern int32_t tsRetentionSpeedLimitMB; extern int32_t tsNumOfCompactThreads; diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 5ae642b69f..1d634ce742 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -55,6 +55,7 @@ typedef struct { typedef enum { DEF_QITEM = 0, RPC_QITEM = 1, + APPLY_QITEM = 2, } EQItype; typedef void (*FItem)(SQueueInfo *pInfo, void *pItem); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 1e4d3f8c3c..5343e2de97 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -14,12 +14,12 @@ */ #define _DEFAULT_SOURCE -#include "tglobal.h" #include "cJSON.h" #include "defines.h" #include "os.h" #include "osString.h" #include "tconfig.h" +#include "tglobal.h" #include "tgrant.h" #include "tjson.h" #include "tlog.h" @@ -500,7 +500,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) { TAOS_RETURN(TSDB_CODE_SUCCESS); } -struct SConfig *taosGetCfg() { return tsCfg; } +struct SConfig *taosGetCfg() { + return tsCfg; +} static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, char *apolloUrl) { @@ -818,8 +820,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfSnodeWriteThreads = tsNumOfCores / 4; tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4); - tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; - tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); + tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * RPC_MEMORY_USAGE_RATIO * QUEUE_MEMORY_USAGE_RATIO; + tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * QUEUE_MEMORY_USAGE_RATIO * 10LL, + TSDB_MAX_MSG_SIZE * QUEUE_MEMORY_USAGE_RATIO * 10000LL); + + tsApplyMemoryAllowed = tsTotalMemoryKB * 1024 * RPC_MEMORY_USAGE_RATIO * (1 - QUEUE_MEMORY_USAGE_RATIO); + tsApplyMemoryAllowed = TRANGE(tsApplyMemoryAllowed, TSDB_MAX_MSG_SIZE * (1 - QUEUE_MEMORY_USAGE_RATIO) * 10LL, + TSDB_MAX_MSG_SIZE * (1 - QUEUE_MEMORY_USAGE_RATIO) * 10000LL); tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); @@ -857,7 +864,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL)); - TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); + TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * RPC_MEMORY_USAGE_RATIO * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL)); @@ -1572,7 +1579,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfSnodeWriteThreads = pItem->i32; TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "rpcQueueMemoryAllowed"); - tsQueueMemoryAllowed = pItem->i64; + tsQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * QUEUE_MEMORY_USAGE_RATIO; + tsApplyMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * (1 - QUEUE_MEMORY_USAGE_RATIO); TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "simdEnable"); tsSIMDEnable = (bool)pItem->bval; @@ -2395,6 +2403,10 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { code = TSDB_CODE_SUCCESS; goto _exit; } + if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) { + tsQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * QUEUE_MEMORY_USAGE_RATIO; + tsApplyMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * (1 - QUEUE_MEMORY_USAGE_RATIO); + } if (strcasecmp(name, "numOfCompactThreads") == 0) { #ifdef TD_ENTERPRISE @@ -2500,7 +2512,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"experimental", &tsExperimental}, {"numOfRpcSessions", &tsNumOfRpcSessions}, - {"rpcQueueMemoryAllowed", &tsQueueMemoryAllowed}, {"shellActivityTimer", &tsShellActivityTimer}, {"readTimeout", &tsReadTimeout}, {"safetyCheckLevel", &tsSafetyCheckLevel}, diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 9ed4ee83c4..637713d2f9 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -181,7 +181,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { req.numOfSupportVnodes = tsNumOfSupportVnodes; req.numOfDiskCfg = tsDiskCfgNum; req.memTotal = tsTotalMemoryKB * 1024; - req.memAvail = req.memTotal - tsQueueMemoryAllowed - 16 * 1024 * 1024; + req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024; tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN); tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index c22adec9b4..334c213945 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -323,7 +323,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { return TSDB_CODE_INVALID_MSG; } - EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM; + EQItype itype = APPLY_QUEUE == qtype ? APPLY_QITEM : RPC_QITEM; SRpcMsg *pMsg; code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg); if (code) { diff --git a/source/dnode/mgmt/test/sut/src/sut.cpp b/source/dnode/mgmt/test/sut/src/sut.cpp index 13c8c73f44..a1fdebb636 100644 --- a/source/dnode/mgmt/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/test/sut/src/sut.cpp @@ -36,7 +36,8 @@ void Testbase::InitLog(const char* path) { tstrncpy(tsLogDir, path, PATH_MAX); taosGetSystemInfo(); - tsQueueMemoryAllowed = tsTotalMemoryKB * 0.1; + tsQueueMemoryAllowed = tsTotalMemoryKB * 0.06; + tsApplyMemoryAllowed = tsTotalMemoryKB * 0.04; if (taosInitLog("taosdlog", 1, false) != 0) { printf("failed to init log file\n"); } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 3022a1f8ac..18252db9ee 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -732,7 +732,11 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry); if (retry) { taosMsleep(10); - sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index); + if (code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) { + sError("vgId:%d, failed to execute fsm since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index); + } else { + sDebug("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index); + } } } while (retry); diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index f531d9ad61..fd55851cc6 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -14,14 +14,16 @@ */ #define _DEFAULT_SOURCE -#include "tqueue.h" #include "taoserror.h" #include "tlog.h" +#include "tqueue.h" #include "tutil.h" int64_t tsQueueMemoryAllowed = 0; int64_t tsQueueMemoryUsed = 0; +int64_t tsApplyMemoryAllowed = 0; +int64_t tsApplyMemoryUsed = 0; struct STaosQueue { STaosQnode *head; STaosQnode *tail; @@ -148,21 +150,35 @@ int64_t taosQueueMemorySize(STaosQueue *queue) { } int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) { - int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize); + int64_t alloced = -1; + if (alloced > tsQueueMemoryAllowed) { + alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize); if (itype == RPC_QITEM) { uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, tsQueueMemoryAllowed); (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize); return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE); } + } else if (itype == APPLY_QITEM) { + alloced = atomic_add_fetch_64(&tsApplyMemoryUsed, size + dataSize); + if (alloced > tsApplyMemoryAllowed) { + uDebug("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, + tsApplyMemoryAllowed); + (void)atomic_sub_fetch_64(&tsApplyMemoryUsed, size + dataSize); + terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE; + return NULL; + } } *item = NULL; STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); - if (pNode == NULL) { + if (itype == RPC_QITEM) { (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize); return terrno; + } else if (itype == APPLY_QITEM) { + (void)atomic_sub_fetch_64(&tsApplyMemoryUsed, size + dataSize); + return terrno; } pNode->dataSize = dataSize; @@ -178,7 +194,12 @@ void taosFreeQitem(void *pItem) { if (pItem == NULL) return; STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); - int64_t alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize); + int64_t alloced = -1; + if (pNode->itype == RPC_QITEM) { + alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize); + } else if (pNode->itype == APPLY_QITEM) { + alloced = atomic_sub_fetch_64(&tsApplyMemoryUsed, pNode->size + pNode->dataSize); + } uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced); taosMemoryFree(pNode);