Merge pull request #27179 from taosdata/enh/TD-31403

Enh/TD-31403
This commit is contained in:
Hongze Cheng 2024-08-14 09:39:12 +08:00 committed by GitHub
commit 9796c9dfa4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 28 additions and 36 deletions

View File

@ -86,7 +86,7 @@ extern int32_t tsNumOfQnodeQueryThreads;
extern int32_t tsNumOfQnodeFetchThreads;
extern int32_t tsNumOfSnodeStreamThreads;
extern int32_t tsNumOfSnodeWriteThreads;
extern int64_t tsRpcQueueMemoryAllowed;
extern int64_t tsQueueMemoryAllowed;
extern int32_t tsRetentionSpeedLimitMB;
// sync raft

View File

@ -691,8 +691,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfSnodeWriteThreads = tsNumOfCores / 4;
tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4);
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
// clang-format off
TAOS_CHECK_RETURN(cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER, CFG_DYN_NONE));
@ -722,7 +722,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_BOTH, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
@ -958,9 +958,9 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem = cfgGetItem(pCfg, "rpcQueueMemoryAllowed");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsRpcQueueMemoryAllowed = totalMemoryKB * 1024 * 0.1;
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
pItem->i64 = tsRpcQueueMemoryAllowed;
tsQueueMemoryAllowed = totalMemoryKB * 1024 * 0.1;
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
pItem->i64 = tsQueueMemoryAllowed;
pItem->stype = stype;
}
@ -1357,7 +1357,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfSnodeWriteThreads = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "rpcQueueMemoryAllowed");
tsRpcQueueMemoryAllowed = pItem->i64;
tsQueueMemoryAllowed = pItem->i64;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "simdEnable");
tsSIMDEnable = (bool)pItem->bval;

View File

@ -124,7 +124,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.numOfSupportVnodes = tsNumOfSupportVnodes;
req.numOfDiskCfg = tsDiskCfgNum;
req.memTotal = tsTotalMemoryKB * 1024;
req.memAvail = req.memTotal - tsRpcQueueMemoryAllowed - 16 * 1024 * 1024;
req.memAvail = req.memTotal - tsQueueMemoryAllowed - 16 * 1024 * 1024;
tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);

View File

@ -218,7 +218,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
pRpc->info.wrapper = pWrapper;
EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsRpcQueueMemoryUsed
EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsQueueMemoryUsed
code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
if (code) goto _OVER;

View File

@ -36,7 +36,7 @@ void Testbase::InitLog(const char* path) {
tstrncpy(tsLogDir, path, PATH_MAX);
taosGetSystemInfo();
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 0.1;
tsQueueMemoryAllowed = tsTotalMemoryKB * 0.1;
if (taosInitLog("taosdlog", 1, false) != 0) {
printf("failed to init log file\n");
}

View File

@ -92,7 +92,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
TAOS_DEFINE_ERROR(TSDB_CODE_DUP_KEY, "Cannot add duplicate keys to hash")
TAOS_DEFINE_ERROR(TSDB_CODE_NEED_RETRY, "Retry needed")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in rpc queue")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE, "Out of memory in queue")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIMESTAMP, "Invalid timestamp format")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error")
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_ENCODE_ERROR, "Msg encode error")

View File

@ -19,8 +19,8 @@
#include "tlog.h"
#include "tutil.h"
int64_t tsRpcQueueMemoryAllowed = 0;
int64_t tsRpcQueueMemoryUsed = 0;
int64_t tsQueueMemoryAllowed = 0;
int64_t tsQueueMemoryUsed = 0;
struct STaosQueue {
STaosQnode *head;
@ -148,10 +148,20 @@ int64_t taosQueueMemorySize(STaosQueue *queue) {
}
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) {
*item = NULL;
int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize);
if (alloced > tsQueueMemoryAllowed) {
if (itype == RPC_QITEM) {
uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
tsQueueMemoryAllowed);
(void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
}
}
*item = NULL;
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
if (pNode == NULL) {
(void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
return terrno = TSDB_CODE_OUT_OF_MEMORY;
}
@ -159,21 +169,7 @@ int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **
pNode->size = size;
pNode->itype = itype;
pNode->timestamp = taosGetTimestampUs();
if (itype == RPC_QITEM) {
int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
if (alloced > tsRpcQueueMemoryAllowed) {
uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
tsRpcQueueMemoryAllowed);
(void)atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize);
taosMemoryFree(pNode);
return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
}
uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
} else {
uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
}
uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced);
*item = pNode->item;
return 0;
}
@ -182,12 +178,8 @@ void taosFreeQitem(void *pItem) {
if (pItem == NULL) return;
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
if (pNode->itype == RPC_QITEM) {
int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size + pNode->dataSize);
uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
} else {
uTrace("item:%p, node:%p is freed", pItem, pNode);
}
int64_t alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
taosMemoryFree(pNode);
}