diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index b6e4018e46..56934afc82 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2209,7 +2209,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca pRsp->numOfRows, pInfo->loadInfo.totalRows); if (pRsp->numOfRows == 0) { - taosMemoryFree(pRsp); + taosMemFree(pRsp); return NULL; } } @@ -2219,7 +2219,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); pTaskInfo->code = code; - taosMemoryFreeClear(pRsp); + taosMemFreeClear(pRsp); T_LONG_JMP(pTaskInfo->env, code); } updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator); @@ -2229,10 +2229,10 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); pTaskInfo->code = code; - taosMemoryFreeClear(pRsp); + taosMemFreeClear(pRsp); T_LONG_JMP(pTaskInfo->env, code); } - taosMemoryFree(pRsp); + taosMemFree(pRsp); if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; } else if (pOperator->status == OP_EXEC_DONE) { diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 4dd6f67ff6..888b9cca98 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -153,12 +153,41 @@ typedef struct SMPStatSession { int64_t destroyNum; } SMPStatSession; +typedef struct SMPAllocStat { + int64_t allocTimes; + int64_t allocBytes; + //int64_t freeIDs[]; // TODO +} SMPAllocStat; + +typedef struct SMPFreeStat { + int64_t freeTimes; + int64_t freeBytes; +} SMPFreeStat; + +typedef struct SMPFileLineId { + uint32_t fileId; + int32_t line; +} SMPFileLineId; + +typedef struct SMPFileLine { + SMPFileLineId fl; + int64_t size; +} SMPFileLine; + +typedef struct SMPStatPos { + int64_t logErrTimes; + SHashObj* fileHash; // fileId => fileName + SHashObj* remainHash; // pointer => SMPFileLine + SHashObj* allocHash; // alloc fl => SMPAllocStat + SHashObj* freeHash; // free fl => SMPFreeStat +} SMPStatPos; + typedef struct SMPStatInfo { SMPStatDetail statDetail; SMPStatSession statSession; SHashObj* sessStat; SHashObj* nodeStat; - SHashObj* posStat; + SMPStatPos posStat; } SMPStatInfo; @@ -316,6 +345,8 @@ enum { #define MP_STAT_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail #define MP_STAT_ORIG_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail, (_item).origExec, (_item).origSucc, (_item).origFail +#define MP_API_ENTER() void* _pPoolHandle = NULL; taosSaveDisableMemoryPoolUsage(_pPoolHandle) +#define MP_API_LEAVE() taosRestoreEnableMemoryPoolUsage(_pPoolHandle) #define MP_INIT_MEM_HEADER(_header, _size, _nsChunk) \ do { \ diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 1f7e1508de..0199e2c2b8 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -18,6 +18,7 @@ #include "tmempoolInt.h" #include "tlog.h" #include "tutil.h" +#include "taos.h" static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT; threadlocal void* threadPoolHandle = NULL; @@ -178,6 +179,41 @@ int32_t mpUpdateCfg(SMemPool* pPool) { return TSDB_CODE_SUCCESS; } +uint32_t mpFileIdHashFp(const char* fileId, uint32_t len) { + return *(uint32_t*)fileId; +} + + +int32_t mpInitStat(SMPStatPos* pStat, bool sessionStat) { + pStat->remainHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pStat->remainHash) { + uError("memPool init posStat remainHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat); + return terrno; + } + + pStat->allocHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pStat->allocHash) { + uError("memPool init posStat allocHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat); + return terrno; + } + + pStat->freeHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pStat->freeHash) { + uError("memPool init posStat freeHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat); + return terrno; + } + + pStat->fileHash = taosHashInit(1024, mpFileIdHashFp, false, HASH_ENTRY_LOCK); + if (NULL == pStat->fileHash) { + uError("memPool init posStat fileHash failed, error:%s, sessionStat:%d", tstrerror(terrno), sessionStat); + return terrno; + } + + uDebug("memPool stat initialized, sessionStat:%d", sessionStat); + + return TSDB_CODE_SUCCESS; +} + int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { MP_ERR_RET(mpCheckCfg(cfg)); @@ -202,6 +238,8 @@ int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { if (gMPFps[gMPMgmt.strategy].initFp) { MP_ERR_RET((*gMPFps[gMPMgmt.strategy].initFp)(pPool, poolName, cfg)); } + + MP_ERR_RET(mpInitStat(&pPool->stat.posStat, false)); return TSDB_CODE_SUCCESS; } @@ -437,8 +475,101 @@ void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailN } } -void mpPrintFileLineStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) { - //TODO +int32_t mpAddToRemainAllocHash(SHashObj* pHash, SMPFileLine* pFileLine) { + int32_t code = TSDB_CODE_SUCCESS; + SMPAllocStat stat = {0}, *pStat = NULL; + + while (true) { + pStat = (SMPAllocStat*)taosHashGet(pHash, &pFileLine->fl, sizeof(pFileLine->fl)); + if (NULL == pStat) { + code = taosHashPut(pHash, &pFileLine->fl, sizeof(pFileLine->fl), &stat, sizeof(stat)); + if (TSDB_CODE_SUCCESS != code) { + if (TSDB_CODE_DUP_KEY == code) { + continue; + } + + uError("taosHashPut to remain alloc hash failed, error:%s", tstrerror(code)); + return code; + } + + continue; + } + + atomic_add_fetch_64(&pStat->allocBytes, pFileLine->size); + atomic_add_fetch_64(&pStat->allocTimes, 1); + break; + } + + return TSDB_CODE_SUCCESS; +} + +void mpPrintPosRemainStat(SMPStatPos* pStat) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t remainNum = taosHashGetSize(pStat->remainHash); + if (remainNum <= 0) { + uInfo("no alloc remaining memory"); + return; + } + + SHashObj* pAllocHash = taosHashInit(remainNum / 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (NULL == pAllocHash) { + uError("taosHashInit pAllocHash failed, error:%s, remainNum:%d", tstrerror(terrno), remainNum); + return; + } + + SMPFileLine* pFileLine = NULL; + void* pIter = taosHashIterate(pStat->remainHash, NULL); + while (pIter) { + pFileLine = (SMPFileLine*)pIter; + + MP_ERR_JRET(mpAddToRemainAllocHash(pAllocHash, pFileLine)); + + pIter = taosHashIterate(pStat->remainHash, pIter); + } + + SMPAllocStat* pAlloc = NULL; + pIter = taosHashIterate(pAllocHash, NULL); + while (pIter) { + pAlloc = (SMPAllocStat*)pIter; + SMPFileLineId* pId = (SMPFileLineId*)taosHashGetKey(pIter, NULL); + SMPAllocStat* pAlloc = (SMPAllocStat*)taosHashGet(pStat->allocHash, pId, sizeof(*pId)); + char* pFileName = (char*)taosHashGet(pStat->fileHash, &pId->fileId, sizeof(pId->fileId)); + if (NULL == pAlloc || NULL == pFileName) { + uError("fail to get pId in allocHash or fileHash, pAlloc:%p, pFileName:%p", pAlloc, pFileName); + goto _return; + } + + uInfo("REMAINING: %" PRId64 " bytes alloced by %s:%d in %" PRId64 " times", pAlloc->allocBytes, pFileName, pId->line, pAlloc->allocTimes); + + pIter = taosHashIterate(pAllocHash, pIter); + } + +_return: + + taosHashCleanup(pAllocHash); +} + +void mpPrintPosAllocStat(SMPStatPos* pStat) { + +} + +void mpPrintPosFreeStat(SMPStatPos* pStat) { + +} + +void mpPrintPosStat(SMPCtrlInfo* pCtrl, SMPStatPos* pStat, char* detailName) { + if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) { + return; + } + + uInfo("MemPool [%s] Pos Stat:", detailName); + uInfo("error times: %" PRId64, pStat->logErrTimes); + + mpPrintPosRemainStat(pStat); + + mpPrintPosAllocStat(pStat); + + mpPrintPosFreeStat(pStat); } void mpPrintNodeStat(SMPCtrlInfo* pCtrl, SHashObj* pHash, char* detailName) { @@ -578,128 +709,175 @@ void mpLogDetailStat(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* } } -void mpLogPosStat(SHashObj* pHash, EMPStatLogItem item, SMPStatInput* pInput) { +int32_t mpGetAllocFreeStat(SHashObj* pHash, void* pKey, int32_t keyLen, void* pNew, int32_t newSize, void** ppRes) { + void* pStat = NULL; + int32_t code = TSDB_CODE_SUCCESS; + + while (true) { + pStat = taosHashGet(pHash, pKey, keyLen); + if (NULL != pStat) { + *ppRes = pStat; + break; + } + + code = taosHashPut(pHash, pKey, keyLen, pNew, newSize); + if (code) { + if (TSDB_CODE_DUP_KEY == code) { + continue; + } + + return code; + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t mpGetPosStatFileId(SMPStatPos* pStat, char* fileName, uint32_t* pId, bool sessionStat) { + uint32_t hashVal = MurmurHash3_32(fileName, strlen(fileName)); + int32_t code = taosHashPut(pStat->fileHash, &hashVal, sizeof(hashVal), fileName, strlen(fileName) + 1); + if (code && TSDB_CODE_DUP_KEY != code) { + return code; + } + + *pId = hashVal; + + return TSDB_CODE_SUCCESS; +} + +void mpLogPosStat(SMPStatPos* pStat, EMPStatLogItem item, SMPStatInput* pInput, bool sessionStat) { if (!MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { return; } + + int32_t code = TSDB_CODE_SUCCESS; switch (item) { - case E_MP_STAT_LOG_MEM_MALLOC: { - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { - atomic_add_fetch_64(&pDetail->times.memMalloc.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.memMalloc.exec, pInput->size); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { - atomic_add_fetch_64(&pDetail->times.memMalloc.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.memMalloc.succ, pInput->size); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { - atomic_add_fetch_64(&pDetail->times.memMalloc.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.memMalloc.fail, pInput->size); - } - break; - } - case E_MP_STAT_LOG_MEM_CALLOC:{ - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { - atomic_add_fetch_64(&pDetail->times.memCalloc.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.memCalloc.exec, pInput->size); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { - atomic_add_fetch_64(&pDetail->times.memCalloc.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.memCalloc.succ, pInput->size); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { - atomic_add_fetch_64(&pDetail->times.memCalloc.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.memCalloc.fail, pInput->size); - } - break; - } - case E_MP_STAT_LOG_MEM_REALLOC:{ - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { - atomic_add_fetch_64(&pDetail->times.memRealloc.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.memRealloc.exec, pInput->size); - atomic_add_fetch_64(&pDetail->bytes.memRealloc.origExec, pInput->origSize); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { - atomic_add_fetch_64(&pDetail->times.memRealloc.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.memRealloc.succ, pInput->size); - atomic_add_fetch_64(&pDetail->bytes.memRealloc.origSucc, pInput->origSize); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { - atomic_add_fetch_64(&pDetail->times.memRealloc.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.memRealloc.fail, pInput->size); - atomic_add_fetch_64(&pDetail->bytes.memRealloc.origFail, pInput->origSize); - } - break; - } - case E_MP_STAT_LOG_MEM_FREE:{ - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { - atomic_add_fetch_64(&pDetail->times.memFree.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.memFree.exec, pInput->size); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { - atomic_add_fetch_64(&pDetail->times.memFree.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.memFree.succ, pInput->size); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { - atomic_add_fetch_64(&pDetail->times.memFree.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.memFree.fail, pInput->size); - } - break; - } - case E_MP_STAT_LOG_MEM_STRDUP: { - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { - atomic_add_fetch_64(&pDetail->times.strdup.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.strdup.exec, pInput->size); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { - atomic_add_fetch_64(&pDetail->times.strdup.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.strdup.succ, pInput->size); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { - atomic_add_fetch_64(&pDetail->times.strdup.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.strdup.fail, pInput->size); - } - break; - } + case E_MP_STAT_LOG_MEM_MALLOC: + case E_MP_STAT_LOG_MEM_CALLOC: + case E_MP_STAT_LOG_MEM_STRDUP: case E_MP_STAT_LOG_MEM_STRNDUP: { - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { - atomic_add_fetch_64(&pDetail->times.strndup.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.strndup.exec, pInput->size); + SMPAllocStat allocStat = {0}, *pAlloc = NULL; + SMPFileLine fileLine = {.fl.line = pInput->line, .size = pInput->size}; + code = mpGetPosStatFileId(pStat, pInput->file, &fileLine.fl.fileId, sessionStat); + if (TSDB_CODE_SUCCESS != code) { + uError("add pMem:%p file:%s line:%d to fileHash failed, error:%s, sessionStat:%d", + pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat); + MP_ERR_JRET(code); } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { - atomic_add_fetch_64(&pDetail->times.strndup.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.strndup.succ, pInput->size); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { - atomic_add_fetch_64(&pDetail->times.strndup.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.strndup.fail, pInput->size); - } + code = taosHashPut(pStat->remainHash, &pInput->pMem, POINTER_BYTES, &fileLine, sizeof(fileLine)); + if (TSDB_CODE_SUCCESS != code) { + uError("add pMem:%p file:%s line:%d to remainHash failed, error:%s, sessionStat:%d", + pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat); + MP_ERR_JRET(code); + } + code = mpGetAllocFreeStat(pStat->allocHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&allocStat, sizeof(allocStat), (void**)&pAlloc); + if (TSDB_CODE_SUCCESS != code) { + uError("add pMem:%p file:%s line:%d to allocHash failed, error:%s, sessionStat:%d", + pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat); + MP_ERR_JRET(code); + } + + atomic_add_fetch_64(&pAlloc->allocTimes, 1); + atomic_add_fetch_64(&pAlloc->allocBytes, pInput->size); break; } - case E_MP_STAT_LOG_MEM_TRIM: { - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { - atomic_add_fetch_64(&pDetail->times.memTrim.exec, 1); + case E_MP_STAT_LOG_MEM_REALLOC: { + SMPAllocStat allocStat = {0}, *pAlloc = NULL; + SMPFreeStat freeStat = {0}, *pFree = NULL; + SMPFileLine fileLine = {.fl.line = pInput->line, .size = pInput->size}; + code = mpGetPosStatFileId(pStat, pInput->file, &fileLine.fl.fileId, sessionStat); + if (TSDB_CODE_SUCCESS != code) { + uError("realloc: add pMem:%p file:%s line:%d to fileHash failed, error:%s, sessionStat:%d", + pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat); + MP_ERR_JRET(code); } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { - atomic_add_fetch_64(&pDetail->times.memTrim.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.memTrim.succ, pInput->size); - } - if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { - atomic_add_fetch_64(&pDetail->times.memTrim.fail, 1); - } + + ASSERT((pInput->pOrigMem && pInput->origSize > 0) || (NULL == pInput->pOrigMem && pInput->origSize == 0)); + + if (pInput->pOrigMem && pInput->origSize > 0) { + code = taosHashRemove(pStat->remainHash, &pInput->pOrigMem, POINTER_BYTES); + if (TSDB_CODE_SUCCESS != code) { + uError("realloc: rm pOrigMem:%p file:%s line:%d from remainHash failed, error:%s, sessionStat:%d", + pInput->pOrigMem, pInput->file, pInput->line, tstrerror(code), sessionStat); + MP_ERR_JRET(code); + } + code = mpGetAllocFreeStat(pStat->freeHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&freeStat, sizeof(freeStat), (void**)&pFree); + if (TSDB_CODE_SUCCESS != code) { + uError("realloc: add pOrigMem:%p file:%s line:%d to freeHash failed, error:%s, sessionStat:%d", + pInput->pOrigMem, pInput->file, pInput->line, tstrerror(code), sessionStat); + MP_ERR_JRET(code); + } + + atomic_add_fetch_64(&pFree->freeTimes, 1); + atomic_add_fetch_64(&pFree->freeBytes, pInput->origSize); + } + + code = taosHashPut(pStat->remainHash, &pInput->pMem, POINTER_BYTES, &fileLine, sizeof(fileLine)); + if (TSDB_CODE_SUCCESS != code) { + uError("realloc: add pMem:%p file:%s line:%d to remainHash failed, error:%s, sessionStat:%d", + pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat); + MP_ERR_JRET(code); + } + + code = mpGetAllocFreeStat(pStat->allocHash, &fileLine.fl, sizeof(fileLine.fl), (void*)&allocStat, sizeof(allocStat), (void**)&pAlloc); + if (TSDB_CODE_SUCCESS != code) { + uError("realloc: add pMem:%p file:%s line:%d to allocHash failed, error:%s, sessionStat:%d", + pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat); + MP_ERR_JRET(code); + } + + atomic_add_fetch_64(&pAlloc->allocTimes, 1); + atomic_add_fetch_64(&pAlloc->allocBytes, pInput->size); break; } + case E_MP_STAT_LOG_MEM_FREE: { + SMPAllocStat allocStat = {0}, *pAlloc = NULL; + SMPFreeStat freeStat = {0}, *pFree = NULL; + SMPFileLineId fl = {.line = pInput->line}; + code = mpGetPosStatFileId(pStat, pInput->file, &fl.fileId, sessionStat); + if (TSDB_CODE_SUCCESS != code) { + uError("free: add pMem:%p file:%s line:%d to fileHash failed, error:%s, sessionStat:%d", + pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat); + MP_ERR_JRET(code); + } + + code = taosHashRemove(pStat->remainHash, &pInput->pMem, POINTER_BYTES); + if (TSDB_CODE_SUCCESS != code) { + uError("free: rm pMem:%p file:%s line:%d to remainHash failed, error:%s, sessionStat:%d", + pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat); + MP_ERR_JRET(code); + } + + code = mpGetAllocFreeStat(pStat->freeHash, &fl, sizeof(fl), (void*)&freeStat, sizeof(freeStat), (void**)&pFree); + if (TSDB_CODE_SUCCESS != code) { + uError("realloc: add pMem:%p file:%s line:%d to freeHash failed, error:%s, sessionStat:%d", + pInput->pMem, pInput->file, pInput->line, tstrerror(code), sessionStat); + MP_ERR_JRET(code); + } + + atomic_add_fetch_64(&pFree->freeTimes, 1); + atomic_add_fetch_64(&pFree->freeBytes, pInput->size); + break; + } + case E_MP_STAT_LOG_MEM_TRIM: + break; case E_MP_STAT_LOG_CHUNK_MALLOC: case E_MP_STAT_LOG_CHUNK_RECYCLE: case E_MP_STAT_LOG_CHUNK_REUSE: case E_MP_STAT_LOG_CHUNK_FREE: { - + break; } default: uError("Invalid stat item: %d", item); break; } + + return; + +_return: + + atomic_add_fetch_64(&pStat->logErrTimes, 1); } @@ -719,7 +897,8 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt mpLogDetailStat(&pPool->stat.statDetail, item, pInput); } if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { - mpLogPosStat(pPool->stat.posStat, item, pInput); + mpLogPosStat(&pSession->stat.posStat, item, pInput, true); + mpLogPosStat(&pPool->stat.posStat, item, pInput, false); } break; } @@ -762,13 +941,14 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) { if (NULL != poolHandle) { pCtrl = &pPool->ctrl; pDetail = &pPool->stat.statDetail; - int64_t sessInit = pPool->stat.statSession.initFail + pPool->stat.statSession.initSucc; - if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == pPool->stat.statSession.destroyNum) { + int64_t sessInit = atomic_load_64(&pPool->stat.statSession.initFail) + atomic_load_64(&pPool->stat.statSession.initSucc); + if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == atomic_load_64(&pPool->stat.statSession.destroyNum)) { int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.strdup.succ + pDetail->bytes.strndup.succ; int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ; if (allocSize != freeSize) { uError("%s MemPool %s stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize); + ASSERT(0); } else { uDebug("%s MemPool %s stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize); @@ -849,6 +1029,8 @@ _return: } void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) { + MP_API_ENTER(); + SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; char detailName[128]; @@ -860,7 +1042,7 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) { snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionPos"); detailName[sizeof(detailName) - 1] = 0; - mpPrintFileLineStat(&pSession->ctrl, pSession->stat.posStat, detailName); + mpPrintPosStat(&pSession->ctrl, &pSession->stat.posStat, detailName); } snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name); @@ -870,15 +1052,19 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) { snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode"); detailName[sizeof(detailName) - 1] = 0; - mpPrintNodeStat(&pSession->ctrl, pSession->stat.nodeStat, detailName); + mpPrintNodeStat(&pPool->ctrl, pPool->stat.nodeStat, detailName); snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos"); detailName[sizeof(detailName) - 1] = 0; - mpPrintFileLineStat(&pSession->ctrl, pSession->stat.posStat, detailName); + mpPrintPosStat(&pPool->ctrl, &pPool->stat.posStat, detailName); + + MP_API_LEAVE(); } int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { + MP_API_ENTER(); + int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = NULL; @@ -918,21 +1104,29 @@ _return: *poolHandle = pPool; + MP_API_LEAVE(); + return code; } void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) { + MP_API_ENTER(); + SMemPool* pPool = (SMemPool*)poolHandle; (void)mpUpdateCfg(pPool); + + MP_API_LEAVE(); } void taosMemPoolDestroySession(void* poolHandle, void* session) { + MP_API_ENTER(); + SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; if (NULL == pSession) { uWarn("null pointer of session"); - return; + goto _return; } (void)atomic_sub_fetch_32(&pSession->pJob->remainSession, 1); @@ -948,9 +1142,15 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) { TAOS_MEMSET(pSession, 0, sizeof(*pSession)); mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession); + +_return: + + MP_API_LEAVE(); } int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) { + MP_API_ENTER(); + int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = NULL; @@ -962,6 +1162,8 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) { if (gMPFps[gMPMgmt.strategy].initSessionFp) { MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession)); } + + MP_ERR_JRET(mpInitStat(&pSession->stat.posStat, true)); pSession->pJob = (SMPJob*)pJob; (void)atomic_add_fetch_32(&pSession->pJob->remainSession, 1); @@ -978,11 +1180,15 @@ _return: *ppSession = pSession; + MP_API_LEAVE(); + return code; } void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { + MP_API_ENTER(); + int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { @@ -1001,10 +1207,14 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fil _return: + MP_API_LEAVE(); + return input.pMem; } void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) { + MP_API_ENTER(); + int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName || num < 0 || size < 0) { @@ -1025,10 +1235,14 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t _return: + MP_API_LEAVE(); + return input.pMem; } void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) { + MP_API_ENTER(); + int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { @@ -1064,10 +1278,14 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz _return: + MP_API_LEAVE(); + return input.pMem; } char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) { + MP_API_ENTER(); + int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) { @@ -1092,10 +1310,14 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* _return: + MP_API_LEAVE(); + return input.pMem; } char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) { + MP_API_ENTER(); + int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr || size < 0) { @@ -1121,11 +1343,19 @@ char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64 _return: + MP_API_LEAVE(); + return input.pMem; } void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { + MP_API_ENTER(); + + if (NULL == ptr) { + goto _return; + } + int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p", @@ -1144,11 +1374,15 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, _return: + MP_API_LEAVE(); + return; } int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { - int32_t code = TSDB_CODE_SUCCESS; + MP_API_ENTER(); + + int64_t code = 0; if (NULL == poolHandle || NULL == session || NULL == fileName) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p", __FUNCTION__, poolHandle, session, fileName); @@ -1156,7 +1390,7 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha } if (NULL == ptr) { - return 0; + goto _return; } SMemPool* pPool = (SMemPool*)poolHandle; @@ -1165,10 +1399,14 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha _return: + MP_API_LEAVE(); + return code; } void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { + MP_API_ENTER(); + int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) { @@ -1188,10 +1426,14 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment _return: + MP_API_LEAVE(); + return input.pMem; } void taosMemPoolClose(void* poolHandle) { + MP_API_ENTER(); + SMemPool* pPool = (SMemPool*)poolHandle; taosMemPoolPrintStat(poolHandle, NULL, "PoolClose"); @@ -1200,14 +1442,20 @@ void taosMemPoolClose(void* poolHandle) { taosMemoryFree(pPool->name); mpDestroyCacheGroup(&pPool->sessionCache); + + MP_API_LEAVE(); } void taosMemPoolModDestroy(void) { - + MP_API_ENTER(); + + MP_API_LEAVE(); } int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) { + MP_API_ENTER(); + int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { @@ -1229,27 +1477,38 @@ int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil _return: + MP_API_LEAVE(); + return code; } int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) { + MP_API_ENTER(); + + int32_t code = TSDB_CODE_SUCCESS; *ppJob = taosMemoryCalloc(1, sizeof(SMPJob)); if (NULL == *ppJob) { uError("calloc mp job failed, code: 0x%x", terrno); - return terrno; + MP_ERR_JRET(terrno); } SMPJob* pJob = (SMPJob*)*ppJob; pJob->job.jobId = jobId; + +_return: + + MP_API_LEAVE(); - return TSDB_CODE_SUCCESS; + return code; } void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) { + MP_API_ENTER(); + if (NULL == poolHandle) { *usedSize = 0; *needEnd = false; - return; + goto _return; } SMemPool* pPool = (SMemPool*)poolHandle; @@ -1267,22 +1526,38 @@ void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* need *needEnd = true; *usedSize = atomic_load_64(&pPool->allocMemSize); #endif + +_return: + + MP_API_LEAVE(); } void taosMemPoolGetUsedSizeEnd(void* poolHandle) { + MP_API_ENTER(); + SMemPool* pPool = (SMemPool*)poolHandle; taosWUnLockLatch(&pPool->cfgLock); + + MP_API_LEAVE(); } bool taosMemPoolNeedRetireJob(void* poolHandle) { + MP_API_ENTER(); + SMemPool* pPool = (SMemPool*)poolHandle; + + MP_API_LEAVE(); + return atomic_load_64(&pPool->allocMemSize) >= atomic_load_64(&pPool->retireThreshold[0]); } int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize) { + MP_API_ENTER(); + + int32_t code = TSDB_CODE_SUCCESS; if (NULL == session || (NULL == ppStat && NULL == allocSize && NULL == maxAllocSize)) { uError("%s invalid input param, session:%p, ppStat:%p, allocSize:%p, maxAllocSize:%p", __FUNCTION__, session, ppStat, allocSize, maxAllocSize); - MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } SMPSession* pSession = (SMPSession*)session; @@ -1296,8 +1571,12 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t if (maxAllocSize) { *maxAllocSize = atomic_load_64(&pSession->maxAllocMemSize); } + +_return: + + MP_API_LEAVE(); - return TSDB_CODE_SUCCESS; + return code; } void taosAutoMemoryFree(void *ptr) {