enh: add memory pre-check

This commit is contained in:
dapan1121 2024-07-08 19:26:00 +08:00
parent 75d24720e7
commit 30d63415e8
6 changed files with 99 additions and 24 deletions

View File

@ -42,7 +42,7 @@ typedef enum MemPoolUsageLevel {
typedef void (*decConcSessionNum)(void);
typedef void (*incConcSessionNum)(void);
typedef void (*setConcSessionNum)(int32_t);
typedef bool (*retireCollection)(uint64_t, int64_t);
typedef bool (*retireCollection)(uint64_t, int64_t, bool);
typedef struct SMemPoolCallBack {
decConcSessionNum decSessFp;
@ -79,6 +79,13 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pCollec
void taosMemPoolDestroySession(void* poolHandle, void* session);
int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection);
#define taosMemPoolFreeClear(ptr) \
do { \
if (ptr) { \
taosMemPoolFree((void *)ptr); \
(ptr) = NULL; \
} \
} while (0)
extern threadlocal void* threadPoolHandle;
extern threadlocal void* threadPoolSession;

View File

@ -238,17 +238,27 @@ typedef struct SQWQueryInfo {
SHashObj* pSessions;
} SQWQueryInfo;
typedef struct SQWRetireCtx {
typedef struct SQWRetireLowCtx {
int8_t retireBegin;
} SQWRetireLowCtx;
typedef struct SQWRetireMidCtx {
int8_t retireBegin;
TdThreadCond retired;
BoundedQueue* collectionQueue;
} SQWRetireMidCtx;
typedef struct SQWRetireCtx {
SQWRetireLowCtx lowCtx;
SQWRetireMidCtx midCtx;
} SQWRetireCtx;
typedef struct SQueryMgmt {
SRWLatch taskMgmtLock;
int32_t concTaskLevel;
SHashObj* pQueryInfo;
void* memPoolHandle;
SRWLatch taskMgmtLock;
int32_t concTaskLevel;
SHashObj* pQueryInfo;
void* memPoolHandle;
SQWRetireCtx retireCtx;
} SQueryMgmt;
#define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST)
@ -320,6 +330,18 @@ extern SQueryMgmt gQueryMgmt;
(eId) = *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)); \
} while (0)
#define QW_SET_TEID(id, tId, eId) \
do { \
*(uint64_t *)(id) = (tId); \
*(uint32_t *)((char *)(id) + sizeof(tId)) = (eId); \
} while (0)
#define QW_GET_TEID(id, tId, eId) \
do { \
(tId) = *(uint64_t *)(id); \
(eId) = *(uint32_t *)((char *)(id) + sizeof(tId)); \
} while (0)
#define QW_ERR_RET(c) \
do { \
int32_t _code = (c); \

View File

@ -68,7 +68,7 @@ int32_t qwInitQueryInfo(uint64_t qId, SQWQueryInfo* pQuery) {
return code;
}
int32_t qwInitSession(uint64_t qId, void** ppSession) {
int32_t qwInitSession(QW_FPARAMS_DEF, void** ppSession) {
int32_t code = TSDB_CODE_SUCCESS;
SQWQueryInfo* pQuery = NULL;
@ -95,22 +95,32 @@ int32_t qwInitSession(uint64_t qId, void** ppSession) {
pQuery = (SQWQueryInfo*)taosHashGet(gQueryMgmt.pQueryInfo, &qId, sizeof(qId));
}
code = taosHashPut(pQuery->pSessions, ppSession, POINTER_BYTES, NULL, 0);
if (TSDB_CODE_SUCCESS != code) {
qError("fail to put session into query session hash, errno:%d", terrno);
return terrno;
}
break;
}
QW_ERR_RET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pQuery->pCollection));
char id[sizeof(tId) + sizeof(eId)] = {0};
QW_SET_TEID(id, tId, eId);
code = taosHashPut(pQuery->pSessions, id, sizeof(id), ppSession, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {
qError("fail to put session into query session hash, errno:%d", terrno);
return terrno;
}
return code;
}
bool qwRetireCollection(uint64_t collectionId, int64_t retireSize) {
//TODO
bool qwLowLevelRetire() {
}
bool qwRetireCollection(uint64_t collectionId, int64_t retireSize, bool retireLow) {
if (retireLow) {
return qwLowLevelRetire();
}
return false;
}

View File

@ -734,7 +734,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx->phase = -1;
if (NULL != gQueryMgmt.memPoolHandle) {
QW_ERR_JRET(qwInitSession(qId, &ctx->memPoolSession));
QW_ERR_JRET(qwInitSession(QW_IDS(), &ctx->memPoolSession));
}
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));

View File

@ -205,7 +205,8 @@ typedef struct SMPSession {
int64_t sessionId;
SMPCollection* pCollection;
void* pCollection;
SMPCollection* pMpCollection;
bool needRetire;
SMPCtrlInfo ctrlInfo;

View File

@ -300,10 +300,10 @@ void memPoolUpdateAllocMemSize(SMemPool* pPool, SMPSession* pSession, int64_t si
int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size);
memPoolUpdateMaxAllocMemSize(&pSession->maxAllocMemSize, allocMemSize);
allocMemSize = atomic_add_fetch_64(&pSession->pCollection->allocMemSize, size);
memPoolUpdateMaxAllocMemSize(&pSession->pCollection->maxAllocMemSize, allocMemSize);
allocMemSize = atomic_load_64(&pSession->pMpCollection->allocMemSize);
memPoolUpdateMaxAllocMemSize(&pSession->pMpCollection->maxAllocMemSize, allocMemSize);
allocMemSize = atomic_add_fetch_64(&pPool->allocMemSize, size);
allocMemSize = atomic_load_64(&pPool->allocMemSize);
memPoolUpdateMaxAllocMemSize(&pPool->maxAllocMemSize, allocMemSize);
}
@ -382,11 +382,42 @@ _return:
bool memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) {
if (pPool->cfg.collectionQuota > 0 && (atomic_load_64(&pSession->pCollection->allocMemSize) + size) > atomic_load_64(&pPool->cfg.collectionQuota)) {
int64_t allocSize = atomic_add_fetch_64(&pSession->pMpCollection->allocMemSize, size);
int64_t quota = atomic_load_64(&pPool->cfg.collectionQuota);
if (quota > 0 && allocSize > quota) {
uWarn("collection %" PRIx64 " allocSize " PRId64 " is over than quota %" PRId64, pSession->pMpCollection->collectionId, allocSize, quota);
atomic_sub_fetch_64(&pSession->pMpCollection->allocMemSize, size);
return true;
}
if ((atomic_load_64(&pPool->allocMemSize) + size) >= pPool->memRetireThreshold[1]) {
return (*pPool->cfg.cb.retireFp)(pSession->pCollection->collectionId, pPool->memRetireUnit);
allocSize = atomic_add_fetch_64(&pPool->allocMemSize, size);
quota = atomic_load_64(&pPool->memRetireThreshold[2]);
if (allocSize >= quota) {
uWarn("pool allocSize " PRId64 " reaches the high quota %" PRId64, allocSize, quota);
atomic_sub_fetch_64(&pPool->allocMemSize, size);
return true;
}
quota = atomic_load_64(&pPool->memRetireThreshold[1]);
if (allocSize >= quota) {
uInfo("pool allocSize " PRId64 " reaches the middle quota %" PRId64, allocSize, quota);
bool retire = (*pPool->cfg.cb.retireFp)(pSession->pMpCollection->collectionId, pPool->memRetireUnit, false);
if (retire) {
uWarn("session retired in middle quota retire choise, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize));
atomic_sub_fetch_64(&pPool->allocMemSize, size);
}
return retire;
}
quota = atomic_load_64(&pPool->memRetireThreshold[0]);
if (allocSize >= quota) {
uInfo("pool allocSize " PRId64 " reaches the low quota %" PRId64, allocSize, quota);
bool retire = (*pPool->cfg.cb.retireFp)(pSession->pMpCollection->collectionId, pPool->memRetireUnit, true);
if (retire) {
uWarn("session retired in low quota retire choise, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize));
atomic_sub_fetch_64(&pPool->allocMemSize, size);
}
return retire;
}
return false;
@ -401,6 +432,10 @@ void* memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size, ui
void* res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size);
if (NULL != res) {
memPoolUpdateAllocMemSize(pPool, pSession, size);
} else {
atomic_sub_fetch_64(&pSession->pMpCollection->allocMemSize, size);
atomic_sub_fetch_64(&pPool->allocMemSize, size);
uError("malloc %" PRId64 " alighment %d failed", size, alignment);
}
return res;
@ -842,7 +877,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pCollec
MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk);
MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk);
pSession->pCollection = (SMPCollection*)pCollection;
pSession->pMpCollection = (SMPCollection*)pCollection;
_return: