diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 343ffc94c8..b300502b5c 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -42,15 +42,17 @@ typedef enum MemPoolUsageLevel { typedef void (*mpDecConcSessionNum)(void); typedef void (*mpIncConcSessionNum)(void); typedef void (*mpSetConcSessionNum)(int32_t); -typedef bool (*mpRetireCollection)(int64_t, int64_t, int64_t, bool); +typedef void (*mpRetireCollections)(int64_t, bool); +typedef void (*mpRetireCollection)(uint64_t); typedef void (*mpCfgUpdate)(void*, SMemPoolCfg*); typedef struct SMemPoolCallBack { - mpDecConcSessionNum decSessFp; - mpIncConcSessionNum incSessFp; - mpSetConcSessionNum setSessFp; - mpRetireCollection retireFp; - mpCfgUpdate cfgUpdateFp; + mpDecConcSessionNum decSessFp; + mpIncConcSessionNum incSessFp; + mpSetConcSessionNum setSessFp; + mpRetireCollections retiresFp; + mpRetireCollection retireFp; + mpCfgUpdate cfgUpdateFp; } SMemPoolCallBack; typedef struct SMemPoolCfg { diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index 81755fa753..5d4d108921 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -113,16 +113,36 @@ int32_t qwInitSession(QW_FPARAMS_DEF, void** ppSession) { return code; } -bool qwLowLevelRetire() { +void qwRetireCollectionCb(uint64_t collectionId) { } -bool qwRetireCollectionCb(int64_t collectionId, int64_t collectionAllocSize, int64_t retireSize, bool retireLow) { - if (retireLow && collectionAllocSize > retireSize) { - return qwLowLevelRetire(); +void qwLowLevelRetire(int64_t retireSize) { + void* pIter = taosHashIterate(gQueryMgmt.pQueryInfo, NULL); + while (pIter) { + vgInfo = pIter; + + pInfo->vgHash[i].vgId = vgInfo->vgId; + pInfo->vgHash[i].hashBegin = vgInfo->hashBegin; + pInfo->vgHash[i].hashEnd = vgInfo->hashEnd; + + pIter = taosHashIterate(gQueryMgmt.pQueryInfo, pIter); + vgInfo = NULL; + ++i; + } +} + +void qwMidLevelRetire(int64_t retireSize) { + +} + + +void qwRetireCollectionsCb(int64_t retireSize, bool lowLevelRetire) { + if (lowLevelRetire) { + qwLowLevelRetire(retireSize); + } else { + qwMidLevelRetire(retireSize); } - - return false; } int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { @@ -194,7 +214,8 @@ int32_t qwInitQueryPool(void) { cfg.cb.setSessFp = qwSetConcurrentTaskNumCb; cfg.cb.decSessFp = qwDecConcurrentTaskNumCb; cfg.cb.incSessFp = qwIncConcurrentTaskNumCb; - cfg.cb.retireFp = qwRetireCollectionCb; + cfg.cb.retiresFp = qwRetireCollectionsCb; + cfg.cb.retireFp = qwRetireCollectionCb; cfg.cb.cfgUpdateFp = qwCheckUpateCfgCb; code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize); diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 3329f7e2cf..2561c21a08 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -194,7 +194,7 @@ typedef struct SMPStatInfo { typedef struct SMPCollection { - int64_t collectionId; + uint64_t collectionId; int64_t allocMemSize; int64_t maxAllocMemSize; @@ -290,11 +290,18 @@ typedef enum EMPMemStrategy { E_MP_STRATEGY_CHUNK, } EMPMemStrategy; +typedef struct SMPMsgQueue { + SMemPool* pPool; + bool lowLevelRetire; + bool midLevelRetire; +} SMPMsgQueue; + typedef struct SMemPoolMgmt { EMPMemStrategy strategy; SArray* poolList; SRWLatch poolLock; TdThread poolMgmtThread; + SMPMsgQueue msgQueue; tsem2_t threadSem; int8_t modExit; int64_t waitMs; diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index e23dd1f135..81d5c6d2cc 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -393,7 +393,21 @@ _return: } int32_t memPoolPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { + if (retireLowLevel) { + if (0 == atomic_val_compare_exchange_8(&gMPMgmt.msgQueue.lowLevelRetire, 0, 1)) { + atomic_store_ptr(&gMPMgmt.msgQueue.pPool, pPool); + tsem2_post(&gMPMgmt.threadSem); + } + + return TSDB_CODE_SUCCESS; + } + if (0 == atomic_val_compare_exchange_8(&gMPMgmt.msgQueue.midLevelRetire, 0, 1)) { + atomic_store_ptr(&gMPMgmt.msgQueue.pPool, pPool); + tsem2_post(&gMPMgmt.threadSem); + } + + return TSDB_CODE_SUCCESS; } @@ -403,6 +417,7 @@ int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t s int64_t quota = atomic_load_64(&pPool->cfg.collectionQuota); if (quota > 0 && cAllocSize > quota) { uWarn("collection %" PRIx64 " allocSize " PRId64 " is over than quota %" PRId64, pCollection->collectionId, cAllocSize, quota); + pPool->cfg.cb.retireFp(pCollection->collectionId); atomic_sub_fetch_64(&pCollection->allocMemSize, size); MP_RET(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); } @@ -411,6 +426,7 @@ int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t s quota = atomic_load_64(&pPool->memRetireThreshold[2]); if (pAllocSize >= quota) { uWarn("pool allocSize " PRId64 " reaches the high quota %" PRId64, pAllocSize, quota); + pPool->cfg.cb.retireFp(pCollection->collectionId); atomic_sub_fetch_64(&pCollection->allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size); MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); @@ -419,10 +435,11 @@ int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t s quota = atomic_load_64(&pPool->memRetireThreshold[1]); if (pAllocSize >= quota) { uInfo("pool allocSize " PRId64 " reaches the middle quota %" PRId64, pAllocSize, quota); - if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) { + if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit) / 2) { uWarn("session retired in middle quota retire choise, sessionAllocSize: %" PRId64 ", collectionSize: %" PRId64, atomic_load_64(&pSession->allocMemSize), cAllocSize); + pPool->cfg.cb.retireFp(pCollection->collectionId); atomic_sub_fetch_64(&pCollection->allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size); @@ -438,6 +455,8 @@ int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t s uInfo("pool allocSize " PRId64 " reaches the low quota %" PRId64, pAllocSize, quota); if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) { uWarn("session retired in low quota retire choise, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize)); + + pPool->cfg.cb.retireFp(pCollection->collectionId); atomic_sub_fetch_64(&pCollection->allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size); @@ -811,21 +830,34 @@ void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, } } -void* memPoolMgmtThreadFunc(void* param) { - while (0 == atomic_load_8(&gMPMgmt.modExit)) { - tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs); - - (*pPool->cfg.cb.retireFp)(pCollection->collectionId, cAllocSize, atomic_load_64(&pPool->memRetireUnit), true); - - taosRLockLatch(&gMPMgmt.poolLock); - int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList); - for (int32_t i = 0; i < poolNum; ++i) { - SMemPool* pPool = (SMemPool*)taosArrayGetP(gMPMgmt.poolList, i); - if (pPool->cfg.cb.cfgUpdateFp) { - (*pPool->cfg.cb.cfgUpdateFp)((void*)pPool, &pPool->cfg); - } +void memPoolCheckUpateCfg(void) { + taosRLockLatch(&gMPMgmt.poolLock); + int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList); + for (int32_t i = 0; i < poolNum; ++i) { + SMemPool* pPool = (SMemPool*)taosArrayGetP(gMPMgmt.poolList, i); + if (pPool->cfg.cb.cfgUpdateFp) { + (*pPool->cfg.cb.cfgUpdateFp)((void*)pPool, &pPool->cfg); } - taosRUnLockLatch(&gMPMgmt.poolLock); + } + taosRUnLockLatch(&gMPMgmt.poolLock); +} + +void* memPoolMgmtThreadFunc(void* param) { + int32_t timeout = 0; + while (0 == atomic_load_8(&gMPMgmt.modExit)) { + timeout = tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs); + if (0 != timeout) { + memPoolCheckUpateCfg(); + continue; + } + + if (gMPMgmt.msgQueue.midLevelRetire) { + (*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), false); + } else if (gMPMgmt.msgQueue.lowLevelRetire) { + (*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), true); + } + + memPoolCheckUpateCfg(); } return NULL;