diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 698b8c55f9..a8134e433b 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -72,7 +72,9 @@ extern int32_t tsTagFilterResCacheSize; extern int32_t tsQueryMinConcurrentTaskNum; extern int32_t tsQueryMaxConcurrentTaskNum; extern int32_t tsQueryConcurrentTaskNum; -extern int64_t tsSingleQueryMaxMemorySize; +extern int32_t tsSingleQueryMaxMemorySize; +extern int8_t tsQueryUseMemoryPool; +extern int32 tsQueryBufferPoolSize; extern int32_t tsNumOfQueryThreads; extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcSessions; diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 56b856a802..43d571a30a 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -39,19 +39,22 @@ typedef enum MemPoolUsageLevel { E_MEM_USAGE_MAX_VALUE } MemPoolUsageLevel; -typedef void (*decConcSessionNum)(void); -typedef void (*incConcSessionNum)(void); -typedef void (*setConcSessionNum)(int32_t); -typedef bool (*retireCollection)(uint64_t, int64_t, bool); +typedef void (*mpDecConcSessionNum)(void); +typedef void (*mpIncConcSessionNum)(void); +typedef void (*mpSetConcSessionNum)(int32_t); +typedef bool (*mpRetireCollection)(uint64_t, int64_t, bool); +typedef void (*mpCfgUpdate)(void); typedef struct SMemPoolCallBack { - decConcSessionNum decSessFp; - incConcSessionNum incSessFp; - setConcSessionNum setSessFp; - retireCollection retireFp; + mpDecConcSessionNum decSessFp; + mpIncConcSessionNum incSessFp; + mpSetConcSessionNum setSessFp; + mpRetireCollection retireFp; + mpCfgUpdate cfgUpdateFp; } SMemPoolCallBack; typedef struct SMemPoolCfg { + bool autoMaxSize; int64_t maxSize; int64_t sessionExpectSize; int64_t collectionQuota; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 9d020f98ac..0f0eb4147d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -593,6 +593,9 @@ int32_t taosGetErrSize(); #define TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x0731) #define TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x0732) #define TSDB_CODE_QRY_INVALID_JOIN_CONDITION TAOS_DEF_ERROR_CODE(0, 0x0733) +#define TSDB_CODE_QRY_REACH_QMEM_THRESHOLD TAOS_DEF_ERROR_CODE(0, 0x0734) +#define TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED TAOS_DEF_ERROR_CODE(0, 0x0735) +#define TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM TAOS_DEF_ERROR_CODE(0, 0x0736) // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 2f82302836..d452903f09 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -55,12 +55,13 @@ int32_t tsShellActivityTimer = 3; // second // memory pool int8_t tsQueryUseMemoryPool = 1; +int32 tsQueryBufferPoolSize = 0; //MB +int32_t tsSingleQueryMaxMemorySize = 0; //MB // queue & threads int32_t tsQueryMinConcurrentTaskNum = 1; int32_t tsQueryMaxConcurrentTaskNum = 0; int32_t tsQueryConcurrentTaskNum = 0; -int64_t tsSingleQueryMaxMemorySize = 1000000; //MB int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfRpcSessions = 30000; @@ -678,7 +679,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt64(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 100, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "queryBufferPoolSize", tsQueryBufferPoolSize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; @@ -1144,7 +1146,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; tsQueryUseMemoryPool = (bool)cfgGetItem(pCfg, "queryUseMemoryPool")->bval; - tsSingleQueryMaxMemorySize = cfgGetItem(pCfg, "singleQueryMaxMemorySize")->i64 * 1048576; + tsSingleQueryMaxMemorySize = cfgGetItem(pCfg, "singleQueryMaxMemorySize")->i32; + tsQueryBufferPoolSize = cfgGetItem(pCfg, "queryBufferPoolSize")->i32; tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsRetentionSpeedLimitMB = cfgGetItem(pCfg, "retentionSpeedLimitMB")->i32; @@ -1599,7 +1602,9 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"s3UploadDelaySec", &tsS3UploadDelaySec}, {"supportVnodes", &tsNumOfSupportVnodes}, {"experimental", &tsExperimental}, - {"maxTsmaNum", &tsMaxTsmaNum}}; + {"maxTsmaNum", &tsMaxTsmaNum}, + {"queryBufferPoolSize", &tsQueryBufferPoolSize}, + {"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize}}; if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { taosCfgSetOption(options, tListLen(options), pItem, false); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 99664822a7..191fb06f0f 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -42,10 +42,11 @@ extern "C" { #define QW_QUERY_MEM_POOL_NAME "Query" #define QW_DEFAULT_RESERVE_MEM_PERCENT 20 -#define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576) -#define QW_MIN_MEM_POOL_SIZE (256 * 1048576) +#define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576UL) +#define QW_MIN_MEM_POOL_SIZE (1048576UL) #define QW_DEFAULT_THREAD_TASK_NUM 3 +#define QW_DEFAULT_MIN_MEM_POOL_SIZE 104857600UL enum { QW_CONC_TASK_LEVEL_LOW = 1, diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index 690daef84e..123edbb674 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -2,11 +2,12 @@ #include "qworker.h" int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { - int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576 * 1048576, QW_MIN_RESERVE_MEM_SIZE); - int64_t availSize = (totalSize - reserveSize) / 1048576 * 1048576; - //if (availSize < QW_MIN_MEM_POOL_SIZE) { - // return -1; - //} + int64_t reserveSize = TMAX(totalSize * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, QW_MIN_RESERVE_MEM_SIZE); + int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL; + if (availSize < QW_MIN_MEM_POOL_SIZE) { + qError("too little available query memory, totalAvailable: %" PRId64 ", reserveSize: %" PRId64, totalSize, reserveSize); + return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM; + } *maxSize = availSize; @@ -124,26 +125,77 @@ bool qwRetireCollection(uint64_t collectionId, int64_t retireSize, bool retireLo return false; } -int32_t qwInitQueryPool(void) { +int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { + if (tsQueryBufferPoolSize > 0) { + *pMaxSize = tsQueryBufferPoolSize * 1048576UL; + *autoMaxSize = false; + + return TSDB_CODE_SUCCESS; + } + int64_t memSize = 0; int32_t code = taosGetSysAvailMemory(&memSize); if (TSDB_CODE_SUCCESS != code) { + qError("get system avaiable memory size failed, errno: %d", errno); return TAOS_SYSTEM_ERROR(errno); } - SMemPoolCfg cfg = {0}; - code = qwGetMemPoolMaxMemSize(memSize, &cfg.maxSize); + code = qwGetMemPoolMaxMemSize(memSize, pMaxSize); if (TSDB_CODE_SUCCESS != code) { return code; } + *autoMaxSize = true; + + return code; +} + +void qwCheckUpateCfg(SMemPoolCfg* pCfg) { + int64_t newCollectionQuota = tsSingleQueryMaxMemorySize * 1048576UL; + if (pCfg->collectionQuota != newCollectionQuota) { + atomic_store_64(&pCfg->collectionQuota, newCollectionQuota); + } + + int64_t maxSize = 0; + bool autoMaxSize = false; + int32_t code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize); + if (TSDB_CODE_SUCCESS != code) { + pCfg->maxSize = QW_DEFAULT_MIN_MEM_POOL_SIZE; + qError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize); + return; + } + + if (pCfg->autoMaxSize != autoMaxSize || pCfg->maxSize != maxSize) { + pCfg->autoMaxSize = autoMaxSize; + atomic_store_64(&pCfg->maxSize, maxSize); + taosMemPoolCfgUpdate(); + } +} + +int32_t qwInitQueryPool(void) { + int32_t code = TSDB_CODE_SUCCESS; + + if (!tsQueryUseMemoryPool) { + qDebug("query memory pool disabled"); + return code; + } + + SMemPoolCfg cfg = {0}; + int64_t maxSize = 0; + bool autoMaxSize = false; + code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + cfg.threadNum = 10; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO - cfg.collectionQuota = tsSingleQueryMaxMemorySize * 1048576; + cfg.collectionQuota = tsSingleQueryMaxMemorySize * 1048576UL; cfg.cb.setSessFp = qwSetConcurrentTaskNum; cfg.cb.decSessFp = qwDecConcurrentTaskNum; cfg.cb.incSessFp = qwIncConcurrentTaskNum; cfg.cb.retireFp = qwRetireCollection; + cfg.cb.cfgUpdateFp = qwCheckUpateCfg; code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize); if (TSDB_CODE_SUCCESS != code) { @@ -161,6 +213,8 @@ int32_t qwInitQueryPool(void) { return TSDB_CODE_OUT_OF_MEMORY; } + qDebug("query memory pool initialized"); + return code; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 640f4114df..087a5301c6 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -19,7 +19,7 @@ SQWorkerMgmt gQwMgmt = { }; TdThreadOnce gQueryPoolInit = PTHREAD_ONCE_INIT; -SQueryMgmt gQueryMgmt; +SQueryMgmt gQueryMgmt = {0}; int32_t qwStopAllTasks(SQWorker *mgmt) { diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index c36c9c205d..4a67b6973d 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -31,6 +31,7 @@ extern "C" { #define MP_MAX_KEEP_FREE_CHUNK_NUM 1000 #define MP_MAX_MALLOC_MEM_SIZE 0xFFFFFFFFFF +#define MP_DEFAULT_MEM_CHK_INTERVAL_MS 100 #define MP_RETIRE_HIGH_THRESHOLD_PERCENT (0.95) #define MP_RETIRE_MID_THRESHOLD_PERCENT (0.9) @@ -291,8 +292,11 @@ typedef enum EMPMemStrategy { typedef struct SMemPoolMgmt { EMPMemStrategy strategy; SArray* poolList; - TdThreadMutex poolMutex; + SRWLatch poolLock; TdThread poolMgmtThread; + tsem2_t threadSem; + int8_t modExit; + int64_t waitMs; int32_t code; } SMemPoolMgmt; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index bbc44dd12c..88aba2815e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -467,6 +467,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR, "Geometry not support TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_WINDOW_CONDITION, "The time pseudo column is illegally used in the condition of the event window.") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, "Executor internal error") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_JOIN_CONDITION, "Not supported join on condition") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD, "Query memory upper limit is reached") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED, "Query memory exhausted") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM, "Too few available memory for query") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired") diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 358d31b5e5..d7fbe14b0a 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -22,7 +22,7 @@ static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT; threadlocal void* threadPoolHandle = NULL; threadlocal void* threadPoolSession = NULL; -SMemPoolMgmt gMPMgmt; +SMemPoolMgmt gMPMgmt = {0}; int32_t memPoolCheckCfg(SMemPoolCfg* cfg) { if (cfg->chunkSize < MEMPOOL_MIN_CHUNK_SIZE || cfg->chunkSize > MEMPOOL_MAX_CHUNK_SIZE) { @@ -187,6 +187,27 @@ int32_t memPoolEnsureChunks(SMemPool* pPool) { return TSDB_CODE_SUCCESS; } +int32_t memPoolApplyCfgUpdate(SMemPool* pPool) { + pPool->memRetireThreshold[0] = pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT; + pPool->memRetireThreshold[1] = pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT; + pPool->memRetireThreshold[2] = pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT; + pPool->memRetireUnit = pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT; + + if (E_MP_STRATEGY_CHUNK == gMPMgmt.strategy) { + pPool->maxChunkNum = pPool->cfg.maxSize / pPool->cfg.chunkSize; + if (pPool->maxChunkNum <= 0) { + uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", pPool->cfg.maxSize, pPool->cfg.chunkSize); + return TSDB_CODE_INVALID_MEM_POOL_PARAM; + } + + pPool->readyChunkReserveNum = TMIN(pPool->cfg.threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum); + + pPool->chunkCache.groupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE); + } + + return TSDB_CODE_SUCCESS; +} + int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { MP_ERR_RET(memPoolCheckCfg(cfg)); @@ -198,23 +219,13 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - pPool->memRetireThreshold[0] = pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT; - pPool->memRetireThreshold[1] = pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT; - pPool->memRetireThreshold[2] = pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT; - pPool->memRetireUnit = pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT; - pPool->maxChunkNum = cfg->maxSize / cfg->chunkSize; - if (pPool->maxChunkNum <= 0) { - uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", cfg->maxSize, cfg->chunkSize); - return TSDB_CODE_INVALID_MEM_POOL_PARAM; - } + MP_ERR_RET(memPoolApplyCfgUpdate(pPool)); pPool->ctrlInfo.statFlags = MP_STAT_FLAG_LOG_ALL; pPool->ctrlInfo.funcFlags = MP_CTRL_FLAG_PRINT_STAT; pPool->threadChunkReserveNum = 1; - pPool->readyChunkReserveNum = TMIN(cfg->threadNum * pPool->threadChunkReserveNum, pPool->maxChunkNum); - pPool->chunkCache.groupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE); pPool->chunkCache.nodeSize = sizeof(SMPChunk); pPool->NSChunkCache.groupNum = MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE; pPool->NSChunkCache.nodeSize = sizeof(SMPNSChunk); @@ -785,13 +796,26 @@ void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, } void* memPoolMgmtThreadFunc(void* param) { - //TODO + while (0 == atomic_load_8(&gMPMgmt.modExit)) { + tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs); + + taosRLockLatch(&gMPMgmt.poolLock); + int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList); + for (int32_t i = 0; i < poolNum; ++i) { + SMemPool* pPool = (SMemPool*)taosArrayGetP(gMPMgmt.poolList, i); + if (pPool->cfg.cb.cfgUpdateFp) { + (*pPool->cfg.cb.cfgUpdateFp)(&pPool->cfg); + } + } + taosRUnLockLatch(&gMPMgmt.poolLock); + } + return NULL; } void taosMemPoolModInit(void) { - taosThreadMutexInit(&gMPMgmt.poolMutex, NULL); - + taosInitRWLatch(&gMPMgmt.poolLock); + gMPMgmt.poolList = taosArrayInit(10, POINTER_BYTES); if (NULL == gMPMgmt.poolList) { gMPMgmt.code = TSDB_CODE_OUT_OF_MEMORY; @@ -799,6 +823,15 @@ void taosMemPoolModInit(void) { } gMPMgmt.strategy = E_MP_STRATEGY_DIRECT; + + gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0); + if (TSDB_CODE_SUCCESS != gMPMgmt.code) { + gMPMgmt.code = TAOS_SYSTEM_ERROR(errno); + qError("failed to init sem2, error: %x", gMPMgmt.code); + return; + } + + gMPMgmt.waitMs = MP_DEFAULT_MEM_CHK_INTERVAL_MS; TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); @@ -817,9 +850,9 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { SMemPool* pPool = NULL; taosThreadOnce(&gMPoolInit, taosMemPoolModInit); - if (NULL == gMPMgmt.poolList) { + if (TSDB_CODE_SUCCESS != gMPMgmt.code) { uError("init memory pool failed"); - MP_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + MP_ERR_JRET(gMPMgmt.code); } pPool = (SMemPool*)taosMemoryCalloc(1, sizeof(SMemPool)); @@ -830,12 +863,12 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { MP_ERR_JRET(memPoolInit(pPool, poolName, cfg)); - taosThreadMutexLock(&gMPMgmt.poolMutex); + taosWLockLatch(&gMPMgmt.poolLock); taosArrayPush(gMPMgmt.poolList, &pPool); pPool->slotId = taosArrayGetSize(gMPMgmt.poolList) - 1; - taosThreadMutexUnlock(&gMPMgmt.poolMutex); + taosWUnLockLatch(&gMPMgmt.poolLock); _return: @@ -849,6 +882,10 @@ _return: return code; } +void taosMemPoolCfgUpdate() { + +} + void taosMemPoolDestroySession(void* poolHandle, void* session) { SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session;