diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 43d571a30a..343ffc94c8 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -42,8 +42,8 @@ typedef enum MemPoolUsageLevel { typedef void (*mpDecConcSessionNum)(void); typedef void (*mpIncConcSessionNum)(void); typedef void (*mpSetConcSessionNum)(int32_t); -typedef bool (*mpRetireCollection)(uint64_t, int64_t, bool); -typedef void (*mpCfgUpdate)(void); +typedef bool (*mpRetireCollection)(int64_t, int64_t, int64_t, bool); +typedef void (*mpCfgUpdate)(void*, SMemPoolCfg*); typedef struct SMemPoolCallBack { mpDecConcSessionNum decSessFp; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 191fb06f0f..0f1951d577 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -46,7 +46,6 @@ extern "C" { #define QW_MIN_MEM_POOL_SIZE (1048576UL) #define QW_DEFAULT_THREAD_TASK_NUM 3 -#define QW_DEFAULT_MIN_MEM_POOL_SIZE 104857600UL enum { QW_CONC_TASK_LEVEL_LOW = 1, diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index 123edbb674..81755fa753 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -20,7 +20,7 @@ int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chu return TSDB_CODE_SUCCESS; } -void qwSetConcurrentTaskNum(int32_t taskNum) { +void qwSetConcurrentTaskNumCb(int32_t taskNum) { int32_t finTaskNum = TMIN(taskNum, tsNumOfQueryThreads * QW_DEFAULT_THREAD_TASK_NUM); if (tsQueryMaxConcurrentTaskNum > 0) { @@ -33,7 +33,7 @@ void qwSetConcurrentTaskNum(int32_t taskNum) { atomic_store_32(&gQueryMgmt.concTaskLevel, QW_CONC_TASK_LEVEL_FULL); } -void qwDecConcurrentTaskNum(void) { +void qwDecConcurrentTaskNumCb(void) { int32_t concTaskLevel = atomic_load_32(&gQueryMgmt.concTaskLevel); if (concTaskLevel <= QW_CONC_TASK_LEVEL_LOW) { qError("Unable to decrease concurrent task num, current task level:%d", concTaskLevel); @@ -43,7 +43,7 @@ void qwDecConcurrentTaskNum(void) { //TODO } -void qwIncConcurrentTaskNum(void) { +void qwIncConcurrentTaskNumCb(void) { int32_t concTaskLevel = atomic_load_32(&gQueryMgmt.concTaskLevel); if (concTaskLevel >= QW_CONC_TASK_LEVEL_FULL) { qError("Unable to increase concurrent task num, current task level:%d", concTaskLevel); @@ -117,8 +117,8 @@ bool qwLowLevelRetire() { } -bool qwRetireCollection(uint64_t collectionId, int64_t retireSize, bool retireLow) { - if (retireLow) { +bool qwRetireCollectionCb(int64_t collectionId, int64_t collectionAllocSize, int64_t retireSize, bool retireLow) { + if (retireLow && collectionAllocSize > retireSize) { return qwLowLevelRetire(); } @@ -150,7 +150,7 @@ int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { return code; } -void qwCheckUpateCfg(SMemPoolCfg* pCfg) { +void qwCheckUpateCfgCb(void* pHandle, SMemPoolCfg* pCfg) { int64_t newCollectionQuota = tsSingleQueryMaxMemorySize * 1048576UL; if (pCfg->collectionQuota != newCollectionQuota) { atomic_store_64(&pCfg->collectionQuota, newCollectionQuota); @@ -160,7 +160,7 @@ void qwCheckUpateCfg(SMemPoolCfg* pCfg) { bool autoMaxSize = false; int32_t code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize); if (TSDB_CODE_SUCCESS != code) { - pCfg->maxSize = QW_DEFAULT_MIN_MEM_POOL_SIZE; + pCfg->maxSize = 0; qError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize); return; } @@ -168,7 +168,7 @@ void qwCheckUpateCfg(SMemPoolCfg* pCfg) { if (pCfg->autoMaxSize != autoMaxSize || pCfg->maxSize != maxSize) { pCfg->autoMaxSize = autoMaxSize; atomic_store_64(&pCfg->maxSize, maxSize); - taosMemPoolCfgUpdate(); + taosMemPoolCfgUpdate(pHandle, pCfg); } } @@ -191,11 +191,11 @@ int32_t qwInitQueryPool(void) { cfg.threadNum = 10; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO cfg.collectionQuota = tsSingleQueryMaxMemorySize * 1048576UL; - cfg.cb.setSessFp = qwSetConcurrentTaskNum; - cfg.cb.decSessFp = qwDecConcurrentTaskNum; - cfg.cb.incSessFp = qwIncConcurrentTaskNum; - cfg.cb.retireFp = qwRetireCollection; - cfg.cb.cfgUpdateFp = qwCheckUpateCfg; + cfg.cb.setSessFp = qwSetConcurrentTaskNumCb; + cfg.cb.decSessFp = qwDecConcurrentTaskNumCb; + cfg.cb.incSessFp = qwIncConcurrentTaskNumCb; + cfg.cb.retireFp = qwRetireCollectionCb; + cfg.cb.cfgUpdateFp = qwCheckUpateCfgCb; code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 4a67b6973d..3329f7e2cf 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -37,6 +37,7 @@ extern "C" { #define MP_RETIRE_MID_THRESHOLD_PERCENT (0.9) #define MP_RETIRE_LOW_THRESHOLD_PERCENT (0.85) #define MP_RETIRE_UNIT_PERCENT (0.1) +#define MP_RETIRE_UNIT_MIN_SIZE (50 * 1048576UL) // FLAGS AREA diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index d7fbe14b0a..e23dd1f135 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -188,10 +188,11 @@ int32_t memPoolEnsureChunks(SMemPool* pPool) { } int32_t memPoolApplyCfgUpdate(SMemPool* pPool) { - pPool->memRetireThreshold[0] = pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT; - pPool->memRetireThreshold[1] = pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT; - pPool->memRetireThreshold[2] = pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT; - pPool->memRetireUnit = pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT; + atomic_store_64(&pPool->memRetireThreshold[0], pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT); + atomic_store_64(&pPool->memRetireThreshold[1], pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT); + atomic_store_64(&pPool->memRetireThreshold[2], pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT); + + atomic_store_64(&pPool->memRetireUnit, TMAX(pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT, MP_RETIRE_UNIT_MIN_SIZE)); if (E_MP_STRATEGY_CHUNK == gMPMgmt.strategy) { pPool->maxChunkNum = pPool->cfg.maxSize / pPool->cfg.chunkSize; @@ -204,7 +205,7 @@ int32_t memPoolApplyCfgUpdate(SMemPool* pPool) { pPool->chunkCache.groupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE); } - + return TSDB_CODE_SUCCESS; } @@ -391,54 +392,66 @@ _return: return pRes; } +int32_t memPoolPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { -bool memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) { - int64_t allocSize = atomic_add_fetch_64(&pSession->pMpCollection->allocMemSize, size); +} + + +int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) { + SMPCollection* pCollection = pSession->pMpCollection; + int64_t cAllocSize = atomic_add_fetch_64(&pCollection->allocMemSize, size); int64_t quota = atomic_load_64(&pPool->cfg.collectionQuota); - if (quota > 0 && allocSize > quota) { - uWarn("collection %" PRIx64 " allocSize " PRId64 " is over than quota %" PRId64, pSession->pMpCollection->collectionId, allocSize, quota); - atomic_sub_fetch_64(&pSession->pMpCollection->allocMemSize, size); - return true; + if (quota > 0 && cAllocSize > quota) { + uWarn("collection %" PRIx64 " allocSize " PRId64 " is over than quota %" PRId64, pCollection->collectionId, cAllocSize, quota); + atomic_sub_fetch_64(&pCollection->allocMemSize, size); + MP_RET(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); } - allocSize = atomic_add_fetch_64(&pPool->allocMemSize, size); + int64_t pAllocSize = atomic_add_fetch_64(&pPool->allocMemSize, size); quota = atomic_load_64(&pPool->memRetireThreshold[2]); - if (allocSize >= quota) { - uWarn("pool allocSize " PRId64 " reaches the high quota %" PRId64, allocSize, quota); + if (pAllocSize >= quota) { + uWarn("pool allocSize " PRId64 " reaches the high quota %" PRId64, pAllocSize, quota); + atomic_sub_fetch_64(&pCollection->allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size); - return true; + MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); } quota = atomic_load_64(&pPool->memRetireThreshold[1]); - if (allocSize >= quota) { - uInfo("pool allocSize " PRId64 " reaches the middle quota %" PRId64, allocSize, quota); - bool retire = (*pPool->cfg.cb.retireFp)(pSession->pMpCollection->collectionId, pPool->memRetireUnit, false); - if (retire) { - uWarn("session retired in middle quota retire choise, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize)); + if (pAllocSize >= quota) { + uInfo("pool allocSize " PRId64 " reaches the middle quota %" PRId64, pAllocSize, quota); + if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) { + uWarn("session retired in middle quota retire choise, sessionAllocSize: %" PRId64 ", collectionSize: %" PRId64, + atomic_load_64(&pSession->allocMemSize), cAllocSize); + + atomic_sub_fetch_64(&pCollection->allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size); + + MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, false)); + MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); } - return retire; + + return TSDB_CODE_SUCCESS; } quota = atomic_load_64(&pPool->memRetireThreshold[0]); - if (allocSize >= quota) { - uInfo("pool allocSize " PRId64 " reaches the low quota %" PRId64, allocSize, quota); - bool retire = (*pPool->cfg.cb.retireFp)(pSession->pMpCollection->collectionId, pPool->memRetireUnit, true); - if (retire) { + if (pAllocSize >= quota) { + uInfo("pool allocSize " PRId64 " reaches the low quota %" PRId64, pAllocSize, quota); + if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) { uWarn("session retired in low quota retire choise, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize)); + atomic_sub_fetch_64(&pCollection->allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size); + + MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, true)); + MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); } - return retire; } - return false; + return TSDB_CODE_SUCCESS; } -void* memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { - if (memPoolMemQuotaOverflow(pPool, pSession, size)) { - uInfo("session needs to retire memory"); - return NULL; - } +int32_t memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + MP_ERR_RET(memPoolMemQuotaOverflow(pPool, pSession, size)); void* res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size); if (NULL != res) { @@ -447,9 +460,11 @@ void* memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size, ui atomic_sub_fetch_64(&pSession->pMpCollection->allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size); uError("malloc %" PRId64 " alighment %d failed", size, alignment); + + code = TAOS_SYSTEM_ERROR(errno); } - return res; + MP_RET(code); } int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) { @@ -467,24 +482,21 @@ int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *pt return 0; } -void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { +int32_t memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { int32_t code = TSDB_CODE_SUCCESS; - void *res = NULL; switch (gMPMgmt.strategy) { case E_MP_STRATEGY_DIRECT: - res = memPoolAllocDirect(pPool, pSession, size, alignment); + MP_ERR_RET(memPoolAllocDirect(pPool, pSession, size, alignment, ppRes)); break; case E_MP_STRATEGY_CHUNK: - res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size, alignment) : memPoolAllocFromChunk(pPool, pSession, size, alignment); + MP_ERR_RET((size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size, alignment, ppRes) : memPoolAllocFromChunk(pPool, pSession, size, alignment, ppRes)); break; default: break; } -_return: - - return res; + return code; } void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size) { @@ -535,56 +547,60 @@ void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* return; } -void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t size, int64_t* origSize) { - void *res = NULL; +int32_t memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) { + int32_t code = TSDB_CODE_SUCCESS; - if (NULL == ptr) { + if (NULL == *pPtr) { *origSize = 0; - res = memPoolMallocImpl(pPool, pSession, size, 0); - return res; + MP_RET(memPoolMallocImpl(pPool, pSession, size, 0)); } if (0 == size) { - memPoolFreeImpl(pPool, pSession, ptr, origSize); - return res; + memPoolFreeImpl(pPool, pSession, *pPtr, origSize); + return TSDB_CODE_SUCCESS; } - *origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr); + *origSize = memPoolGetMemorySizeImpl(pPool, pSession, *pPtr); switch (gMPMgmt.strategy) { case E_MP_STRATEGY_DIRECT: { - if (memPoolMemQuotaOverflow(pPool, pSession, size)) { - uInfo("session needs to retire memory"); - return NULL; - } + MP_ERR_JRET(memPoolMemQuotaOverflow(pPool, pSession, size)); - res = taosMemRealloc(ptr, size); - if (NULL != res) { + *pPtr = taosMemRealloc(*pPtr, size); + if (NULL != *pPtr) { memPoolUpdateAllocMemSize(pPool, pSession, size - *origSize); + } else { + MP_ERR_JRET(TAOS_SYSTEM_ERROR(errno)); } - return res; + break; } case E_MP_STRATEGY_CHUNK: { if (*origSize >= size) { - SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); + SMPMemHeader* pHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader)); pHeader->size = size; - return ptr; + return TSDB_CODE_SUCCESS; } - - res = memPoolMallocImpl(pPool, pSession, size, 0); - SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); + + void* res = NULL; + MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, 0, &res)); + SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader)); SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader)); - memcpy(res, ptr, *origSize); - memPoolFreeImpl(pPool, pSession, ptr, NULL); + memcpy(res, *pPtr, *origSize); + memPoolFreeImpl(pPool, pSession, *pPtr, NULL); + *pPtr = res; - return res; + return TSDB_CODE_SUCCESS; } default: break; } + +_return: + + memPoolFreeImpl(pPool, pSession, *pPtr, NULL); - return NULL; + return code; } void memPoolPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName, int64_t maxAllocSize) { @@ -799,12 +815,14 @@ void* memPoolMgmtThreadFunc(void* param) { while (0 == atomic_load_8(&gMPMgmt.modExit)) { tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs); + (*pPool->cfg.cb.retireFp)(pCollection->collectionId, cAllocSize, atomic_load_64(&pPool->memRetireUnit), true); + taosRLockLatch(&gMPMgmt.poolLock); int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList); for (int32_t i = 0; i < poolNum; ++i) { SMemPool* pPool = (SMemPool*)taosArrayGetP(gMPMgmt.poolList, i); if (pPool->cfg.cb.cfgUpdateFp) { - (*pPool->cfg.cb.cfgUpdateFp)(&pPool->cfg); + (*pPool->cfg.cb.cfgUpdateFp)((void*)pPool, &pPool->cfg); } } taosRUnLockLatch(&gMPMgmt.poolLock); @@ -882,8 +900,10 @@ _return: return code; } -void taosMemPoolCfgUpdate() { +void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) { + SMemPool* pPool = (SMemPool*)poolHandle; + MP_ERR_RET(memPoolApplyCfgUpdate(pPool)); } void taosMemPoolDestroySession(void* poolHandle, void* session) { @@ -932,7 +952,7 @@ _return: } -void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { +void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { int32_t code = TSDB_CODE_SUCCESS; void *res = NULL; @@ -945,7 +965,7 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* f SMPSession* pSession = (SMPSession*)session; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - res = memPoolMallocImpl(pPool, pSession, size, 0); + MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, 0, &res)); MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); @@ -984,7 +1004,7 @@ _return: return res; } -void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) { +void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) { int32_t code = TSDB_CODE_SUCCESS; void *res = NULL; @@ -998,14 +1018,14 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t s SMPSession* pSession = (SMPSession*)session; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - res = memPoolReallocImpl(pPool, pSession, ptr, size, &input.origSize); + MP_ERR_JRET(memPoolReallocImpl(pPool, pSession, &ptr, size, &input.origSize)); MP_SET_FLAG(input.procFlags, ((res || 0 == size) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input); _return: - return res; + return ptr; } char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) {