enh: add max memsize update

This commit is contained in:
dapan1121 2024-07-10 19:28:11 +08:00
parent 30d63415e8
commit 0b7f0e7334
10 changed files with 157 additions and 45 deletions

View File

@ -72,7 +72,9 @@ extern int32_t tsTagFilterResCacheSize;
extern int32_t tsQueryMinConcurrentTaskNum; extern int32_t tsQueryMinConcurrentTaskNum;
extern int32_t tsQueryMaxConcurrentTaskNum; extern int32_t tsQueryMaxConcurrentTaskNum;
extern int32_t tsQueryConcurrentTaskNum; extern int32_t tsQueryConcurrentTaskNum;
extern int64_t tsSingleQueryMaxMemorySize; extern int32_t tsSingleQueryMaxMemorySize;
extern int8_t tsQueryUseMemoryPool;
extern int32 tsQueryBufferPoolSize;
extern int32_t tsNumOfQueryThreads; extern int32_t tsNumOfQueryThreads;
extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions; extern int32_t tsNumOfRpcSessions;

View File

@ -39,19 +39,22 @@ typedef enum MemPoolUsageLevel {
E_MEM_USAGE_MAX_VALUE E_MEM_USAGE_MAX_VALUE
} MemPoolUsageLevel; } MemPoolUsageLevel;
typedef void (*decConcSessionNum)(void); typedef void (*mpDecConcSessionNum)(void);
typedef void (*incConcSessionNum)(void); typedef void (*mpIncConcSessionNum)(void);
typedef void (*setConcSessionNum)(int32_t); typedef void (*mpSetConcSessionNum)(int32_t);
typedef bool (*retireCollection)(uint64_t, int64_t, bool); typedef bool (*mpRetireCollection)(uint64_t, int64_t, bool);
typedef void (*mpCfgUpdate)(void);
typedef struct SMemPoolCallBack { typedef struct SMemPoolCallBack {
decConcSessionNum decSessFp; mpDecConcSessionNum decSessFp;
incConcSessionNum incSessFp; mpIncConcSessionNum incSessFp;
setConcSessionNum setSessFp; mpSetConcSessionNum setSessFp;
retireCollection retireFp; mpRetireCollection retireFp;
mpCfgUpdate cfgUpdateFp;
} SMemPoolCallBack; } SMemPoolCallBack;
typedef struct SMemPoolCfg { typedef struct SMemPoolCfg {
bool autoMaxSize;
int64_t maxSize; int64_t maxSize;
int64_t sessionExpectSize; int64_t sessionExpectSize;
int64_t collectionQuota; int64_t collectionQuota;

View File

@ -593,6 +593,9 @@ int32_t taosGetErrSize();
#define TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x0731) #define TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x0731)
#define TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0732) #define TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0732)
#define TSDB_CODE_QRY_INVALID_JOIN_CONDITION TAOS_DEF_ERROR_CODE(0, 0x0733) #define TSDB_CODE_QRY_INVALID_JOIN_CONDITION TAOS_DEF_ERROR_CODE(0, 0x0733)
#define TSDB_CODE_QRY_REACH_QMEM_THRESHOLD TAOS_DEF_ERROR_CODE(0, 0x0734)
#define TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED TAOS_DEF_ERROR_CODE(0, 0x0735)
#define TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM TAOS_DEF_ERROR_CODE(0, 0x0736)
// grant // grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)

View File

@ -55,12 +55,13 @@ int32_t tsShellActivityTimer = 3; // second
// memory pool // memory pool
int8_t tsQueryUseMemoryPool = 1; int8_t tsQueryUseMemoryPool = 1;
int32 tsQueryBufferPoolSize = 0; //MB
int32_t tsSingleQueryMaxMemorySize = 0; //MB
// queue & threads // queue & threads
int32_t tsQueryMinConcurrentTaskNum = 1; int32_t tsQueryMinConcurrentTaskNum = 1;
int32_t tsQueryMaxConcurrentTaskNum = 0; int32_t tsQueryMaxConcurrentTaskNum = 0;
int32_t tsQueryConcurrentTaskNum = 0; int32_t tsQueryConcurrentTaskNum = 0;
int64_t tsSingleQueryMaxMemorySize = 1000000; //MB
int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 30000; int32_t tsNumOfRpcSessions = 30000;
@ -678,7 +679,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt64(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 100, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "queryBufferPoolSize", tsQueryBufferPoolSize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
@ -1144,7 +1146,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
tsQueryUseMemoryPool = (bool)cfgGetItem(pCfg, "queryUseMemoryPool")->bval; tsQueryUseMemoryPool = (bool)cfgGetItem(pCfg, "queryUseMemoryPool")->bval;
tsSingleQueryMaxMemorySize = cfgGetItem(pCfg, "singleQueryMaxMemorySize")->i64 * 1048576; tsSingleQueryMaxMemorySize = cfgGetItem(pCfg, "singleQueryMaxMemorySize")->i32;
tsQueryBufferPoolSize = cfgGetItem(pCfg, "queryBufferPoolSize")->i32;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsRetentionSpeedLimitMB = cfgGetItem(pCfg, "retentionSpeedLimitMB")->i32; tsRetentionSpeedLimitMB = cfgGetItem(pCfg, "retentionSpeedLimitMB")->i32;
@ -1599,7 +1602,9 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"s3UploadDelaySec", &tsS3UploadDelaySec}, {"s3UploadDelaySec", &tsS3UploadDelaySec},
{"supportVnodes", &tsNumOfSupportVnodes}, {"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental}, {"experimental", &tsExperimental},
{"maxTsmaNum", &tsMaxTsmaNum}}; {"maxTsmaNum", &tsMaxTsmaNum},
{"queryBufferPoolSize", &tsQueryBufferPoolSize},
{"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize}};
if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) {
taosCfgSetOption(options, tListLen(options), pItem, false); taosCfgSetOption(options, tListLen(options), pItem, false);

View File

@ -42,10 +42,11 @@ extern "C" {
#define QW_QUERY_MEM_POOL_NAME "Query" #define QW_QUERY_MEM_POOL_NAME "Query"
#define QW_DEFAULT_RESERVE_MEM_PERCENT 20 #define QW_DEFAULT_RESERVE_MEM_PERCENT 20
#define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576) #define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576UL)
#define QW_MIN_MEM_POOL_SIZE (256 * 1048576) #define QW_MIN_MEM_POOL_SIZE (1048576UL)
#define QW_DEFAULT_THREAD_TASK_NUM 3 #define QW_DEFAULT_THREAD_TASK_NUM 3
#define QW_DEFAULT_MIN_MEM_POOL_SIZE 104857600UL
enum { enum {
QW_CONC_TASK_LEVEL_LOW = 1, QW_CONC_TASK_LEVEL_LOW = 1,

View File

@ -2,11 +2,12 @@
#include "qworker.h" #include "qworker.h"
int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) {
int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576 * 1048576, QW_MIN_RESERVE_MEM_SIZE); int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, QW_MIN_RESERVE_MEM_SIZE);
int64_t availSize = (totalSize - reserveSize) / 1048576 * 1048576; int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL;
//if (availSize < QW_MIN_MEM_POOL_SIZE) { if (availSize < QW_MIN_MEM_POOL_SIZE) {
// return -1; qError("too little available query memory, totalAvailable: %" PRId64 ", reserveSize: %" PRId64, totalSize, reserveSize);
//} return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM;
}
*maxSize = availSize; *maxSize = availSize;
@ -124,26 +125,77 @@ bool qwRetireCollection(uint64_t collectionId, int64_t retireSize, bool retireLo
return false; return false;
} }
int32_t qwInitQueryPool(void) { int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
if (tsQueryBufferPoolSize > 0) {
*pMaxSize = tsQueryBufferPoolSize * 1048576UL;
*autoMaxSize = false;
return TSDB_CODE_SUCCESS;
}
int64_t memSize = 0; int64_t memSize = 0;
int32_t code = taosGetSysAvailMemory(&memSize); int32_t code = taosGetSysAvailMemory(&memSize);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
qError("get system avaiable memory size failed, errno: %d", errno);
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
code = qwGetMemPoolMaxMemSize(memSize, pMaxSize);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
*autoMaxSize = true;
return code;
}
void qwCheckUpateCfg(SMemPoolCfg* pCfg) {
int64_t newCollectionQuota = tsSingleQueryMaxMemorySize * 1048576UL;
if (pCfg->collectionQuota != newCollectionQuota) {
atomic_store_64(&pCfg->collectionQuota, newCollectionQuota);
}
int64_t maxSize = 0;
bool autoMaxSize = false;
int32_t code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize);
if (TSDB_CODE_SUCCESS != code) {
pCfg->maxSize = QW_DEFAULT_MIN_MEM_POOL_SIZE;
qError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize);
return;
}
if (pCfg->autoMaxSize != autoMaxSize || pCfg->maxSize != maxSize) {
pCfg->autoMaxSize = autoMaxSize;
atomic_store_64(&pCfg->maxSize, maxSize);
taosMemPoolCfgUpdate();
}
}
int32_t qwInitQueryPool(void) {
int32_t code = TSDB_CODE_SUCCESS;
if (!tsQueryUseMemoryPool) {
qDebug("query memory pool disabled");
return code;
}
SMemPoolCfg cfg = {0}; SMemPoolCfg cfg = {0};
code = qwGetMemPoolMaxMemSize(memSize, &cfg.maxSize); int64_t maxSize = 0;
bool autoMaxSize = false;
code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
cfg.threadNum = 10; //TODO cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO
cfg.collectionQuota = tsSingleQueryMaxMemorySize * 1048576; cfg.collectionQuota = tsSingleQueryMaxMemorySize * 1048576UL;
cfg.cb.setSessFp = qwSetConcurrentTaskNum; cfg.cb.setSessFp = qwSetConcurrentTaskNum;
cfg.cb.decSessFp = qwDecConcurrentTaskNum; cfg.cb.decSessFp = qwDecConcurrentTaskNum;
cfg.cb.incSessFp = qwIncConcurrentTaskNum; cfg.cb.incSessFp = qwIncConcurrentTaskNum;
cfg.cb.retireFp = qwRetireCollection; cfg.cb.retireFp = qwRetireCollection;
cfg.cb.cfgUpdateFp = qwCheckUpateCfg;
code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize); code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
@ -161,6 +213,8 @@ int32_t qwInitQueryPool(void) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
qDebug("query memory pool initialized");
return code; return code;
} }

View File

@ -19,7 +19,7 @@ SQWorkerMgmt gQwMgmt = {
}; };
TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT; TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT;
SQueryMgmt gQueryMgmt; SQueryMgmt gQueryMgmt = {0};
int32_t qwStopAllTasks(SQWorker *mgmt) { int32_t qwStopAllTasks(SQWorker *mgmt) {

View File

@ -31,6 +31,7 @@ extern "C" {
#define MP_MAX_KEEP_FREE_CHUNK_NUM 1000 #define MP_MAX_KEEP_FREE_CHUNK_NUM 1000
#define MP_MAX_MALLOC_MEM_SIZE 0xFFFFFFFFFF #define MP_MAX_MALLOC_MEM_SIZE 0xFFFFFFFFFF
#define MP_DEFAULT_MEM_CHK_INTERVAL_MS 100
#define MP_RETIRE_HIGH_THRESHOLD_PERCENT (0.95) #define MP_RETIRE_HIGH_THRESHOLD_PERCENT (0.95)
#define MP_RETIRE_MID_THRESHOLD_PERCENT (0.9) #define MP_RETIRE_MID_THRESHOLD_PERCENT (0.9)
@ -291,8 +292,11 @@ typedef enum EMPMemStrategy {
typedef struct SMemPoolMgmt { typedef struct SMemPoolMgmt {
EMPMemStrategy strategy; EMPMemStrategy strategy;
SArray* poolList; SArray* poolList;
TdThreadMutex poolMutex; SRWLatch poolLock;
TdThread poolMgmtThread; TdThread poolMgmtThread;
tsem2_t threadSem;
int8_t modExit;
int64_t waitMs;
int32_t code; int32_t code;
} SMemPoolMgmt; } SMemPoolMgmt;

View File

@ -467,6 +467,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR, "Geometry not support
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_WINDOW_CONDITION, "The time pseudo column is illegally used in the condition of the event window.") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_WINDOW_CONDITION, "The time pseudo column is illegally used in the condition of the event window.")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, "Executor internal error") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, "Executor internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_JOIN_CONDITION, "Not supported join on condition") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_JOIN_CONDITION, "Not supported join on condition")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD, "Query memory upper limit is reached")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED, "Query memory exhausted")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM, "Too few available memory for query")
// grant // grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")

View File

@ -22,7 +22,7 @@
static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT; static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT;
threadlocal void* threadPoolHandle = NULL; threadlocal void* threadPoolHandle = NULL;
threadlocal void* threadPoolSession = NULL; threadlocal void* threadPoolSession = NULL;
SMemPoolMgmt gMPMgmt; SMemPoolMgmt gMPMgmt = {0};
int32_t memPoolCheckCfg(SMemPoolCfg* cfg) { int32_t memPoolCheckCfg(SMemPoolCfg* cfg) {
if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) { if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) {
@ -187,6 +187,27 @@ int32_t memPoolEnsureChunks(SMemPool* pPool) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t memPoolApplyCfgUpdate(SMemPool* pPool) {
pPool->memRetireThreshold[0] = pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT;
pPool->memRetireThreshold[1] = pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT;
pPool->memRetireThreshold[2] = pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT;
pPool->memRetireUnit = pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT;
if (E_MP_STRATEGY_CHUNK == gMPMgmt.strategy) {
pPool->maxChunkNum = pPool->cfg.maxSize / pPool->cfg.chunkSize;
if (pPool->maxChunkNum <= 0) {
uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", pPool->cfg.maxSize, pPool->cfg.chunkSize);
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
}
pPool->readyChunkReserveNum = TMIN(pPool->cfg.threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum);
pPool->chunkCache.groupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE);
}
return TSDB_CODE_SUCCESS;
}
int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET(memPoolCheckCfg(cfg)); MP_ERR_RET(memPoolCheckCfg(cfg));
@ -198,23 +219,13 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
pPool->memRetireThreshold[0] = pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT; MP_ERR_RET(memPoolApplyCfgUpdate(pPool));
pPool->memRetireThreshold[1] = pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT;
pPool->memRetireThreshold[2] = pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT;
pPool->memRetireUnit = pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT;
pPool->maxChunkNum = cfg->maxSize / cfg->chunkSize;
if (pPool->maxChunkNum <= 0) {
uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", cfg->maxSize, cfg->chunkSize);
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
}
pPool->ctrlInfo.statFlags = MP_STAT_FLAG_LOG_ALL; pPool->ctrlInfo.statFlags = MP_STAT_FLAG_LOG_ALL;
pPool->ctrlInfo.funcFlags = MP_CTRL_FLAG_PRINT_STAT; pPool->ctrlInfo.funcFlags = MP_CTRL_FLAG_PRINT_STAT;
pPool->threadChunkReserveNum = 1; pPool->threadChunkReserveNum = 1;
pPool->readyChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum);
pPool->chunkCache.groupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE);
pPool->chunkCache.nodeSize = sizeof(SMPChunk); pPool->chunkCache.nodeSize = sizeof(SMPChunk);
pPool->NSChunkCache.groupNum = MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE; pPool->NSChunkCache.groupNum = MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE;
pPool->NSChunkCache.nodeSize = sizeof(SMPNSChunk); pPool->NSChunkCache.nodeSize = sizeof(SMPNSChunk);
@ -785,12 +796,25 @@ void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item,
} }
void* memPoolMgmtThreadFunc(void* param) { void* memPoolMgmtThreadFunc(void* param) {
//TODO while (0 == atomic_load_8(&gMPMgmt.modExit)) {
tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs);
taosRLockLatch(&gMPMgmt.poolLock);
int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList);
for (int32_t i = 0; i < poolNum; ++i) {
SMemPool* pPool = (SMemPool*)taosArrayGetP(gMPMgmt.poolList, i);
if (pPool->cfg.cb.cfgUpdateFp) {
(*pPool->cfg.cb.cfgUpdateFp)(&pPool->cfg);
}
}
taosRUnLockLatch(&gMPMgmt.poolLock);
}
return NULL; return NULL;
} }
void taosMemPoolModInit(void) { void taosMemPoolModInit(void) {
taosThreadMutexInit(&gMPMgmt.poolMutex, NULL); taosInitRWLatch(&gMPMgmt.poolLock);
gMPMgmt.poolList = taosArrayInit(10, POINTER_BYTES); gMPMgmt.poolList = taosArrayInit(10, POINTER_BYTES);
if (NULL == gMPMgmt.poolList) { if (NULL == gMPMgmt.poolList) {
@ -800,6 +824,15 @@ void taosMemPoolModInit(void) {
gMPMgmt.strategy = E_MP_STRATEGY_DIRECT; gMPMgmt.strategy = E_MP_STRATEGY_DIRECT;
gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0);
if (TSDB_CODE_SUCCESS != gMPMgmt.code) {
gMPMgmt.code = TAOS_SYSTEM_ERROR(errno);
qError("failed to init sem2, error: %x", gMPMgmt.code);
return;
}
gMPMgmt.waitMs = MP_DEFAULT_MEM_CHK_INTERVAL_MS;
TdThreadAttr thAttr; TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr); taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
@ -817,9 +850,9 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
SMemPool* pPool = NULL; SMemPool* pPool = NULL;
taosThreadOnce(&gMPoolInit, taosMemPoolModInit); taosThreadOnce(&gMPoolInit, taosMemPoolModInit);
if (NULL == gMPMgmt.poolList) { if (TSDB_CODE_SUCCESS != gMPMgmt.code) {
uError("init memory pool failed"); uError("init memory pool failed");
MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); MP_ERR_JRET(gMPMgmt.code);
} }
pPool = (SMemPool*)taosMemoryCalloc(1, sizeof(SMemPool)); pPool = (SMemPool*)taosMemoryCalloc(1, sizeof(SMemPool));
@ -830,12 +863,12 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
MP_ERR_JRET(memPoolInit(pPool, poolName, cfg)); MP_ERR_JRET(memPoolInit(pPool, poolName, cfg));
taosThreadMutexLock(&gMPMgmt.poolMutex); taosWLockLatch(&gMPMgmt.poolLock);
taosArrayPush(gMPMgmt.poolList, &pPool); taosArrayPush(gMPMgmt.poolList, &pPool);
pPool->slotId = taosArrayGetSize(gMPMgmt.poolList) - 1; pPool->slotId = taosArrayGetSize(gMPMgmt.poolList) - 1;
taosThreadMutexUnlock(&gMPMgmt.poolMutex); taosWUnLockLatch(&gMPMgmt.poolLock);
_return: _return:
@ -849,6 +882,10 @@ _return:
return code; return code;
} }
void taosMemPoolCfgUpdate() {
}
void taosMemPoolDestroySession(void* poolHandle, void* session) { void taosMemPoolDestroySession(void* poolHandle, void* session) {
SMemPool* pPool = (SMemPool*)poolHandle; SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session; SMPSession* pSession = (SMPSession*)session;