enh: add memory alloc & free stat

This commit is contained in:
dapan1121 2024-11-01 13:35:46 +08:00
parent c1762efbc3
commit 4a4351d4ec
3 changed files with 433 additions and 123 deletions

View File

@ -2209,7 +2209,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
pRsp->numOfRows, pInfo->loadInfo.totalRows);
if (pRsp->numOfRows == 0) {
taosMemoryFree(pRsp);
taosMemFree(pRsp);
return NULL;
}
}
@ -2219,7 +2219,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
taosMemoryFreeClear(pRsp);
taosMemFreeClear(pRsp);
T_LONG_JMP(pTaskInfo->env, code);
}
updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
@ -2229,10 +2229,10 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;
taosMemoryFreeClear(pRsp);
taosMemFreeClear(pRsp);
T_LONG_JMP(pTaskInfo->env, code);
}
taosMemoryFree(pRsp);
taosMemFree(pRsp);
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
} else if (pOperator->status == OP_EXEC_DONE) {

View File

@ -153,12 +153,41 @@ typedef struct SMPStatSession {
int64_t destroyNum;
} SMPStatSession;
typedef struct SMPAllocStat {
int64_t allocTimes;
int64_t allocBytes;
//int64_t freeIDs[]; // TODO
} SMPAllocStat;
typedef struct SMPFreeStat {
int64_t freeTimes;
int64_t freeBytes;
} SMPFreeStat;
typedef struct SMPFileLineId {
uint32_t fileId;
int32_t line;
} SMPFileLineId;
typedef struct SMPFileLine {
SMPFileLineId fl;
int64_t size;
} SMPFileLine;
typedef struct SMPStatPos {
int64_t logErrTimes;
SHashObj* fileHash; // fileId => fileName
SHashObj* remainHash; // pointer => SMPFileLine
SHashObj* allocHash; // alloc fl => SMPAllocStat
SHashObj* freeHash; // free fl => SMPFreeStat
} SMPStatPos;
typedef struct SMPStatInfo {
SMPStatDetail statDetail;
SMPStatSession statSession;
SHashObj* sessStat;
SHashObj* nodeStat;
SHashObj* posStat;
SMPStatPos posStat;
} SMPStatInfo;
@ -316,6 +345,8 @@ enum {
#define MP_STAT_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail
#define MP_STAT_ORIG_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail, (_item).origExec, (_item).origSucc, (_item).origFail
#define MP_API_ENTER() void* _pPoolHandle = NULL; taosSaveDisableMemoryPoolUsage(_pPoolHandle)
#define MP_API_LEAVE() taosRestoreEnableMemoryPoolUsage(_pPoolHandle)
#define MP_INIT_MEM_HEADER(_header, _size, _nsChunk) \
do { \

View File

@ -18,6 +18,7 @@
#include "tmempoolInt.h"
#include "tlog.h"
#include "tutil.h"
#include "taos.h"
static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT;
threadlocal void* threadPoolHandle = NULL;
@ -178,6 +179,41 @@ int32_t mpUpdateCfg(SMemPool* pPool) {
return TSDB_CODE_SUCCESS;
}
uint32_t mpFileIdHashFp(const char* fileId, uint32_t len) {
return *(uint32_t*)fileId;
}
int32_t mpInitStat(SMPStatPos* pStat, bool sessionStat) {
pStat->remainHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pStat->remainHash) {
uError("memPool init posStat remainHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
return terrno;
}
pStat->allocHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pStat->allocHash) {
uError("memPool init posStat allocHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
return terrno;
}
pStat->freeHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pStat->freeHash) {
uError("memPool init posStat freeHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
return terrno;
}
pStat->fileHash = taosHashInit(1024, mpFileIdHashFp, false, HASH_ENTRY_LOCK);
if (NULL == pStat->fileHash) {
uError("memPool init posStat fileHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat);
return terrno;
}
uDebug("memPool stat initialized, sessionStat:%d", sessionStat);
return TSDB_CODE_SUCCESS;
}
int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET(mpCheckCfg(cfg));
@ -202,6 +238,8 @@ int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
if (gMPFps[gMPMgmt.strategy].initFp) {
MP_ERR_RET((*gMPFps[gMPMgmt.strategy].initFp)(pPool, poolName, cfg));
}
MP_ERR_RET(mpInitStat(&pPool->stat.posStat, false));
return TSDB_CODE_SUCCESS;
}
@ -437,8 +475,101 @@ void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailN
}
}
void mpPrintFileLineStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) {
//TODO
int32_t mpAddToRemainAllocHash(SHashObj* pHash, SMPFileLine* pFileLine) {
int32_t code = TSDB_CODE_SUCCESS;
SMPAllocStat stat = {0}, *pStat = NULL;
while (true) {
pStat = (SMPAllocStat*)taosHashGet(pHash, &pFileLine->fl, sizeof(pFileLine->fl));
if (NULL == pStat) {
code = taosHashPut(pHash, &pFileLine->fl, sizeof(pFileLine->fl), &stat, sizeof(stat));
if (TSDB_CODE_SUCCESS != code) {
if (TSDB_CODE_DUP_KEY == code) {
continue;
}
uError("taosHashPut to remain alloc hash failed, error:%s", tstrerror(code));
return code;
}
continue;
}
atomic_add_fetch_64(&pStat->allocBytes, pFileLine->size);
atomic_add_fetch_64(&pStat->allocTimes, 1);
break;
}
return TSDB_CODE_SUCCESS;
}
void mpPrintPosRemainStat(SMPStatPos* pStat) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t remainNum = taosHashGetSize(pStat->remainHash);
if (remainNum <= 0) {
uInfo("no alloc remaining memory");
return;
}
SHashObj* pAllocHash = taosHashInit(remainNum / 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pAllocHash) {
uError("taosHashInit pAllocHash failed, error:%s, remainNum:%d", tstrerror(terrno), remainNum);
return;
}
SMPFileLine* pFileLine = NULL;
void* pIter = taosHashIterate(pStat->remainHash, NULL);
while (pIter) {
pFileLine = (SMPFileLine*)pIter;
MP_ERR_JRET(mpAddToRemainAllocHash(pAllocHash, pFileLine));
pIter = taosHashIterate(pStat->remainHash, pIter);
}
SMPAllocStat* pAlloc = NULL;
pIter = taosHashIterate(pAllocHash, NULL);
while (pIter) {
pAlloc = (SMPAllocStat*)pIter;
SMPFileLineId* pId = (SMPFileLineId*)taosHashGetKey(pIter, NULL);
SMPAllocStat* pAlloc = (SMPAllocStat*)taosHashGet(pStat->allocHash, pId, sizeof(*pId));
char* pFileName = (char*)taosHashGet(pStat->fileHash, &pId->fileId, sizeof(pId->fileId));
if (NULL == pAlloc || NULL == pFileName) {
uError("fail to get pId in allocHash or fileHash, pAlloc:%p, pFileName:%p", pAlloc, pFileName);
goto _return;
}
uInfo("REMAINING: %" PRId64 " bytes alloced by %s:%d in %" PRId64 " times", pAlloc->allocBytes, pFileName, pId->line, pAlloc->allocTimes);
pIter = taosHashIterate(pAllocHash, pIter);
}
_return:
taosHashCleanup(pAllocHash);
}
void mpPrintPosAllocStat(SMPStatPos* pStat) {
}
void mpPrintPosFreeStat(SMPStatPos* pStat) {
}
void mpPrintPosStat(SMPCtrlInfo* pCtrl, SMPStatPos* pStat, char* detailName) {
if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
return;
}
uInfo("MemPool [%s] Pos Stat:", detailName);
uInfo("error times: %" PRId64, pStat->logErrTimes);
mpPrintPosRemainStat(pStat);
mpPrintPosAllocStat(pStat);
mpPrintPosFreeStat(pStat);
}
void mpPrintNodeStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) {
@ -578,128 +709,175 @@ void mpLogDetailStat(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput*
}
}
void mpLogPosStat(SHashObj* pHash, EMPStatLogItem item, SMPStatInput* pInput) {
int32_t mpGetAllocFreeStat(SHashObj* pHash, void* pKey, int32_t keyLen, void* pNew, int32_t newSize, void** ppRes) {
void* pStat = NULL;
int32_t code = TSDB_CODE_SUCCESS;
while (true) {
pStat = taosHashGet(pHash, pKey, keyLen);
if (NULL != pStat) {
*ppRes = pStat;
break;
}
code = taosHashPut(pHash, pKey, keyLen, pNew, newSize);
if (code) {
if (TSDB_CODE_DUP_KEY == code) {
continue;
}
return code;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t mpGetPosStatFileId(SMPStatPos* pStat, char* fileName, uint32_t* pId, bool sessionStat) {
uint32_t hashVal = MurmurHash3_32(fileName, strlen(fileName));
int32_t code = taosHashPut(pStat->fileHash, &hashVal, sizeof(hashVal), fileName, strlen(fileName) + 1);
if (code && TSDB_CODE_DUP_KEY != code) {
return code;
}
*pId = hashVal;
return TSDB_CODE_SUCCESS;
}
void mpLogPosStat(SMPStatPos* pStat, EMPStatLogItem item, SMPStatInput* pInput, bool sessionStat) {
if (!MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
return;
}
int32_t code = TSDB_CODE_SUCCESS;
switch (item) {
case E_MP_STAT_LOG_MEM_MALLOC: {
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memMalloc.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.exec, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memMalloc.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memMalloc.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.fail, pInput->size);
}
break;
}
case E_MP_STAT_LOG_MEM_CALLOC:{
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memCalloc.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.exec, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memCalloc.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memCalloc.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.fail, pInput->size);
}
break;
}
case E_MP_STAT_LOG_MEM_REALLOC:{
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memRealloc.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.exec, pInput->size);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.origExec, pInput->origSize);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memRealloc.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.succ, pInput->size);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.origSucc, pInput->origSize);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memRealloc.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.fail, pInput->size);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.origFail, pInput->origSize);
}
break;
}
case E_MP_STAT_LOG_MEM_FREE:{
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memFree.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memFree.exec, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memFree.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memFree.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memFree.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memFree.fail, pInput->size);
}
break;
}
case E_MP_STAT_LOG_MEM_STRDUP: {
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.strdup.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.exec, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.strdup.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.strdup.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.fail, pInput->size);
}
break;
}
case E_MP_STAT_LOG_MEM_MALLOC:
case E_MP_STAT_LOG_MEM_CALLOC:
case E_MP_STAT_LOG_MEM_STRDUP:
case E_MP_STAT_LOG_MEM_STRNDUP: {
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.strndup.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.strndup.exec, pInput->size);
SMPAllocStat allocStat = {0}, *pAlloc = NULL;
SMPFileLine fileLine = {.fl.line = pInput->line, .size = pInput->size};
code = mpGetPosStatFileId(pStat, pInput->file, &fileLine.fl.fileId, sessionStat);
if (TSDB_CODE_SUCCESS != code) {
uError("add pMem:%p file:%s line:%d to fileHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.strndup.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.strndup.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.strndup.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.strndup.fail, pInput->size);
}
code = taosHashPut(pStat->remainHash, &pInput->pMem, POINTER_BYTES, &fileLine, sizeof(fileLine));
if (TSDB_CODE_SUCCESS != code) {
uError("add pMem:%p file:%s line:%d to remainHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
code = mpGetAllocFreeStat(pStat->allocHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&allocStat, sizeof(allocStat), (void**)&pAlloc);
if (TSDB_CODE_SUCCESS != code) {
uError("add pMem:%p file:%s line:%d to allocHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
atomic_add_fetch_64(&pAlloc->allocTimes, 1);
atomic_add_fetch_64(&pAlloc->allocBytes, pInput->size);
break;
}
case E_MP_STAT_LOG_MEM_TRIM: {
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memTrim.exec, 1);
case E_MP_STAT_LOG_MEM_REALLOC: {
SMPAllocStat allocStat = {0}, *pAlloc = NULL;
SMPFreeStat freeStat = {0}, *pFree = NULL;
SMPFileLine fileLine = {.fl.line = pInput->line, .size = pInput->size};
code = mpGetPosStatFileId(pStat, pInput->file, &fileLine.fl.fileId, sessionStat);
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: add pMem:%p file:%s line:%d to fileHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memTrim.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memTrim.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memTrim.fail, 1);
}
ASSERT((pInput->pOrigMem && pInput->origSize > 0) || (NULL == pInput->pOrigMem && pInput->origSize == 0));
if (pInput->pOrigMem && pInput->origSize > 0) {
code = taosHashRemove(pStat->remainHash, &pInput->pOrigMem, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: rm pOrigMem:%p file:%s line:%d from remainHash failed, error:%s, sessionStat:%d",
pInput->pOrigMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
code = mpGetAllocFreeStat(pStat->freeHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&freeStat, sizeof(freeStat), (void**)&pFree);
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: add pOrigMem:%p file:%s line:%d to freeHash failed, error:%s, sessionStat:%d",
pInput->pOrigMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
atomic_add_fetch_64(&pFree->freeTimes, 1);
atomic_add_fetch_64(&pFree->freeBytes, pInput->origSize);
}
code = taosHashPut(pStat->remainHash, &pInput->pMem, POINTER_BYTES, &fileLine, sizeof(fileLine));
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: add pMem:%p file:%s line:%d to remainHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
code = mpGetAllocFreeStat(pStat->allocHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&allocStat, sizeof(allocStat), (void**)&pAlloc);
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: add pMem:%p file:%s line:%d to allocHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
atomic_add_fetch_64(&pAlloc->allocTimes, 1);
atomic_add_fetch_64(&pAlloc->allocBytes, pInput->size);
break;
}
case E_MP_STAT_LOG_MEM_FREE: {
SMPAllocStat allocStat = {0}, *pAlloc = NULL;
SMPFreeStat freeStat = {0}, *pFree = NULL;
SMPFileLineId fl = {.line = pInput->line};
code = mpGetPosStatFileId(pStat, pInput->file, &fl.fileId, sessionStat);
if (TSDB_CODE_SUCCESS != code) {
uError("free: add pMem:%p file:%s line:%d to fileHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
code = taosHashRemove(pStat->remainHash, &pInput->pMem, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {
uError("free: rm pMem:%p file:%s line:%d to remainHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
code = mpGetAllocFreeStat(pStat->freeHash, &fl, sizeof(fl), (void*)&freeStat, sizeof(freeStat), (void**)&pFree);
if (TSDB_CODE_SUCCESS != code) {
uError("realloc: add pMem:%p file:%s line:%d to freeHash failed, error:%s, sessionStat:%d",
pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat);
MP_ERR_JRET(code);
}
atomic_add_fetch_64(&pFree->freeTimes, 1);
atomic_add_fetch_64(&pFree->freeBytes, pInput->size);
break;
}
case E_MP_STAT_LOG_MEM_TRIM:
break;
case E_MP_STAT_LOG_CHUNK_MALLOC:
case E_MP_STAT_LOG_CHUNK_RECYCLE:
case E_MP_STAT_LOG_CHUNK_REUSE:
case E_MP_STAT_LOG_CHUNK_FREE: {
break;
}
default:
uError("Invalid stat item: %d", item);
break;
}
return;
_return:
atomic_add_fetch_64(&pStat->logErrTimes, 1);
}
@ -719,7 +897,8 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt
mpLogDetailStat(&pPool->stat.statDetail, item, pInput);
}
if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
mpLogPosStat(pPool->stat.posStat, item, pInput);
mpLogPosStat(&pSession->stat.posStat, item, pInput, true);
mpLogPosStat(&pPool->stat.posStat, item, pInput, false);
}
break;
}
@ -762,13 +941,14 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) {
if (NULL != poolHandle) {
pCtrl = &pPool->ctrl;
pDetail = &pPool->stat.statDetail;
int64_t sessInit = pPool->stat.statSession.initFail + pPool->stat.statSession.initSucc;
if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == pPool->stat.statSession.destroyNum) {
int64_t sessInit = atomic_load_64(&pPool->stat.statSession.initFail) + atomic_load_64(&pPool->stat.statSession.initSucc);
if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == atomic_load_64(&pPool->stat.statSession.destroyNum)) {
int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.strdup.succ + pDetail->bytes.strndup.succ;
int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ;
if (allocSize != freeSize) {
uError("%s MemPool %s stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize);
ASSERT(0);
} else {
uDebug("%s MemPool %s stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize);
@ -849,6 +1029,8 @@ _return:
}
void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
char detailName[128];
@ -860,7 +1042,7 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionPos");
detailName[sizeof(detailName) - 1] = 0;
mpPrintFileLineStat(&pSession->ctrl, pSession->stat.posStat, detailName);
mpPrintPosStat(&pSession->ctrl, &pSession->stat.posStat, detailName);
}
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name);
@ -870,15 +1052,19 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
detailName[sizeof(detailName) - 1] = 0;
mpPrintNodeStat(&pSession->ctrl, pSession->stat.nodeStat, detailName);
mpPrintNodeStat(&pPool->ctrl, pPool->stat.nodeStat, detailName);
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos");
detailName[sizeof(detailName) - 1] = 0;
mpPrintFileLineStat(&pSession->ctrl, pSession->stat.posStat, detailName);
mpPrintPosStat(&pPool->ctrl, &pPool->stat.posStat, detailName);
MP_API_LEAVE();
}
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = NULL;
@ -918,21 +1104,29 @@ _return:
*poolHandle = pPool;
MP_API_LEAVE();
return code;
}
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
(void)mpUpdateCfg(pPool);
MP_API_LEAVE();
}
void taosMemPoolDestroySession(void* poolHandle, void* session) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
if (NULL == pSession) {
uWarn("null pointer of session");
return;
goto _return;
}
(void)atomic_sub_fetch_32(&pSession->pJob->remainSession, 1);
@ -948,9 +1142,15 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) {
TAOS_MEMSET(pSession, 0, sizeof(*pSession));
mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession);
_return:
MP_API_LEAVE();
}
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = NULL;
@ -962,6 +1162,8 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) {
if (gMPFps[gMPMgmt.strategy].initSessionFp) {
MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession));
}
MP_ERR_JRET(mpInitStat(&pSession->stat.posStat, true));
pSession->pJob = (SMPJob*)pJob;
(void)atomic_add_fetch_32(&pSession->pJob->remainSession, 1);
@ -978,11 +1180,15 @@ _return:
*ppSession = pSession;
MP_API_LEAVE();
return code;
}
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
@ -1001,10 +1207,14 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fil
_return:
MP_API_LEAVE();
return input.pMem;
}
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || num < 0 || size < 0) {
@ -1025,10 +1235,14 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t
_return:
MP_API_LEAVE();
return input.pMem;
}
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
@ -1064,10 +1278,14 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz
_return:
MP_API_LEAVE();
return input.pMem;
}
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) {
@ -1092,10 +1310,14 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char*
_return:
MP_API_LEAVE();
return input.pMem;
}
char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr || size < 0) {
@ -1121,11 +1343,19 @@ char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64
_return:
MP_API_LEAVE();
return input.pMem;
}
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
MP_API_ENTER();
if (NULL == ptr) {
goto _return;
}
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p",
@ -1144,11 +1374,15 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName,
_return:
MP_API_LEAVE();
return;
}
int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
MP_API_ENTER();
int64_t code = 0;
if (NULL == poolHandle || NULL == session || NULL == fileName) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p",
__FUNCTION__, poolHandle, session, fileName);
@ -1156,7 +1390,7 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha
}
if (NULL == ptr) {
return 0;
goto _return;
}
SMemPool* pPool = (SMemPool*)poolHandle;
@ -1165,10 +1399,14 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha
_return:
MP_API_LEAVE();
return code;
}
void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) {
@ -1188,10 +1426,14 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment
_return:
MP_API_LEAVE();
return input.pMem;
}
void taosMemPoolClose(void* poolHandle) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
taosMemPoolPrintStat(poolHandle, NULL, "PoolClose");
@ -1200,14 +1442,20 @@ void taosMemPoolClose(void* poolHandle) {
taosMemoryFree(pPool->name);
mpDestroyCacheGroup(&pPool->sessionCache);
MP_API_LEAVE();
}
void taosMemPoolModDestroy(void) {
MP_API_ENTER();
MP_API_LEAVE();
}
int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
@ -1229,27 +1477,38 @@ int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil
_return:
MP_API_LEAVE();
return code;
}
int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
*ppJob = taosMemoryCalloc(1, sizeof(SMPJob));
if (NULL == *ppJob) {
uError("calloc mp job failed, code: 0x%x", terrno);
return terrno;
MP_ERR_JRET(terrno);
}
SMPJob* pJob = (SMPJob*)*ppJob;
pJob->job.jobId = jobId;
_return:
MP_API_LEAVE();
return TSDB_CODE_SUCCESS;
return code;
}
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) {
MP_API_ENTER();
if (NULL == poolHandle) {
*usedSize = 0;
*needEnd = false;
return;
goto _return;
}
SMemPool* pPool = (SMemPool*)poolHandle;
@ -1267,22 +1526,38 @@ void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* need
*needEnd = true;
*usedSize = atomic_load_64(&pPool->allocMemSize);
#endif
_return:
MP_API_LEAVE();
}
void taosMemPoolGetUsedSizeEnd(void* poolHandle) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
taosWUnLockLatch(&pPool->cfgLock);
MP_API_LEAVE();
}
bool taosMemPoolNeedRetireJob(void* poolHandle) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
MP_API_LEAVE();
return atomic_load_64(&pPool->allocMemSize) >= atomic_load_64(&pPool->retireThreshold[0]);
}
int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == session || (NULL == ppStat && NULL == allocSize && NULL == maxAllocSize)) {
uError("%s invalid input param, session:%p, ppStat:%p, allocSize:%p, maxAllocSize:%p", __FUNCTION__, session, ppStat, allocSize, maxAllocSize);
MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMPSession* pSession = (SMPSession*)session;
@ -1296,8 +1571,12 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t
if (maxAllocSize) {
*maxAllocSize = atomic_load_64(&pSession->maxAllocMemSize);
}
_return:
MP_API_LEAVE();
return TSDB_CODE_SUCCESS;
return code;
}
void taosAutoMemoryFree(void *ptr) {