fix(rpc):use tsApplyMemoryAllowed to control memory alloc while apply msg.
This commit is contained in:
parent
0ad1d08c63
commit
a4e88660db
|
@ -34,6 +34,9 @@ extern "C" {
|
|||
#define GLOBAL_CONFIG_FILE_VERSION 1
|
||||
#define LOCAL_CONFIG_FILE_VERSION 1
|
||||
|
||||
#define RPC_MEMORY_USAGE_RATIO 0.1
|
||||
#define QUEUE_MEMORY_USAGE_RATIO 0.6
|
||||
|
||||
typedef enum {
|
||||
DND_CA_SM4 = 1,
|
||||
} EEncryptAlgor;
|
||||
|
@ -110,6 +113,7 @@ extern int32_t tsNumOfQnodeFetchThreads;
|
|||
extern int32_t tsNumOfSnodeStreamThreads;
|
||||
extern int32_t tsNumOfSnodeWriteThreads;
|
||||
extern int64_t tsQueueMemoryAllowed;
|
||||
extern int64_t tsApplyMemoryAllowed;
|
||||
extern int32_t tsRetentionSpeedLimitMB;
|
||||
|
||||
extern int32_t tsNumOfCompactThreads;
|
||||
|
|
|
@ -55,6 +55,7 @@ typedef struct {
|
|||
typedef enum {
|
||||
DEF_QITEM = 0,
|
||||
RPC_QITEM = 1,
|
||||
APPLY_QITEM = 2,
|
||||
} EQItype;
|
||||
|
||||
typedef void (*FItem)(SQueueInfo *pInfo, void *pItem);
|
||||
|
|
|
@ -14,12 +14,12 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "tglobal.h"
|
||||
#include "cJSON.h"
|
||||
#include "defines.h"
|
||||
#include "os.h"
|
||||
#include "osString.h"
|
||||
#include "tconfig.h"
|
||||
#include "tglobal.h"
|
||||
#include "tgrant.h"
|
||||
#include "tjson.h"
|
||||
#include "tlog.h"
|
||||
|
@ -500,7 +500,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
|
|||
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
struct SConfig *taosGetCfg() { return tsCfg; }
|
||||
struct SConfig *taosGetCfg() {
|
||||
return tsCfg;
|
||||
}
|
||||
|
||||
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
|
||||
char *apolloUrl) {
|
||||
|
@ -818,8 +820,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
tsNumOfSnodeWriteThreads = tsNumOfCores / 4;
|
||||
tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4);
|
||||
|
||||
tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||
tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * RPC_MEMORY_USAGE_RATIO * QUEUE_MEMORY_USAGE_RATIO;
|
||||
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * QUEUE_MEMORY_USAGE_RATIO * 10LL,
|
||||
TSDB_MAX_MSG_SIZE * QUEUE_MEMORY_USAGE_RATIO * 10000LL);
|
||||
|
||||
tsApplyMemoryAllowed = tsTotalMemoryKB * 1024 * RPC_MEMORY_USAGE_RATIO * (1 - QUEUE_MEMORY_USAGE_RATIO);
|
||||
tsApplyMemoryAllowed = TRANGE(tsApplyMemoryAllowed, TSDB_MAX_MSG_SIZE * (1 - QUEUE_MEMORY_USAGE_RATIO) * 10LL,
|
||||
TSDB_MAX_MSG_SIZE * (1 - QUEUE_MEMORY_USAGE_RATIO) * 10000LL);
|
||||
|
||||
tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
|
||||
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
|
||||
|
@ -857,7 +864,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
|
||||
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
|
||||
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * RPC_MEMORY_USAGE_RATIO * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE,CFG_CATEGORY_GLOBAL));
|
||||
|
@ -1572,7 +1579,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsNumOfSnodeWriteThreads = pItem->i32;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "rpcQueueMemoryAllowed");
|
||||
tsQueueMemoryAllowed = pItem->i64;
|
||||
tsQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * QUEUE_MEMORY_USAGE_RATIO;
|
||||
tsApplyMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * (1 - QUEUE_MEMORY_USAGE_RATIO);
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "simdEnable");
|
||||
tsSIMDEnable = (bool)pItem->bval;
|
||||
|
@ -2395,6 +2403,10 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
|
|||
code = TSDB_CODE_SUCCESS;
|
||||
goto _exit;
|
||||
}
|
||||
if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) {
|
||||
tsQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * QUEUE_MEMORY_USAGE_RATIO;
|
||||
tsApplyMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * (1 - QUEUE_MEMORY_USAGE_RATIO);
|
||||
}
|
||||
|
||||
if (strcasecmp(name, "numOfCompactThreads") == 0) {
|
||||
#ifdef TD_ENTERPRISE
|
||||
|
@ -2500,7 +2512,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
|
|||
{"experimental", &tsExperimental},
|
||||
|
||||
{"numOfRpcSessions", &tsNumOfRpcSessions},
|
||||
{"rpcQueueMemoryAllowed", &tsQueueMemoryAllowed},
|
||||
{"shellActivityTimer", &tsShellActivityTimer},
|
||||
{"readTimeout", &tsReadTimeout},
|
||||
{"safetyCheckLevel", &tsSafetyCheckLevel},
|
||||
|
|
|
@ -181,7 +181,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
|||
req.numOfSupportVnodes = tsNumOfSupportVnodes;
|
||||
req.numOfDiskCfg = tsDiskCfgNum;
|
||||
req.memTotal = tsTotalMemoryKB * 1024;
|
||||
req.memAvail = req.memTotal - tsQueueMemoryAllowed - 16 * 1024 * 1024;
|
||||
req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
|
||||
tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
|
||||
tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
|
||||
|
||||
|
|
|
@ -323,7 +323,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM;
|
||||
EQItype itype = APPLY_QUEUE == qtype ? APPLY_QITEM : RPC_QITEM;
|
||||
SRpcMsg *pMsg;
|
||||
code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
|
||||
if (code) {
|
||||
|
|
|
@ -36,7 +36,8 @@ void Testbase::InitLog(const char* path) {
|
|||
tstrncpy(tsLogDir, path, PATH_MAX);
|
||||
|
||||
taosGetSystemInfo();
|
||||
tsQueueMemoryAllowed = tsTotalMemoryKB * 0.1;
|
||||
tsQueueMemoryAllowed = tsTotalMemoryKB * 0.06;
|
||||
tsApplyMemoryAllowed = tsTotalMemoryKB * 0.04;
|
||||
if (taosInitLog("taosdlog", 1, false) != 0) {
|
||||
printf("failed to init log file\n");
|
||||
}
|
||||
|
|
|
@ -732,7 +732,11 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
|
|||
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry);
|
||||
if (retry) {
|
||||
taosMsleep(10);
|
||||
sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index);
|
||||
if (code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) {
|
||||
sError("vgId:%d, failed to execute fsm since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index);
|
||||
} else {
|
||||
sDebug("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index);
|
||||
}
|
||||
}
|
||||
} while (retry);
|
||||
|
||||
|
|
|
@ -14,14 +14,16 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "tqueue.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
#include "tqueue.h"
|
||||
#include "tutil.h"
|
||||
|
||||
int64_t tsQueueMemoryAllowed = 0;
|
||||
int64_t tsQueueMemoryUsed = 0;
|
||||
|
||||
int64_t tsApplyMemoryAllowed = 0;
|
||||
int64_t tsApplyMemoryUsed = 0;
|
||||
struct STaosQueue {
|
||||
STaosQnode *head;
|
||||
STaosQnode *tail;
|
||||
|
@ -148,21 +150,35 @@ int64_t taosQueueMemorySize(STaosQueue *queue) {
|
|||
}
|
||||
|
||||
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) {
|
||||
int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize);
|
||||
int64_t alloced = -1;
|
||||
|
||||
if (alloced > tsQueueMemoryAllowed) {
|
||||
alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize);
|
||||
if (itype == RPC_QITEM) {
|
||||
uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
|
||||
tsQueueMemoryAllowed);
|
||||
(void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
|
||||
return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
|
||||
}
|
||||
} else if (itype == APPLY_QITEM) {
|
||||
alloced = atomic_add_fetch_64(&tsApplyMemoryUsed, size + dataSize);
|
||||
if (alloced > tsApplyMemoryAllowed) {
|
||||
uDebug("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
|
||||
tsApplyMemoryAllowed);
|
||||
(void)atomic_sub_fetch_64(&tsApplyMemoryUsed, size + dataSize);
|
||||
terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
*item = NULL;
|
||||
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
|
||||
if (pNode == NULL) {
|
||||
if (itype == RPC_QITEM) {
|
||||
(void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
|
||||
return terrno;
|
||||
} else if (itype == APPLY_QITEM) {
|
||||
(void)atomic_sub_fetch_64(&tsApplyMemoryUsed, size + dataSize);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pNode->dataSize = dataSize;
|
||||
|
@ -178,7 +194,12 @@ void taosFreeQitem(void *pItem) {
|
|||
if (pItem == NULL) return;
|
||||
|
||||
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
|
||||
int64_t alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
|
||||
int64_t alloced = -1;
|
||||
if (pNode->itype == RPC_QITEM) {
|
||||
alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
|
||||
} else if (pNode->itype == APPLY_QITEM) {
|
||||
alloced = atomic_sub_fetch_64(&tsApplyMemoryUsed, pNode->size + pNode->dataSize);
|
||||
}
|
||||
uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
|
||||
|
||||
taosMemoryFree(pNode);
|
||||
|
|
Loading…
Reference in New Issue