enh: add memory pool

This commit is contained in:
dapan1121 2024-07-29 14:57:26 +08:00
parent dfc3fc4987
commit 08ec404200
13 changed files with 92 additions and 76 deletions

View File

@ -74,7 +74,7 @@ extern int32_t tsQueryMaxConcurrentTaskNum;
extern int32_t tsQueryConcurrentTaskNum; extern int32_t tsQueryConcurrentTaskNum;
extern int32_t tsSingleQueryMaxMemorySize; extern int32_t tsSingleQueryMaxMemorySize;
extern int8_t tsQueryUseMemoryPool; extern int8_t tsQueryUseMemoryPool;
extern int32 tsQueryBufferPoolSize; extern int32_t tsQueryBufferPoolSize;
extern int32_t tsNumOfQueryThreads; extern int32_t tsNumOfQueryThreads;
extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions; extern int32_t tsNumOfRpcSessions;

View File

@ -44,7 +44,7 @@ typedef void (*mpIncConcSessionNum)(void);
typedef void (*mpSetConcSessionNum)(int32_t); typedef void (*mpSetConcSessionNum)(int32_t);
typedef void (*mpRetireCollections)(int64_t, bool); typedef void (*mpRetireCollections)(int64_t, bool);
typedef void (*mpRetireCollection)(uint64_t); typedef void (*mpRetireCollection)(uint64_t);
typedef void (*mpCfgUpdate)(void*, SMemPoolCfg*); typedef void (*mpCfgUpdate)(void*, void*);
typedef struct SMemPoolCallBack { typedef struct SMemPoolCallBack {
mpDecConcSessionNum decSessFp; mpDecConcSessionNum decSessFp;
@ -55,6 +55,7 @@ typedef struct SMemPoolCallBack {
mpCfgUpdate cfgUpdateFp; mpCfgUpdate cfgUpdateFp;
} SMemPoolCallBack; } SMemPoolCallBack;
typedef struct SMemPoolCfg { typedef struct SMemPoolCfg {
bool autoMaxSize; bool autoMaxSize;
int64_t maxSize; int64_t maxSize;
@ -67,6 +68,14 @@ typedef struct SMemPoolCfg {
SMemPoolCallBack cb; SMemPoolCallBack cb;
} SMemPoolCfg; } SMemPoolCfg;
typedef struct SMemPoolCollection {
uint64_t collectionId;
int64_t allocMemSize;
int64_t maxAllocMemSize;
} SMemPoolCollection;
void taosMemPoolModInit(void); void taosMemPoolModInit(void);
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle); int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle);
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo); void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo);
@ -83,6 +92,7 @@ void taosAutoMemoryFree(void *ptr);
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pCollection); int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pCollection);
void taosMemPoolDestroySession(void* poolHandle, void* session); void taosMemPoolDestroySession(void* poolHandle, void* session);
int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection); int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection);
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg);
#define taosMemPoolFreeClear(ptr) \ #define taosMemPoolFreeClear(ptr) \
do { \ do { \

View File

@ -52,7 +52,7 @@ void *taosMemMallocAlign(uint32_t alignment, int64_t size);
#define TAOS_MEMCPY(_d, _s, _n) ((void)memcpy(_d, _s, _n)) #define TAOS_MEMCPY(_d, _s, _n) ((void)memcpy(_d, _s, _n))
#define TAOS_MEMMOVE(_d, _s, _n) ((void)memmove(_d, _s, _n)) #define TAOS_MEMMOVE(_d, _s, _n) ((void)memmove(_d, _s, _n))
#define taosMemoryFreeClear(ptr) \ #define taosMemFreeClear(ptr) \
do { \ do { \
if (ptr) { \ if (ptr) { \
taosMemFree((void *)ptr); \ taosMemFree((void *)ptr); \

View File

@ -1947,7 +1947,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
sendInfo->requestId = req.reqId; sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
sendInfo->param = pParam; sendInfo->param = pParam;
sendInfo->paramFreeFp = taosMemoryFree; sendInfo->paramFreeFp = taosMemFree;
sendInfo->fp = tmqPollCb; sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_TMQ_CONSUME; sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
@ -2885,7 +2885,7 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
sendInfo->requestId = generateRequestId(); sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0; sendInfo->requestObjRefId = 0;
sendInfo->param = pParam; sendInfo->param = pParam;
sendInfo->paramFreeFp = taosMemoryFree; sendInfo->paramFreeFp = taosMemFree;
sendInfo->fp = askEpCb; sendInfo->fp = askEpCb;
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

View File

@ -55,7 +55,7 @@ int32_t tsShellActivityTimer = 3; // second
// memory pool // memory pool
int8_t tsQueryUseMemoryPool = 1; int8_t tsQueryUseMemoryPool = 1;
int32 tsQueryBufferPoolSize = 0; //MB int32_t tsQueryBufferPoolSize = 0; //MB
int32_t tsSingleQueryMaxMemorySize = 0; //MB int32_t tsSingleQueryMaxMemorySize = 0; //MB
// queue & threads // queue & threads

View File

@ -9504,7 +9504,7 @@ static void tDeleteMqDataRspCommon(void *rsp) {
SMqDataRspCommon *pRsp = rsp; SMqDataRspCommon *pRsp = rsp;
taosArrayDestroy(pRsp->blockDataLen); taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL; pRsp->blockDataLen = NULL;
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->blockData, NULL);
pRsp->blockData = NULL; pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper); taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
pRsp->blockSchema = NULL; pRsp->blockSchema = NULL;
@ -9558,7 +9558,7 @@ void tDeleteSTaosxRsp(void *rsp) {
STaosxRsp *pRsp = (STaosxRsp *)rsp; STaosxRsp *pRsp = (STaosxRsp *)rsp;
taosArrayDestroy(pRsp->createTableLen); taosArrayDestroy(pRsp->createTableLen);
pRsp->createTableLen = NULL; pRsp->createTableLen = NULL;
taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); taosArrayDestroyP(pRsp->createTableReq, NULL);
pRsp->createTableReq = NULL; pRsp->createTableReq = NULL;
} }

View File

@ -213,7 +213,7 @@ int32_t ipWhiteMgtUpdate(SMnode *pMnode, char *user, SIpWhiteList *pNew) {
_OVER: _OVER:
(void)taosThreadRwlockUnlock(&ipWhiteMgt.rw); (void)taosThreadRwlockUnlock(&ipWhiteMgt.rw);
taosArrayDestroyP(fqdns, (FDelete)taosMemoryFree); taosArrayDestroyP(fqdns, NULL);
if (code < 0) { if (code < 0) {
mError("failed to update ip white list for user: %s at line %d since %s", user, lino, tstrerror(code)); mError("failed to update ip white list for user: %s at line %d since %s", user, lino, tstrerror(code));
} }
@ -633,8 +633,8 @@ int32_t mndFetchAllIpWhite(SMnode *pMnode, SHashObj **ppIpWhiteTab) {
} }
_OVER: _OVER:
taosArrayDestroyP(fqdns, taosMemoryFree); taosArrayDestroyP(fqdns, NULL);
taosArrayDestroyP(pUserNames, taosMemoryFree); taosArrayDestroyP(pUserNames, NULL);
if (code < 0) { if (code < 0) {
mError("failed to fetch all ip white list at line %d since %s", lino, tstrerror(code)); mError("failed to fetch all ip white list at line %d since %s", lino, tstrerror(code));

View File

@ -233,8 +233,8 @@ typedef struct SQWorkerMgmt {
} SQWorkerMgmt; } SQWorkerMgmt;
typedef struct SQWQueryInfo { typedef struct SQWQueryInfo {
bool retired; int8_t retired;
void* pCollection; SMemPoolCollection* pCollection;
SHashObj* pSessions; SHashObj* pSessions;
} SQWQueryInfo; } SQWQueryInfo;
@ -483,7 +483,7 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped);
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
int32_t qwInitQueryPool(void); int32_t qwInitQueryPool(void);
void qwDestroyQueryInfo(SQWQueryInfo* pQuery); void qwDestroyQueryInfo(SQWQueryInfo* pQuery);
int32_t qwInitSession(uint64_t qId, void** ppSession); int32_t qwInitSession(QW_FPARAMS_DEF, void** ppSession);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -60,7 +60,7 @@ int32_t qwInitQueryInfo(uint64_t qId, SQWQueryInfo* pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
int32_t code = taosMemPoolCallocCollection(qId, &pQuery->pCollection); int32_t code = taosMemPoolCallocCollection(qId, (void**)&pQuery->pCollection);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pQuery->pSessions); taosHashCleanup(pQuery->pSessions);
return code; return code;
@ -118,17 +118,22 @@ void qwRetireCollectionCb(uint64_t collectionId) {
} }
void qwLowLevelRetire(int64_t retireSize) { void qwLowLevelRetire(int64_t retireSize) {
void* pIter = taosHashIterate(gQueryMgmt.pQueryInfo, NULL); SQWQueryInfo* pQuery = (SQWQueryInfo*)taosHashIterate(gQueryMgmt.pQueryInfo, NULL);
while (pIter) { while (pQuery) {
vgInfo = pIter; int64_t aSize = atomic_load_64(&pQuery->pCollection->allocMemSize);
if (aSize >= retireSize) {
atomic_store_8(&pQuery->retired, 1);
pInfo->vgHash[i].vgId = vgInfo->vgId; //TODO RETIRE JOB/TASKS DIRECTLY
pInfo->vgHash[i].hashBegin = vgInfo->hashBegin;
pInfo->vgHash[i].hashEnd = vgInfo->hashEnd;
pIter = taosHashIterate(gQueryMgmt.pQueryInfo, pIter); qDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
vgInfo = NULL; pQuery->pCollection->collectionId, aSize, retireSize);
++i;
taosHashCancelIterate(gQueryMgmt.pQueryInfo, pQuery);
break;
}
pQuery = (SQWQueryInfo*)taosHashIterate(gQueryMgmt.pQueryInfo, pQuery);
} }
} }
@ -170,7 +175,8 @@ int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
return code; return code;
} }
void qwCheckUpateCfgCb(void* pHandle, SMemPoolCfg* pCfg) { void qwCheckUpateCfgCb(void* pHandle, void* cfg) {
SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg;
int64_t newCollectionQuota = tsSingleQueryMaxMemorySize * 1048576UL; int64_t newCollectionQuota = tsSingleQueryMaxMemorySize * 1048576UL;
if (pCfg->collectionQuota != newCollectionQuota) { if (pCfg->collectionQuota != newCollectionQuota) {
atomic_store_64(&pCfg->collectionQuota, newCollectionQuota); atomic_store_64(&pCfg->collectionQuota, newCollectionQuota);

View File

@ -759,7 +759,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx->phase = -1; ctx->phase = -1;
if (NULL != gQueryMgmt.memPoolHandle) { if (NULL != gQueryMgmt.memPoolHandle) {
QW_ERR_JRET(qwInitSession(QW_IDS(), &ctx->memPoolSession)); QW_ERR_JRET(qwInitSession(QW_FPARAMS(), &ctx->memPoolSession));
} }
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));

View File

@ -757,11 +757,11 @@ int32_t taosGetSysAvailMemory(int64_t *availSize) {
#else #else
TdFilePtr pFile = taosOpenFile("/proc/meminfo", TD_FILE_READ | TD_FILE_STREAM); TdFilePtr pFile = taosOpenFile("/proc/meminfo", TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) { if (pFile == NULL) {
return -1; return terrno;
} }
ssize_t bytes = 0; ssize_t bytes = 0;
char line[1024] = {0}; char line[128] = {0};
int32_t expectedSize = 13; //"MemAvailable:" int32_t expectedSize = 13; //"MemAvailable:"
while (!taosEOFFile(pFile)) { while (!taosEOFFile(pFile)) {
bytes = taosGetsFile(pFile, sizeof(line), line); bytes = taosGetsFile(pFile, sizeof(line), line);
@ -769,8 +769,8 @@ int32_t taosGetSysAvailMemory(int64_t *availSize) {
break; break;
} }
if (line[0] != 'M' && line[3] != 'A') { if (line[0] != 'M' && line[3] != 'A') {
continue;
line[0] = 0; line[0] = 0;
continue;
} }
if (0 == strncmp(line, "MemAvailable:", expectedSize)) { if (0 == strncmp(line, "MemAvailable:", expectedSize)) {
break; break;
@ -778,15 +778,15 @@ int32_t taosGetSysAvailMemory(int64_t *availSize) {
} }
if (0 == line[0]) { if (0 == line[0]) {
return -1; return TSDB_CODE_SYSTEM_ERROR;
} }
char tmp[32]; char tmp[32];
sscanf(line, "%s %" PRId64, tmp, availSize); (void)sscanf(line, "%s %" PRId64, tmp, availSize);
*availSize *= 1024; *availSize *= 1024;
taosCloseFile(&pFile); (void)taosCloseFile(&pFile);
return 0; return 0;
#endif #endif
} }

View File

@ -194,10 +194,7 @@ typedef struct SMPStatInfo {
typedef struct SMPCollection { typedef struct SMPCollection {
uint64_t collectionId; SMemPoolCollection collection; // KEEP IT FIRST
int64_t allocMemSize;
int64_t maxAllocMemSize;
SMPStatInfo stat; SMPStatInfo stat;
} SMPCollection; } SMPCollection;
@ -292,8 +289,8 @@ typedef enum EMPMemStrategy {
typedef struct SMPMsgQueue { typedef struct SMPMsgQueue {
SMemPool* pPool; SMemPool* pPool;
bool lowLevelRetire; int8_t lowLevelRetire;
bool midLevelRetire; int8_t midLevelRetire;
} SMPMsgQueue; } SMPMsgQueue;
typedef struct SMemPoolMgmt { typedef struct SMemPoolMgmt {

View File

@ -312,15 +312,15 @@ void memPoolUpdateAllocMemSize(SMemPool* pPool, SMPSession* pSession, int64_t si
int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size); int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size);
memPoolUpdateMaxAllocMemSize(&pSession->maxAllocMemSize, allocMemSize); memPoolUpdateMaxAllocMemSize(&pSession->maxAllocMemSize, allocMemSize);
allocMemSize = atomic_load_64(&pSession->pMpCollection->allocMemSize); allocMemSize = atomic_load_64(&pSession->pMpCollection->collection.allocMemSize);
memPoolUpdateMaxAllocMemSize(&pSession->pMpCollection->maxAllocMemSize, allocMemSize); memPoolUpdateMaxAllocMemSize(&pSession->pMpCollection->collection.maxAllocMemSize, allocMemSize);
allocMemSize = atomic_load_64(&pPool->allocMemSize); allocMemSize = atomic_load_64(&pPool->allocMemSize);
memPoolUpdateMaxAllocMemSize(&pPool->maxAllocMemSize, allocMemSize); memPoolUpdateMaxAllocMemSize(&pPool->maxAllocMemSize, allocMemSize);
} }
void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { int32_t memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SMPChunk* pChunk = NULL, *preSrcChunk = NULL; SMPChunk* pChunk = NULL, *preSrcChunk = NULL;
void* pRes = NULL; void* pRes = NULL;
@ -361,10 +361,12 @@ void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size,
_return: _return:
return pRes; *ppRes = pRes;
return code;
} }
void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { int32_t memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SMPNSChunk* pChunk = NULL; SMPNSChunk* pChunk = NULL;
void* pRes = NULL; void* pRes = NULL;
@ -389,7 +391,9 @@ void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t siz
_return: _return:
return pRes; *ppRes = pRes;
return code;
} }
int32_t memPoolPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { int32_t memPoolPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) {
@ -413,34 +417,34 @@ int32_t memPoolPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) {
int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) { int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) {
SMPCollection* pCollection = pSession->pMpCollection; SMPCollection* pCollection = pSession->pMpCollection;
int64_t cAllocSize = atomic_add_fetch_64(&pCollection->allocMemSize, size); int64_t cAllocSize = atomic_add_fetch_64(&pCollection->collection.allocMemSize, size);
int64_t quota = atomic_load_64(&pPool->cfg.collectionQuota); int64_t quota = atomic_load_64(&pPool->cfg.collectionQuota);
if (quota > 0 && cAllocSize > quota) { if (quota > 0 && cAllocSize > quota) {
uWarn("collection %" PRIx64 " allocSize " PRId64 " is over than quota %" PRId64, pCollection->collectionId, cAllocSize, quota); uWarn("collection %" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pCollection->collection.collectionId, cAllocSize, quota);
pPool->cfg.cb.retireFp(pCollection->collectionId); pPool->cfg.cb.retireFp(pCollection->collection.collectionId);
atomic_sub_fetch_64(&pCollection->allocMemSize, size); atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size);
MP_RET(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); MP_RET(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD);
} }
int64_t pAllocSize = atomic_add_fetch_64(&pPool->allocMemSize, size); int64_t pAllocSize = atomic_add_fetch_64(&pPool->allocMemSize, size);
quota = atomic_load_64(&pPool->memRetireThreshold[2]); quota = atomic_load_64(&pPool->memRetireThreshold[2]);
if (pAllocSize >= quota) { if (pAllocSize >= quota) {
uWarn("pool allocSize " PRId64 " reaches the high quota %" PRId64, pAllocSize, quota); uWarn("pool allocSize %" PRId64 " reaches the high quota %" PRId64, pAllocSize, quota);
pPool->cfg.cb.retireFp(pCollection->collectionId); pPool->cfg.cb.retireFp(pCollection->collection.collectionId);
atomic_sub_fetch_64(&pCollection->allocMemSize, size); atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size);
atomic_sub_fetch_64(&pPool->allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size);
MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
} }
quota = atomic_load_64(&pPool->memRetireThreshold[1]); quota = atomic_load_64(&pPool->memRetireThreshold[1]);
if (pAllocSize >= quota) { if (pAllocSize >= quota) {
uInfo("pool allocSize " PRId64 " reaches the middle quota %" PRId64, pAllocSize, quota); uInfo("pool allocSize %" PRId64 " reaches the middle quota %" PRId64, pAllocSize, quota);
if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit) / 2) { if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit) / 2) {
uWarn("session retired in middle quota retire choise, sessionAllocSize: %" PRId64 ", collectionSize: %" PRId64, uWarn("session retired in middle quota retire choise, sessionAllocSize: %" PRId64 ", collectionSize: %" PRId64,
atomic_load_64(&pSession->allocMemSize), cAllocSize); atomic_load_64(&pSession->allocMemSize), cAllocSize);
pPool->cfg.cb.retireFp(pCollection->collectionId); pPool->cfg.cb.retireFp(pCollection->collection.collectionId);
atomic_sub_fetch_64(&pCollection->allocMemSize, size); atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size);
atomic_sub_fetch_64(&pPool->allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size);
MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, false)); MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, false));
@ -452,12 +456,12 @@ int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t s
quota = atomic_load_64(&pPool->memRetireThreshold[0]); quota = atomic_load_64(&pPool->memRetireThreshold[0]);
if (pAllocSize >= quota) { if (pAllocSize >= quota) {
uInfo("pool allocSize " PRId64 " reaches the low quota %" PRId64, pAllocSize, quota); uInfo("pool allocSize %" PRId64 " reaches the low quota %" PRId64, pAllocSize, quota);
if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) { if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) {
uWarn("session retired in low quota retire choise, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize)); uWarn("session retired in low quota retire choise, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize));
pPool->cfg.cb.retireFp(pCollection->collectionId); pPool->cfg.cb.retireFp(pCollection->collection.collectionId);
atomic_sub_fetch_64(&pCollection->allocMemSize, size); atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size);
atomic_sub_fetch_64(&pPool->allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size);
MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, true)); MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, true));
@ -476,7 +480,7 @@ int32_t memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size,
if (NULL != res) { if (NULL != res) {
memPoolUpdateAllocMemSize(pPool, pSession, size); memPoolUpdateAllocMemSize(pPool, pSession, size);
} else { } else {
atomic_sub_fetch_64(&pSession->pMpCollection->allocMemSize, size); atomic_sub_fetch_64(&pSession->pMpCollection->collection.allocMemSize, size);
atomic_sub_fetch_64(&pPool->allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size);
uError("malloc %" PRId64 " alighment %d failed", size, alignment); uError("malloc %" PRId64 " alighment %d failed", size, alignment);
@ -518,10 +522,12 @@ int32_t memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, u
return code; return code;
} }
void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size) { int32_t memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int64_t totalSize = num * size; int64_t totalSize = num * size;
void *res = memPoolMallocImpl(pPool, pSession, totalSize, 0); void *res = NULL;
MP_ERR_RET(memPoolMallocImpl(pPool, pSession, totalSize, 0, &res));
if (NULL != res) { if (NULL != res) {
memset(res, 0, totalSize); memset(res, 0, totalSize);
@ -529,7 +535,9 @@ void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int6
_return: _return:
return res; *ppRes = res;
return code;
} }
@ -571,7 +579,7 @@ int32_t memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void **pPtr, i
if (NULL == *pPtr) { if (NULL == *pPtr) {
*origSize = 0; *origSize = 0;
MP_RET(memPoolMallocImpl(pPool, pSession, size, 0)); MP_RET(memPoolMallocImpl(pPool, pSession, size, 0, pPtr));
} }
if (0 == size) { if (0 == size) {
@ -851,9 +859,9 @@ void* memPoolMgmtThreadFunc(void* param) {
continue; continue;
} }
if (gMPMgmt.msgQueue.midLevelRetire) { if (atomic_load_8(&gMPMgmt.msgQueue.midLevelRetire)) {
(*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), false); (*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), false);
} else if (gMPMgmt.msgQueue.lowLevelRetire) { } else if (atomic_load_8(&gMPMgmt.msgQueue.lowLevelRetire)) {
(*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), true); (*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), true);
} }
@ -876,8 +884,7 @@ void taosMemPoolModInit(void) {
gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0); gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0);
if (TSDB_CODE_SUCCESS != gMPMgmt.code) { if (TSDB_CODE_SUCCESS != gMPMgmt.code) {
gMPMgmt.code = TAOS_SYSTEM_ERROR(errno); uError("failed to init sem2, error: %x", gMPMgmt.code);
qError("failed to init sem2, error: %x", gMPMgmt.code);
return; return;
} }
@ -935,7 +942,7 @@ _return:
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) { void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) {
SMemPool* pPool = (SMemPool*)poolHandle; SMemPool* pPool = (SMemPool*)poolHandle;
MP_ERR_RET(memPoolApplyCfgUpdate(pPool)); (void)memPoolApplyCfgUpdate(pPool);
} }
void taosMemPoolDestroySession(void* poolHandle, void* session) { void taosMemPoolDestroySession(void* poolHandle, void* session) {
@ -1022,11 +1029,7 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t
int64_t totalSize = num * size; int64_t totalSize = num * size;
SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
res = memPoolMallocImpl(pPool, pSession, totalSize, 0); MP_ERR_JRET(memPoolCallocImpl(pPool, pSession, num, size, &res));
if (NULL != res) {
memset(res, 0, totalSize);
}
MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input); memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input);
@ -1075,7 +1078,7 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char
int64_t size = strlen(ptr) + 1; int64_t size = strlen(ptr) + 1;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
res = memPoolMallocImpl(pPool, pSession, size, 0); MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, 0, &res));
if (NULL != res) { if (NULL != res) {
strcpy(res, ptr); strcpy(res, ptr);
} }
@ -1145,7 +1148,7 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment
SMPSession* pSession = (SMPSession*)session; SMPSession* pSession = (SMPSession*)session;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
res = memPoolMallocImpl(pPool, pSession, size, alignment); MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, alignment, &res));
MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
@ -1183,7 +1186,7 @@ int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection)
} }
SMPCollection* pCollection = (SMPCollection*)*ppCollection; SMPCollection* pCollection = (SMPCollection*)*ppCollection;
pCollection->collectionId = collectionId; pCollection->collection.collectionId = collectionId;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }