fix: add memory retire

dapan1121 2024-07-12 11:02:45 +08:00
parent 761b0a9437
commit 2e984a76ef
4 changed files with 91 additions and 29 deletions

View File

@@ -42,15 +42,17 @@ typedef enum MemPoolUsageLevel {
 typedef void (*mpDecConcSessionNum)(void);
 typedef void (*mpIncConcSessionNum)(void);
 typedef void (*mpSetConcSessionNum)(int32_t);
-typedef bool (*mpRetireCollection)(int64_t, int64_t, int64_t, bool);
+typedef void (*mpRetireCollections)(int64_t, bool);
+typedef void (*mpRetireCollection)(uint64_t);
 typedef void (*mpCfgUpdate)(void*, SMemPoolCfg*);
 typedef struct SMemPoolCallBack {
-  mpDecConcSessionNum decSessFp;
-  mpIncConcSessionNum incSessFp;
-  mpSetConcSessionNum setSessFp;
-  mpRetireCollection retireFp;
-  mpCfgUpdate cfgUpdateFp;
+  mpDecConcSessionNum decSessFp;
+  mpIncConcSessionNum incSessFp;
+  mpSetConcSessionNum setSessFp;
+  mpRetireCollections retiresFp;
+  mpRetireCollection retireFp;
+  mpCfgUpdate cfgUpdateFp;
 } SMemPoolCallBack;
 typedef struct SMemPoolCfg {
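After this change there are two owner-side retire hooks: retireFp asks the owner to retire one specific collection by id (used when a collection breaks a quota), while retiresFp asks it to free roughly a given number of bytes across collections at low or mid urgency. A minimal sketch of how a pool might drive the pair, purely illustrative (the wrapper function and its arguments are assumptions; only the callback names and signatures come from this hunk):

static void mpAskOwnerToRetire(SMemPoolCallBack* pCb, uint64_t overQuotaCollectionId,
                               int64_t bytesNeeded, bool lowLevel) {
  if (pCb->retireFp) {
    pCb->retireFp(overQuotaCollectionId);   /* retire one collection by id */
  }
  if (pCb->retiresFp) {
    pCb->retiresFp(bytesNeeded, lowLevel);  /* retire roughly bytesNeeded bytes pool-wide */
  }
}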

View File

@@ -113,16 +113,36 @@ int32_t qwInitSession(QW_FPARAMS_DEF, void** ppSession) {
   return code;
 }
-bool qwLowLevelRetire() {
+void qwRetireCollectionCb(uint64_t collectionId) {
+}
-bool qwRetireCollectionCb(int64_t collectionId, int64_t collectionAllocSize, int64_t retireSize, bool retireLow) {
-  if (retireLow && collectionAllocSize > retireSize) {
-    return qwLowLevelRetire();
+void qwLowLevelRetire(int64_t retireSize) {
+  void* pIter = taosHashIterate(gQueryMgmt.pQueryInfo, NULL);
+  while (pIter) {
+    vgInfo = pIter;
+    pInfo->vgHash[i].vgId = vgInfo->vgId;
+    pInfo->vgHash[i].hashBegin = vgInfo->hashBegin;
+    pInfo->vgHash[i].hashEnd = vgInfo->hashEnd;
+    pIter = taosHashIterate(gQueryMgmt.pQueryInfo, pIter);
+    vgInfo = NULL;
+    ++i;
+  }
+}
+void qwMidLevelRetire(int64_t retireSize) {
+}
+void qwRetireCollectionsCb(int64_t retireSize, bool lowLevelRetire) {
+  if (lowLevelRetire) {
+    qwLowLevelRetire(retireSize);
+  } else {
+    qwMidLevelRetire(retireSize);
+  }
-  return false;
 }
 int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
@@ -194,7 +214,8 @@ int32_t qwInitQueryPool(void) {
   cfg.cb.setSessFp = qwSetConcurrentTaskNumCb;
   cfg.cb.decSessFp = qwDecConcurrentTaskNumCb;
   cfg.cb.incSessFp = qwIncConcurrentTaskNumCb;
-  cfg.cb.retireFp = qwRetireCollectionCb;
+  cfg.cb.retiresFp = qwRetireCollectionsCb;
+  cfg.cb.retireFp = qwRetireCollectionCb;
   cfg.cb.cfgUpdateFp = qwCheckUpateCfgCb;
   code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize);

View File

@@ -194,7 +194,7 @@ typedef struct SMPStatInfo {
 typedef struct SMPCollection {
-  int64_t collectionId;
+  uint64_t collectionId;
   int64_t allocMemSize;
   int64_t maxAllocMemSize;
@@ -290,11 +290,18 @@ typedef enum EMPMemStrategy {
   E_MP_STRATEGY_CHUNK,
 } EMPMemStrategy;
+typedef struct SMPMsgQueue {
+  SMemPool* pPool;
+  bool lowLevelRetire;
+  bool midLevelRetire;
+} SMPMsgQueue;
 typedef struct SMemPoolMgmt {
   EMPMemStrategy strategy;
   SArray* poolList;
   SRWLatch poolLock;
   TdThread poolMgmtThread;
+  SMPMsgQueue msgQueue;
   tsem2_t threadSem;
   int8_t modExit;
   int64_t waitMs;
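In effect the new queue is a single shared slot rather than a list: at most one pending request per urgency level (the lowLevelRetire and midLevelRetire flags) plus one pPool pointer, so repeated triggers for the same level are coalesced into a single wake-up of the manager thread instead of being queued.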

View File

@@ -393,7 +393,21 @@ _return:
 }
+int32_t memPoolPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) {
+  if (retireLowLevel) {
+    if (0 == atomic_val_compare_exchange_8(&gMPMgmt.msgQueue.lowLevelRetire, 0, 1)) {
+      atomic_store_ptr(&gMPMgmt.msgQueue.pPool, pPool);
+      tsem2_post(&gMPMgmt.threadSem);
+    }
+    return TSDB_CODE_SUCCESS;
+  }
+  if (0 == atomic_val_compare_exchange_8(&gMPMgmt.msgQueue.midLevelRetire, 0, 1)) {
+    atomic_store_ptr(&gMPMgmt.msgQueue.pPool, pPool);
+    tsem2_post(&gMPMgmt.threadSem);
+  }
+  return TSDB_CODE_SUCCESS;
+}
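The shown hunks add this queue writer and register the callbacks, but the call sites that feed the queue are not part of the diff; a plausible trigger is sketched below, with the threshold index and the inline check being assumptions for illustration only:

/* Hypothetical call site (not in this commit's hunks): once pool usage passes
 * a retire threshold, wake the manager thread instead of retiring inline on
 * the allocation path. */
if (atomic_load_64(&pPool->allocMemSize) >= atomic_load_64(&pPool->memRetireThreshold[0])) {
  (void)memPoolPutRetireMsgToQueue(pPool, true /* low-level retire */);
}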
@@ -403,6 +417,7 @@ int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t s
   int64_t quota = atomic_load_64(&pPool->cfg.collectionQuota);
   if (quota > 0 && cAllocSize > quota) {
     uWarn("collection %" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pCollection->collectionId, cAllocSize, quota);
+    pPool->cfg.cb.retireFp(pCollection->collectionId);
     atomic_sub_fetch_64(&pCollection->allocMemSize, size);
     MP_RET(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD);
   }
@@ -411,6 +426,7 @@ int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t s
   quota = atomic_load_64(&pPool->memRetireThreshold[2]);
   if (pAllocSize >= quota) {
     uWarn("pool allocSize %" PRId64 " reaches the high quota %" PRId64, pAllocSize, quota);
+    pPool->cfg.cb.retireFp(pCollection->collectionId);
     atomic_sub_fetch_64(&pCollection->allocMemSize, size);
     atomic_sub_fetch_64(&pPool->allocMemSize, size);
     MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
@@ -419,10 +435,11 @@ int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t s
   quota = atomic_load_64(&pPool->memRetireThreshold[1]);
   if (pAllocSize >= quota) {
     uInfo("pool allocSize %" PRId64 " reaches the middle quota %" PRId64, pAllocSize, quota);
-    if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) {
+    if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit) / 2) {
       uWarn("session retired in middle quota retire choice, sessionAllocSize: %" PRId64 ", collectionSize: %" PRId64,
             atomic_load_64(&pSession->allocMemSize), cAllocSize);
+      pPool->cfg.cb.retireFp(pCollection->collectionId);
       atomic_sub_fetch_64(&pCollection->allocMemSize, size);
       atomic_sub_fetch_64(&pPool->allocMemSize, size);
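In effect the middle-quota criterion is relaxed: a collection now only needs to hold half of memRetireUnit to be singled out for retirement there, e.g. about 512 MB instead of the full 1 GB if the retire unit were 1 GB (illustrative numbers, not values taken from this commit).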
@@ -438,6 +455,8 @@ int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t s
   uInfo("pool allocSize %" PRId64 " reaches the low quota %" PRId64, pAllocSize, quota);
   if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) {
     uWarn("session retired in low quota retire choice, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize));
+    pPool->cfg.cb.retireFp(pCollection->collectionId);
     atomic_sub_fetch_64(&pCollection->allocMemSize, size);
     atomic_sub_fetch_64(&pPool->allocMemSize, size);
@@ -811,21 +830,34 @@ void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item,
   }
 }
-void* memPoolMgmtThreadFunc(void* param) {
-  while (0 == atomic_load_8(&gMPMgmt.modExit)) {
-    tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs);
-    (*pPool->cfg.cb.retireFp)(pCollection->collectionId, cAllocSize, atomic_load_64(&pPool->memRetireUnit), true);
-    taosRLockLatch(&gMPMgmt.poolLock);
-    int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList);
-    for (int32_t i = 0; i < poolNum; ++i) {
-      SMemPool* pPool = (SMemPool*)taosArrayGetP(gMPMgmt.poolList, i);
-      if (pPool->cfg.cb.cfgUpdateFp) {
-        (*pPool->cfg.cb.cfgUpdateFp)((void*)pPool, &pPool->cfg);
-      }
-    taosRUnLockLatch(&gMPMgmt.poolLock);
+void memPoolCheckUpateCfg(void) {
+  taosRLockLatch(&gMPMgmt.poolLock);
+  int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList);
+  for (int32_t i = 0; i < poolNum; ++i) {
+    SMemPool* pPool = (SMemPool*)taosArrayGetP(gMPMgmt.poolList, i);
+    if (pPool->cfg.cb.cfgUpdateFp) {
+      (*pPool->cfg.cb.cfgUpdateFp)((void*)pPool, &pPool->cfg);
+    }
+  }
+  taosRUnLockLatch(&gMPMgmt.poolLock);
+}
+void* memPoolMgmtThreadFunc(void* param) {
+  int32_t timeout = 0;
+  while (0 == atomic_load_8(&gMPMgmt.modExit)) {
+    timeout = tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs);
+    if (0 != timeout) {
+      memPoolCheckUpateCfg();
+      continue;
+    }
+    if (gMPMgmt.msgQueue.midLevelRetire) {
+      (*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), false);
+    } else if (gMPMgmt.msgQueue.lowLevelRetire) {
+      (*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), true);
+    }
+    memPoolCheckUpateCfg();
+  }
   return NULL;
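One detail the new consumer loop leaves open is clearing the lowLevelRetire and midLevelRetire flags once a request has been serviced; a minimal sketch of how that side of the hand-off could look, assuming atomic_load_ptr is available alongside the atomic_store_ptr used above (the flag reset itself is an assumption, not something this commit shows):

/* Sketch only: dispatch one pending mid-level request and release its slot so
 * memPoolPutRetireMsgToQueue can post the next one. */
if (1 == atomic_val_compare_exchange_8(&gMPMgmt.msgQueue.midLevelRetire, 1, 0)) {
  SMemPool* pPool = (SMemPool*)atomic_load_ptr(&gMPMgmt.msgQueue.pPool);
  (*pPool->cfg.cb.retiresFp)(atomic_load_64(&pPool->memRetireUnit), false);
}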