From 55ba9054ea14fa414608ef1994349a22f772f16a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 2 Jul 2024 19:25:33 +0800 Subject: [PATCH] enh: add stat info --- include/libs/executor/executor.h | 2 + include/os/osMemPool.h | 2 + include/os/osSysinfo.h | 1 + source/common/src/tdatablock.c | 2 + source/libs/executor/src/exchangeoperator.c | 13 +- source/libs/executor/src/executor.c | 6 +- source/libs/executor/src/sysscanoperator.c | 11 +- source/libs/qworker/src/qwMem.c | 2 +- source/libs/qworker/src/qwUtil.c | 7 +- source/libs/qworker/src/qworker.c | 2 + source/os/src/osSysinfo.c | 53 ++++ source/util/inc/tmempoolInt.h | 104 +++++-- source/util/src/tmempool.c | 293 +++++++++++++++----- 13 files changed, 397 insertions(+), 101 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 78a977c86f..2e9cb117ea 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -165,6 +165,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); +int32_t qExecutorInit(void); + void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); /** diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 5507e99597..bd49ded309 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -61,6 +61,8 @@ extern threadlocal void* threadPoolSession; #define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolHandle = _pool; threadPoolSession = _session; } while (0) #define taosDisableMemoryPoolUsage() (threadPoolHandle = NULL) +#define taosSaveDisableMemoryPoolUsage(_handle) do { (_handle) = threadPoolHandle; threadPoolHandle = NULL; } while (0) +#define taosRestoreEnableMemoryPoolUsage(_handle) (threadPoolHandle = (_handle)) #define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, threadPoolSession, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size))) #define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 7a1df2b81c..0d7d0cc46a 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -45,6 +45,7 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, ch int32_t taosGetTotalMemory(int64_t *totalKB); int32_t taosGetProcMemory(int64_t *usedKB); int32_t taosGetSysMemory(int64_t *usedKB); +int32_t taosGetSysAvailMemory(int64_t *availSize); int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize); void taosGetProcIODelta(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes); void taosGetCardInfoDelta(int64_t *receive_bytes, int64_t *transmit_bytes); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 809721d606..0f0f90a6d8 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1452,10 +1452,12 @@ int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockI pColumn->pData = tmp; // check if the allocated memory is aligned to the requried bytes. +#if 0 #if defined LINUX if ((((uint64_t)pColumn->pData) & (MALLOC_ALIGN_BYTES - 1)) != 0x0) { return TSDB_CODE_FAILED; } +#endif #endif if (clearPayload) { diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index ddb083d6e4..f6197621f4 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -524,7 +524,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas return pTaskInfo->code; } - void* msg = taosMemoryCalloc(1, msgSize); + void* msg = taosMemCalloc(1, msgSize); if (NULL == msg) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; taosMemFree(pWrapper); @@ -535,7 +535,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; taosMemFree(pWrapper); - taosMemoryFree(msg); + taosMemFree(msg); freeOperatorParam(req.pOpParam, OP_GET_PARAM); return pTaskInfo->code; } @@ -549,7 +549,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = taosMemCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { - taosMemoryFreeClear(msg); + taosMemFreeClear(msg); taosMemFree(pWrapper); qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; @@ -564,8 +564,11 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas pMsgSendInfo->fp = loadRemoteDataCallback; int64_t transporterId = 0; - int32_t code = - asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); + void* poolHandle = NULL; + taosSaveDisableMemoryPoolUsage(poolHandle); + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); + taosRestoreEnableMemoryPoolUsage(poolHandle); + } return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 77a80d229e..78fecf3eec 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -523,11 +523,15 @@ void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) { ((SExecTaskInfo*)tinfo)->paramSet = false; } +int32_t qExecutorInit(void) { + taosThreadOnce(&initPoolOnce, initRefPool); + return TSDB_CODE_SUCCESS; +} + int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql, EOPTR_EXEC_MODEL model) { SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - taosThreadOnce(&initPoolOnce, initRefPool); qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 415c7d1f0e..1dcf751594 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1737,11 +1737,11 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca tstrncpy(pInfo->req.user, pInfo->pUser, tListLen(pInfo->req.user)); int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); - char* buf1 = taosMemoryCalloc(1, contLen); + char* buf1 = taosMemCalloc(1, contLen); tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + SMsgSendInfo* pMsgSendInfo = taosMemCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; @@ -1759,8 +1759,11 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca pMsgSendInfo->requestId = pTaskInfo->id.queryId; int64_t transporterId = 0; - int32_t code = - asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo); + void* poolHandle = NULL; + taosSaveDisableMemoryPoolUsage(poolHandle); + int32_t code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo); + taosRestoreEnableMemoryPoolUsage(poolHandle); + tsem_wait(&pInfo->ready); if (pTaskInfo->code) { diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index 316dfcfc8a..adb5bc363e 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -24,7 +24,7 @@ int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chu void qwInitQueryPool(void) { int64_t memSize = 0; - int32_t code = taosGetSysMemory(&memSize); + int32_t code = taosGetSysAvailMemory(&memSize); if (TSDB_CODE_SUCCESS != code) { return; } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 9a1b94a283..5d81a37156 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -272,11 +272,14 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTask void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } -void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { +void qwFreeTaskHandle(SQWTaskCtx *ctx, qTaskInfo_t *taskHandle) { // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); qDestroyTask(otaskHandle); + taosDisableMemoryPoolUsage(); + qDebug("task handle destroyed"); } } @@ -305,7 +308,7 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { // NO need to release dataConnInfo - qwFreeTaskHandle(&ctx->taskHandle); + qwFreeTaskHandle(ctx, &ctx->taskHandle); if (ctx->sinkHandle) { QW_SINK_ENABLE_MEMPOOL(ctx); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index e609df4287..dc9795023c 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1368,6 +1368,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S QW_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + QW_ERR_JRET(qExecutorInit()); + *qWorkerMgmt = mgmt; qDebug("qworker initialized, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 50eb8413c0..0dd95ae392 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -697,6 +697,59 @@ int32_t taosGetProcMemory(int64_t *usedKB) { #endif } +int32_t taosGetSysAvailMemory(int64_t *availSize) { +#ifdef WINDOWS + MEMORYSTATUSEX memsStat; + memsStat.dwLength = sizeof(memsStat); + if (!GlobalMemoryStatusEx(&memsStat)) { + return -1; + } + + int64_t nMemFree = memsStat.ullAvailPhys; + int64_t nMemTotal = memsStat.ullTotalPhys; + + *availSize = nMemTotal - nMemFree; + return 0; +#elif defined(_TD_DARWIN_64) + *availSize = 0; + return 0; +#else + TdFilePtr pFile = taosOpenFile("/proc/meminfo", TD_FILE_READ | TD_FILE_STREAM); + if (pFile == NULL) { + return -1; + } + + ssize_t bytes = 0; + char line[1024] = {0}; + int32_t expectedSize = 13; //"MemAvailable:" + while (!taosEOFFile(pFile)) { + bytes = taosGetsFile(pFile, sizeof(line), line); + if (bytes < 0) { + break; + } + if (line[0] != 'M' && line[3] != 'A') { + continue; + line[0] = 0; + } + if (0 == strncmp(line, "MemAvailable:", expectedSize)) { + break; + } + } + + if (0 == line[0]) { + return -1; + } + + char tmp[32]; + sscanf(line, "%s %" PRId64, tmp, availSize); + + *availSize *= 1024; + + taosCloseFile(&pFile); + return 0; +#endif +} + int32_t taosGetSysMemory(int64_t *usedKB) { #ifdef WINDOWS MEMORYSTATUSEX memsStat; diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 09616d3e5b..9ce6e594e9 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -35,13 +35,48 @@ extern "C" { #define MP_CHUNK_FLAG_IN_USE (1 << 0) #define MP_CHUNK_FLAG_NS_CHUNK (1 << 1) -#define MP_DBG_FLAG_LOG_MALLOC_FREE (1 << 0) +// STAT FLAGS +#define MP_STAT_FLAG_LOG_ALL_MEM_STAT (1 << 0) +#define MP_STAT_FLAG_LOG_ALL_CHUNK_STAT (1 << 1) + +#define MP_STAT_FLAG_LOG_ALL_FILE_STAT (1 << 2) +#define MP_STAT_FLAG_LOG_ALL_LINE_STAT (1 << 3) +#define MP_STAT_FLAG_LOG_ALL_SESSION_STAT (1 << 4) +#define MP_STAT_FLAG_LOG_ALL_NODE_STAT (1 << 5) +#define MP_STAT_FLAG_LOG_ALL_POOL_STAT (1 << 6) + +#define MP_STAT_FLAG_LOG_SOME_FILE_STAT (1 << 7) +#define MP_STAT_FLAG_LOG_SOME_LINE_STAT (1 << 8) +#define MP_STAT_FLAG_LOG_SOME_SESSION_STAT (1 << 9) +#define MP_STAT_FLAG_LOG_SOME_NODE_STAT (1 << 10) +#define MP_STAT_FLAG_LOG_SOME_POOL_STAT (1 << 11) + +// STAT PROCESURE FLAGS +#define MP_STAT_PROC_FLAG_EXEC (1 << 0) +#define MP_STAT_PROC_FLAG_INPUT_ERR (1 << 1) +#define MP_STAT_PROC_FLAG_RES_SUCC (1 << 2) +#define MP_STAT_PROC_FLAG_RES_FAIL (1 << 3) + + +typedef enum EMPStatLogItem { + E_MP_STAT_LOG_MEM_MALLOC = 1, + E_MP_STAT_LOG_MEM_CALLOC, + E_MP_STAT_LOG_MEM_REALLOC, + E_MP_STAT_LOG_MEM_FREE, + E_MP_STAT_LOG_MEM_STRDUP, + E_MP_STAT_LOG_CHUNK_MALLOC, + E_MP_STAT_LOG_CHUNK_RECYCLE, + E_MP_STAT_LOG_CHUNK_REUSE, + E_MP_STAT_LOG_CHUNK_FREE, +} EMPStatLogItem; + +// MEM HEADER FLAGS #define MP_MEM_HEADER_FLAG_NS_CHUNK (1 << 0) typedef struct SMPMemHeader { - uint64_t flags:3; - uint64_t size:5; + uint64_t flags:24; + uint64_t size:40; } SMPMemHeader; typedef struct SMPMemTailer { @@ -79,23 +114,55 @@ typedef struct SMPCacheGroup { void* pNext; } SMPCacheGroup; +typedef struct SMPStatInput { + char* file; + int64_t size; + int64_t origSize; + int32_t procFlags; + int32_t line; +} SMPStatInput; + +typedef struct SMPStatItem { + int64_t exec; + int64_t inErr; + int64_t succ; + int64_t fail; + int64_t orig; +} SMPStatItem; + typedef struct SMPMemoryStat { - int64_t chunkAlloc; - int64_t chunkFree; - int64_t memMalloc; - int64_t memCalloc; - int64_t memRealloc; - int64_t strdup; - int64_t memFree; + SMPStatItem chunkMalloc; + SMPStatItem chunkRecycle; + SMPStatItem chunkReUse; + SMPStatItem chunkFree; + SMPStatItem memMalloc; + SMPStatItem memCalloc; + SMPStatItem memRealloc; + SMPStatItem strdup; + SMPStatItem memFree; } SMPMemoryStat; -typedef struct SMPStat { +typedef struct SMPStatDetail { SMPMemoryStat times; SMPMemoryStat bytes; -} SMPStat; +} SMPStatDetail; + +typedef struct SMPCtrlInfo { + int64_t statFlags; + int64_t funcFlags; +} SMPCtrlInfo; + +typedef struct SMPStatInfo { + SMPStatDetail statDetail; + SHashObj* nodeStat; + SHashObj* fileStat; + SHashObj* lineStat; +} SMPStatInfo; typedef struct SMPSession { SMPListNode list; + SMPCtrlInfo ctrlInfo; + int64_t allocChunkNum; int64_t allocChunkMemSize; int64_t allocMemSize; @@ -118,7 +185,7 @@ typedef struct SMPSession { SMPNSChunk *reUseNSChunkHead; SMPNSChunk *reUseNSChunkTail; - SMPStat stat; + SMPStatInfo stat; } SMPSession; typedef struct SMPCacheGroupInfo { @@ -130,16 +197,14 @@ typedef struct SMPCacheGroupInfo { void *pIdleList; } SMPCacheGroupInfo; -typedef struct SMPDebugInfo { - int64_t flags; -} SMPDebugInfo; + typedef struct SMemPool { char *name; int16_t slotId; SMemPoolCfg cfg; int32_t maxChunkNum; - SMPDebugInfo dbgInfo; + SMPCtrlInfo ctrlInfo; int16_t maxDiscardSize; double threadChunkReserveNum; @@ -165,7 +230,7 @@ typedef struct SMemPool { SMPChunk *readyNSChunkHead; SMPChunk *readyNSChunkTail; - SMPStat stat; + SMPStatInfo stat; } SMemPool; #define MP_GET_FLAG(st, f) ((st) & (f)) @@ -191,7 +256,8 @@ enum { _chunkHead = _chunk; \ _chunkTail = _chunk; \ } else { \ - (_chunkTail)->list.pNext = _chunk; \ + (_chunkTail)->list.pNext = _chunk; \ + (_chunkTail) = _chunk; \ } \ (_chunkNum)++; \ } while (0) diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 31b6810a83..3f84747992 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -131,6 +131,8 @@ int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) { pPool->allocChunkNum++; pPool->allocChunkSize += pPool->cfg.chunkSize; + *ppChunk = pChunk; + return TSDB_CODE_SUCCESS; } @@ -160,12 +162,8 @@ int32_t memPoolPrepareChunks(SMemPool* pPool, int32_t num) { for (int32_t i = 0; i < num; ++i) { MP_ERR_RET(memPoolNewChunk(pPool, &pChunk)); - if (NULL == pPool->readyChunkTail) { - pPool->readyChunkHead = pChunk; - pPool->readyChunkTail = pChunk; - } else { - pPool->readyChunkTail->list.pNext = pChunk; - } + pPool->readyChunkTail->list.pNext = pChunk; + pPool->readyChunkTail = pChunk; atomic_add_fetch_32(&pPool->readyChunkNum, 1); } @@ -192,7 +190,7 @@ int32_t memPoolEnsureChunks(SMemPool* pPool) { int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { MP_ERR_RET(memPoolCheckCfg(cfg)); - memcpy(&pPool->cfg, &cfg, sizeof(cfg)); + memcpy(&pPool->cfg, cfg, sizeof(*cfg)); pPool->name = taosStrdup(poolName); if (NULL == pPool->name) { @@ -249,6 +247,8 @@ int32_t memPoolGetChunk(SMemPool* pPool, SMPChunk** ppChunk) { *ppChunk = pChunk; return TSDB_CODE_SUCCESS; + } else { + atomic_add_fetch_32(&pPool->readyChunkNum, 1); } MP_RET(memPoolNewChunk(pPool, ppChunk)); @@ -273,7 +273,7 @@ int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_ return TSDB_CODE_SUCCESS; } -void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) { +void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { int32_t code = TSDB_CODE_SUCCESS; SMPChunk* pChunk = NULL, *preSrcChunk = NULL; void* pRes = NULL; @@ -317,11 +317,11 @@ _return: return pRes; } -void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) { +void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { int32_t code = TSDB_CODE_SUCCESS; SMPNSChunk* pChunk = NULL; void* pRes = NULL; - int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer); + int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer) + alignment; MP_ERR_JRET(memPoolNewNSChunk(pPool, &pChunk, totalSize)); SMPMemHeader* pHeader = (SMPMemHeader*)pChunk->pMemStart; @@ -345,55 +345,205 @@ _return: return pRes; } -void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, char* fileName, int32_t lineNo) { +void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { int32_t code = TSDB_CODE_SUCCESS; void *res = NULL; - res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); - - if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { - //TODO - } + res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size, alignment) : memPoolAllocFromChunk(pPool, pSession, size, alignment); _return: return res; } -void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size, char* fileName, int32_t lineNo) { +void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size) { int32_t code = TSDB_CODE_SUCCESS; int64_t totalSize = num * size; - void *res = memPoolMallocImpl(pPool, pSession, totalSize, fileName, lineNo); + void *res = memPoolMallocImpl(pPool, pSession, totalSize, 0); if (NULL != res) { memset(res, 0, totalSize); } - if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { - //TODO - } - _return: return res; } +void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t size, int64_t* origSize) { + void *res = NULL; -void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fileName, int32_t lineNo) { - //TODO - - if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { - //TODO + if (NULL == ptr) { + *origSize = 0; + res = memPoolMallocImpl(pPool, pSession, size, 0); + return res; } + + if (0 == size) { + memPoolFreeImpl(pPool, pSession, ptr, origSize); + return res; + } + + *origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr); + + if (*origSize >= size) { + SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); + pHeader->size = size; + return ptr; + } + + res = memPoolMallocImpl(pPool, pSession, size, 0); + SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); + SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader)); + + memcpy(res, ptr, *origSize); + memPoolFreeImpl(pPool, pSession, ptr, NULL); + + return res; } -int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fileName, int32_t lineNo) { + +void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) { + if (NULL == ptr) { + if (origSize) { + *origSize = 0; + } + + return; + } + + if (origSize) { + *origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr); + } + + return; +} + +int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) { SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1; return pHeader->size; } +void memPoolLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* pInput) { + switch (item) { + case E_MP_STAT_LOG_MEM_MALLOC: { + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { + atomic_add_fetch_64(&pDetail->times.memMalloc.exec, 1); + atomic_add_fetch_64(&pDetail->bytes.memMalloc.exec, size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { + atomic_add_fetch_64(&pDetail->times.memMalloc.succ, 1); + atomic_add_fetch_64(&pDetail->bytes.memMalloc.succ, size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { + atomic_add_fetch_64(&pDetail->times.memMalloc.fail, 1); + atomic_add_fetch_64(&pDetail->bytes.memMalloc.fail, size); + } + break; + } + case E_MP_STAT_LOG_MEM_CALLOC:{ + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { + atomic_add_fetch_64(&pDetail->times.memCalloc.exec, 1); + atomic_add_fetch_64(&pDetail->bytes.memCalloc.exec, size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { + atomic_add_fetch_64(&pDetail->times.memCalloc.succ, 1); + atomic_add_fetch_64(&pDetail->bytes.memCalloc.succ, size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { + atomic_add_fetch_64(&pDetail->times.memCalloc.fail, 1); + atomic_add_fetch_64(&pDetail->bytes.memCalloc.fail, size); + } + break; + } + case E_MP_STAT_LOG_MEM_REALLOC:{ + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { + atomic_add_fetch_64(&pDetail->times.memRealloc.exec, 1); + atomic_add_fetch_64(&pDetail->bytes.memRealloc.exec, size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { + atomic_add_fetch_64(&pDetail->times.memRealloc.succ, 1); + atomic_add_fetch_64(&pDetail->bytes.memRealloc.succ, size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { + atomic_add_fetch_64(&pDetail->times.memRealloc.fail, 1); + atomic_add_fetch_64(&pDetail->bytes.memRealloc.fail, size); + } + break; + } + case E_MP_STAT_LOG_MEM_FREE:{ + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { + atomic_add_fetch_64(&pDetail->times.memFree.exec, 1); + atomic_add_fetch_64(&pDetail->bytes.memFree.exec, size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { + atomic_add_fetch_64(&pDetail->times.memFree.succ, 1); + atomic_add_fetch_64(&pDetail->bytes.memFree.succ, size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { + atomic_add_fetch_64(&pDetail->times.memFree.fail, 1); + atomic_add_fetch_64(&pDetail->bytes.memFree.fail, size); + } + break; + } + case E_MP_STAT_LOG_MEM_STRDUP: { + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { + atomic_add_fetch_64(&pDetail->times.strdup.exec, 1); + atomic_add_fetch_64(&pDetail->bytes.strdup.exec, size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { + atomic_add_fetch_64(&pDetail->times.strdup.succ, 1); + atomic_add_fetch_64(&pDetail->bytes.strdup.succ, size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { + atomic_add_fetch_64(&pDetail->times.strdup.fail, 1); + atomic_add_fetch_64(&pDetail->bytes.strdup.fail, size); + } + break; + } + case E_MP_STAT_LOG_CHUNK_MALLOC: + case E_MP_STAT_LOG_CHUNK_RECYCLE: + case E_MP_STAT_LOG_CHUNK_REUSE: + case E_MP_STAT_LOG_CHUNK_FREE: { + + } + default: + uError("Invalid stat item: %d", item); + break; + } + + +} + +void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPStatInput* pInput) { + switch (item) { + case E_MP_STAT_LOG_MEM_MALLOC: + case E_MP_STAT_LOG_MEM_CALLOC: + case E_MP_STAT_LOG_MEM_REALLOC: + case E_MP_STAT_LOG_MEM_FREE: + case E_MP_STAT_LOG_MEM_STRDUP: { + if (MP_GET_FLAG(pSession->ctrlInfo.statFlags, MP_STAT_FLAG_LOG_ALL_MEM_STAT)) { + memPoolLogStatDetail(&pSession->stat.statDetail, item, pInput); + } + if (MP_GET_FLAG(pPool->ctrlInfo.statFlags, MP_STAT_FLAG_LOG_ALL_MEM_STAT)) { + memPoolLogStatDetail(&pPool->stat.statDetail, item, pInput); + } + break; + } + case E_MP_STAT_LOG_CHUNK_MALLOC: + case E_MP_STAT_LOG_CHUNK_RECYCLE: + case E_MP_STAT_LOG_CHUNK_REUSE: + case E_MP_STAT_LOG_CHUNK_FREE: { + + } + default: + uError("Invalid stat item: %d", item); + break; + } +} + void taosMemPoolModInit(void) { taosThreadMutexInit(&gMPoolMutex, NULL); @@ -447,16 +597,17 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) { int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = NULL; + SMPChunk* pChunk = NULL; MP_ERR_JRET(memPoolGetIdleNode(pPool, &pPool->sessionCache, (void**)&pSession)); - MP_ERR_JRET(memPoolGetChunk(pPool, &pSession->srcChunkHead)); + MP_ERR_JRET(memPoolGetChunk(pPool, &pChunk)); pSession->allocChunkNum = 1; pSession->allocChunkMemSize = pPool->cfg.chunkSize; - MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pSession->srcChunkHead); - MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pSession->srcChunkHead); + MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk); + MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk); _return: @@ -473,6 +624,7 @@ _return: void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { int32_t code = TSDB_CODE_SUCCESS; + void *res = NULL; if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size); @@ -481,11 +633,16 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* f SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - return memPoolMallocImpl(pPool, pSession, size, fileName, lineNo); + SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; + + res = memPoolMallocImpl(pPool, pSession, size, 0); + + MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); + memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); _return: - return NULL; + return res; } void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) { @@ -500,15 +657,17 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - res = memPoolMallocImpl(pPool, pSession, num * size, fileName, lineNo); + int64_t totalSize = num * size; + SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; + + res = memPoolMallocImpl(pPool, pSession, totalSize, 0); if (NULL != res) { - memset(res, 0, num * size); + memset(res, 0, totalSize); } - if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { - //TODO - } + MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); + memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input); _return: @@ -527,28 +686,12 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t s SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - if (NULL == ptr) { - res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); - } else if (0 == size) { - memPoolFreeImpl(pPool, pSession, ptr, fileName, lineNo); - } else { - int64_t origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo); - if (origSize >= size) { - SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); - pHeader->size = size; - } else { - res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); - SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader)); - SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader)); + SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - memcpy(res, ptr, origSize); - memset((char*)res + origSize, 0, size - origSize); - } - } + res = memPoolReallocImpl(pPool, pSession, ptr, size, &input.origSize); - if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { - //TODO - } + MP_SET_FLAG(input.procFlags, ((res || 0 == size) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); + memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input); _return: @@ -568,14 +711,15 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; int64_t size = strlen(ptr) + 1; - res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size); + SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; + + res = memPoolMallocImpl(pPool, pSession, size, 0); if (NULL != res) { strcpy(res, ptr); } - if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) { - //TODO - } + MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); + memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_STRDUP, &input); _return: @@ -584,15 +728,20 @@ _return: void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { int32_t code = TSDB_CODE_SUCCESS; - if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) { - uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p", - __FUNCTION__, poolHandle, session, fileName, ptr); + if (NULL == poolHandle || NULL == session || NULL == fileName) { + uError("%s invalid input param, handle:%p, session:%p, fileName:%p", + __FUNCTION__, poolHandle, session, fileName); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - memPoolFreeImpl(pPool, pSession, ptr, fileName, lineNo); + SMPStatInput input = {.file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; + + memPoolFreeImpl(pPool, pSession, ptr, &input.size); + + MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC); + memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input); _return: @@ -613,7 +762,7 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - return memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo); + return memPoolGetMemorySizeImpl(pPool, pSession, ptr); _return: @@ -622,6 +771,7 @@ _return: void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { int32_t code = TSDB_CODE_SUCCESS; + void *res = NULL; if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64, @@ -631,11 +781,16 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - return memPoolMallocImpl(pPool, pSession, size, fileName, lineNo); + SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; + + res = memPoolMallocImpl(pPool, pSession, size, alignment); + + MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); + memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); _return: - return NULL; + return res; } void taosMemPoolClose(void* poolHandle) {