enh: add stat info

This commit is contained in:
dapan1121 2024-07-02 19:25:33 +08:00
parent b3eb938527
commit 55ba9054ea
13 changed files with 397 additions and 101 deletions

View File

@ -165,6 +165,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds);
int32_t qExecutorInit(void);
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
/**

View File

@ -61,6 +61,8 @@ extern threadlocal void* threadPoolSession;
#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolHandle = _pool; threadPoolSession = _session; } while (0)
#define taosDisableMemoryPoolUsage() (threadPoolHandle = NULL)
#define taosSaveDisableMemoryPoolUsage(_handle) do { (_handle) = threadPoolHandle; threadPoolHandle = NULL; } while (0)
#define taosRestoreEnableMemoryPoolUsage(_handle) (threadPoolHandle = (_handle))
#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, threadPoolSession, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size)))
#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))

View File

@ -45,6 +45,7 @@ int32_t taosGetCpuInstructions(char* sse42, char* avx, char* avx2, char* fma, ch
int32_t taosGetTotalMemory(int64_t *totalKB);
int32_t taosGetProcMemory(int64_t *usedKB);
int32_t taosGetSysMemory(int64_t *usedKB);
int32_t taosGetSysAvailMemory(int64_t *availSize);
int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize);
void taosGetProcIODelta(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes);
void taosGetCardInfoDelta(int64_t *receive_bytes, int64_t *transmit_bytes);

View File

@ -1452,10 +1452,12 @@ int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockI
pColumn->pData = tmp;
// check if the allocated memory is aligned to the requried bytes.
#if 0
#if defined LINUX
if ((((uint64_t)pColumn->pData) & (MALLOC_ALIGN_BYTES - 1)) != 0x0) {
return TSDB_CODE_FAILED;
}
#endif
#endif
if (clearPayload) {

View File

@ -524,7 +524,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
return pTaskInfo->code;
}
void* msg = taosMemoryCalloc(1, msgSize);
void* msg = taosMemCalloc(1, msgSize);
if (NULL == msg) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemFree(pWrapper);
@ -535,7 +535,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
if (tSerializeSResFetchReq(msg, msgSize, &req) < 0) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemFree(pWrapper);
taosMemoryFree(msg);
taosMemFree(msg);
freeOperatorParam(req.pOpParam, OP_GET_PARAM);
return pTaskInfo->code;
}
@ -549,7 +549,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemoryFreeClear(msg);
taosMemFreeClear(msg);
taosMemFree(pWrapper);
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
@ -564,8 +564,11 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
pMsgSendInfo->fp = loadRemoteDataCallback;
int64_t transporterId = 0;
int32_t code =
asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
void* poolHandle = NULL;
taosSaveDisableMemoryPoolUsage(poolHandle);
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
taosRestoreEnableMemoryPoolUsage(poolHandle);
}
return TSDB_CODE_SUCCESS;

View File

@ -523,11 +523,15 @@ void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
((SExecTaskInfo*)tinfo)->paramSet = false;
}
int32_t qExecutorInit(void) {
taosThreadOnce(&initPoolOnce, initRefPool);
return TSDB_CODE_SUCCESS;
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
EOPTR_EXEC_MODEL model) {
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
taosThreadOnce(&initPoolOnce, initRefPool);
qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);

View File

@ -1737,11 +1737,11 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
tstrncpy(pInfo->req.user, pInfo->pUser, tListLen(pInfo->req.user));
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
char* buf1 = taosMemoryCalloc(1, contLen);
char* buf1 = taosMemCalloc(1, contLen);
tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
SMsgSendInfo* pMsgSendInfo = taosMemCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
@ -1759,8 +1759,11 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
pMsgSendInfo->requestId = pTaskInfo->id.queryId;
int64_t transporterId = 0;
int32_t code =
asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
void* poolHandle = NULL;
taosSaveDisableMemoryPoolUsage(poolHandle);
int32_t code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
taosRestoreEnableMemoryPoolUsage(poolHandle);
tsem_wait(&pInfo->ready);
if (pTaskInfo->code) {

View File

@ -24,7 +24,7 @@ int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chu
void qwInitQueryPool(void) {
int64_t memSize = 0;
int32_t code = taosGetSysMemory(&memSize);
int32_t code = taosGetSysAvailMemory(&memSize);
if (TSDB_CODE_SUCCESS != code) {
return;
}

View File

@ -272,11 +272,14 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTask
void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
void qwFreeTaskHandle(SQWTaskCtx *ctx, qTaskInfo_t *taskHandle) {
// Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
qDestroyTask(otaskHandle);
taosDisableMemoryPoolUsage();
qDebug("task handle destroyed");
}
}
@ -305,7 +308,7 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) {
// NO need to release dataConnInfo
qwFreeTaskHandle(&ctx->taskHandle);
qwFreeTaskHandle(ctx, &ctx->taskHandle);
if (ctx->sinkHandle) {
QW_SINK_ENABLE_MEMPOOL(ctx);

View File

@ -1368,6 +1368,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
QW_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
QW_ERR_JRET(qExecutorInit());
*qWorkerMgmt = mgmt;
qDebug("qworker initialized, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);

View File

@ -697,6 +697,59 @@ int32_t taosGetProcMemory(int64_t *usedKB) {
#endif
}
int32_t taosGetSysAvailMemory(int64_t *availSize) {
#ifdef WINDOWS
MEMORYSTATUSEX memsStat;
memsStat.dwLength = sizeof(memsStat);
if (!GlobalMemoryStatusEx(&memsStat)) {
return -1;
}
int64_t nMemFree = memsStat.ullAvailPhys;
int64_t nMemTotal = memsStat.ullTotalPhys;
*availSize = nMemTotal - nMemFree;
return 0;
#elif defined(_TD_DARWIN_64)
*availSize = 0;
return 0;
#else
TdFilePtr pFile = taosOpenFile("/proc/meminfo", TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
return -1;
}
ssize_t bytes = 0;
char line[1024] = {0};
int32_t expectedSize = 13; //"MemAvailable:"
while (!taosEOFFile(pFile)) {
bytes = taosGetsFile(pFile, sizeof(line), line);
if (bytes < 0) {
break;
}
if (line[0] != 'M' && line[3] != 'A') {
continue;
line[0] = 0;
}
if (0 == strncmp(line, "MemAvailable:", expectedSize)) {
break;
}
}
if (0 == line[0]) {
return -1;
}
char tmp[32];
sscanf(line, "%s %" PRId64, tmp, availSize);
*availSize *= 1024;
taosCloseFile(&pFile);
return 0;
#endif
}
int32_t taosGetSysMemory(int64_t *usedKB) {
#ifdef WINDOWS
MEMORYSTATUSEX memsStat;

View File

@ -35,13 +35,48 @@ extern "C" {
#define MP_CHUNK_FLAG_IN_USE (1 << 0)
#define MP_CHUNK_FLAG_NS_CHUNK (1 << 1)
#define MP_DBG_FLAG_LOG_MALLOC_FREE (1 << 0)
// STAT FLAGS
#define MP_STAT_FLAG_LOG_ALL_MEM_STAT (1 << 0)
#define MP_STAT_FLAG_LOG_ALL_CHUNK_STAT (1 << 1)
#define MP_STAT_FLAG_LOG_ALL_FILE_STAT (1 << 2)
#define MP_STAT_FLAG_LOG_ALL_LINE_STAT (1 << 3)
#define MP_STAT_FLAG_LOG_ALL_SESSION_STAT (1 << 4)
#define MP_STAT_FLAG_LOG_ALL_NODE_STAT (1 << 5)
#define MP_STAT_FLAG_LOG_ALL_POOL_STAT (1 << 6)
#define MP_STAT_FLAG_LOG_SOME_FILE_STAT (1 << 7)
#define MP_STAT_FLAG_LOG_SOME_LINE_STAT (1 << 8)
#define MP_STAT_FLAG_LOG_SOME_SESSION_STAT (1 << 9)
#define MP_STAT_FLAG_LOG_SOME_NODE_STAT (1 << 10)
#define MP_STAT_FLAG_LOG_SOME_POOL_STAT (1 << 11)
// STAT PROCESURE FLAGS
#define MP_STAT_PROC_FLAG_EXEC (1 << 0)
#define MP_STAT_PROC_FLAG_INPUT_ERR (1 << 1)
#define MP_STAT_PROC_FLAG_RES_SUCC (1 << 2)
#define MP_STAT_PROC_FLAG_RES_FAIL (1 << 3)
typedef enum EMPStatLogItem {
E_MP_STAT_LOG_MEM_MALLOC = 1,
E_MP_STAT_LOG_MEM_CALLOC,
E_MP_STAT_LOG_MEM_REALLOC,
E_MP_STAT_LOG_MEM_FREE,
E_MP_STAT_LOG_MEM_STRDUP,
E_MP_STAT_LOG_CHUNK_MALLOC,
E_MP_STAT_LOG_CHUNK_RECYCLE,
E_MP_STAT_LOG_CHUNK_REUSE,
E_MP_STAT_LOG_CHUNK_FREE,
} EMPStatLogItem;
// MEM HEADER FLAGS
#define MP_MEM_HEADER_FLAG_NS_CHUNK (1 << 0)
typedef struct SMPMemHeader {
uint64_t flags:3;
uint64_t size:5;
uint64_t flags:24;
uint64_t size:40;
} SMPMemHeader;
typedef struct SMPMemTailer {
@ -79,23 +114,55 @@ typedef struct SMPCacheGroup {
void* pNext;
} SMPCacheGroup;
typedef struct SMPStatInput {
char* file;
int64_t size;
int64_t origSize;
int32_t procFlags;
int32_t line;
} SMPStatInput;
typedef struct SMPStatItem {
int64_t exec;
int64_t inErr;
int64_t succ;
int64_t fail;
int64_t orig;
} SMPStatItem;
typedef struct SMPMemoryStat {
int64_t chunkAlloc;
int64_t chunkFree;
int64_t memMalloc;
int64_t memCalloc;
int64_t memRealloc;
int64_t strdup;
int64_t memFree;
SMPStatItem chunkMalloc;
SMPStatItem chunkRecycle;
SMPStatItem chunkReUse;
SMPStatItem chunkFree;
SMPStatItem memMalloc;
SMPStatItem memCalloc;
SMPStatItem memRealloc;
SMPStatItem strdup;
SMPStatItem memFree;
} SMPMemoryStat;
typedef struct SMPStat {
typedef struct SMPStatDetail {
SMPMemoryStat times;
SMPMemoryStat bytes;
} SMPStat;
} SMPStatDetail;
typedef struct SMPCtrlInfo {
int64_t statFlags;
int64_t funcFlags;
} SMPCtrlInfo;
typedef struct SMPStatInfo {
SMPStatDetail statDetail;
SHashObj* nodeStat;
SHashObj* fileStat;
SHashObj* lineStat;
} SMPStatInfo;
typedef struct SMPSession {
SMPListNode list;
SMPCtrlInfo ctrlInfo;
int64_t allocChunkNum;
int64_t allocChunkMemSize;
int64_t allocMemSize;
@ -118,7 +185,7 @@ typedef struct SMPSession {
SMPNSChunk *reUseNSChunkHead;
SMPNSChunk *reUseNSChunkTail;
SMPStat stat;
SMPStatInfo stat;
} SMPSession;
typedef struct SMPCacheGroupInfo {
@ -130,16 +197,14 @@ typedef struct SMPCacheGroupInfo {
void *pIdleList;
} SMPCacheGroupInfo;
typedef struct SMPDebugInfo {
int64_t flags;
} SMPDebugInfo;
typedef struct SMemPool {
char *name;
int16_t slotId;
SMemPoolCfg cfg;
int32_t maxChunkNum;
SMPDebugInfo dbgInfo;
SMPCtrlInfo ctrlInfo;
int16_t maxDiscardSize;
double threadChunkReserveNum;
@ -165,7 +230,7 @@ typedef struct SMemPool {
SMPChunk *readyNSChunkHead;
SMPChunk *readyNSChunkTail;
SMPStat stat;
SMPStatInfo stat;
} SMemPool;
#define MP_GET_FLAG(st, f) ((st) & (f))
@ -191,7 +256,8 @@ enum {
_chunkHead = _chunk; \
_chunkTail = _chunk; \
} else { \
(_chunkTail)->list.pNext = _chunk; \
(_chunkTail)->list.pNext = _chunk; \
(_chunkTail) = _chunk; \
} \
(_chunkNum)++; \
} while (0)

View File

@ -131,6 +131,8 @@ int32_t memPoolNewChunk(SMemPool* pPool, SMPChunk** ppChunk) {
pPool->allocChunkNum++;
pPool->allocChunkSize += pPool->cfg.chunkSize;
*ppChunk = pChunk;
return TSDB_CODE_SUCCESS;
}
@ -160,12 +162,8 @@ int32_t memPoolPrepareChunks(SMemPool* pPool, int32_t num) {
for (int32_t i = 0; i < num; ++i) {
MP_ERR_RET(memPoolNewChunk(pPool, &pChunk));
if (NULL == pPool->readyChunkTail) {
pPool->readyChunkHead = pChunk;
pPool->readyChunkTail = pChunk;
} else {
pPool->readyChunkTail->list.pNext = pChunk;
}
pPool->readyChunkTail->list.pNext = pChunk;
pPool->readyChunkTail = pChunk;
atomic_add_fetch_32(&pPool->readyChunkNum, 1);
}
@ -192,7 +190,7 @@ int32_t memPoolEnsureChunks(SMemPool* pPool) {
int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET(memPoolCheckCfg(cfg));
memcpy(&pPool->cfg, &cfg, sizeof(cfg));
memcpy(&pPool->cfg, cfg, sizeof(*cfg));
pPool->name = taosStrdup(poolName);
if (NULL == pPool->name) {
@ -249,6 +247,8 @@ int32_t memPoolGetChunk(SMemPool* pPool, SMPChunk** ppChunk) {
*ppChunk = pChunk;
return TSDB_CODE_SUCCESS;
} else {
atomic_add_fetch_32(&pPool->readyChunkNum, 1);
}
MP_RET(memPoolNewChunk(pPool, ppChunk));
@ -273,7 +273,7 @@ int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_
return TSDB_CODE_SUCCESS;
}
void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) {
void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) {
int32_t code = TSDB_CODE_SUCCESS;
SMPChunk* pChunk = NULL, *preSrcChunk = NULL;
void* pRes = NULL;
@ -317,11 +317,11 @@ _return:
return pRes;
}
void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size) {
void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) {
int32_t code = TSDB_CODE_SUCCESS;
SMPNSChunk* pChunk = NULL;
void* pRes = NULL;
int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer);
int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer) + alignment;
MP_ERR_JRET(memPoolNewNSChunk(pPool, &pChunk, totalSize));
SMPMemHeader* pHeader = (SMPMemHeader*)pChunk->pMemStart;
@ -345,55 +345,205 @@ _return:
return pRes;
}
void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, char* fileName, int32_t lineNo) {
void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size, alignment) : memPoolAllocFromChunk(pPool, pSession, size, alignment);
_return:
return res;
}
void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t totalSize = num * size;
void *res = memPoolMallocImpl(pPool, pSession, totalSize, fileName, lineNo);
void *res = memPoolMallocImpl(pPool, pSession, totalSize, 0);
if (NULL != res) {
memset(res, 0, totalSize);
}
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
_return:
return res;
}
void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t size, int64_t* origSize) {
void *res = NULL;
void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fileName, int32_t lineNo) {
//TODO
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
if (NULL == ptr) {
*origSize = 0;
res = memPoolMallocImpl(pPool, pSession, size, 0);
return res;
}
if (0 == size) {
memPoolFreeImpl(pPool, pSession, ptr, origSize);
return res;
}
*origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr);
if (*origSize >= size) {
SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
pHeader->size = size;
return ptr;
}
res = memPoolMallocImpl(pPool, pSession, size, 0);
SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader));
memcpy(res, ptr, *origSize);
memPoolFreeImpl(pPool, pSession, ptr, NULL);
return res;
}
int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, char* fileName, int32_t lineNo) {
void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) {
if (NULL == ptr) {
if (origSize) {
*origSize = 0;
}
return;
}
if (origSize) {
*origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr);
}
return;
}
int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) {
SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1;
return pHeader->size;
}
void memPoolLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* pInput) {
switch (item) {
case E_MP_STAT_LOG_MEM_MALLOC: {
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memMalloc.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.exec, size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memMalloc.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.succ, size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memMalloc.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memMalloc.fail, size);
}
break;
}
case E_MP_STAT_LOG_MEM_CALLOC:{
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memCalloc.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.exec, size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memCalloc.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.succ, size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memCalloc.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memCalloc.fail, size);
}
break;
}
case E_MP_STAT_LOG_MEM_REALLOC:{
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memRealloc.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.exec, size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memRealloc.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.succ, size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memRealloc.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memRealloc.fail, size);
}
break;
}
case E_MP_STAT_LOG_MEM_FREE:{
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.memFree.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memFree.exec, size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.memFree.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memFree.succ, size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.memFree.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memFree.fail, size);
}
break;
}
case E_MP_STAT_LOG_MEM_STRDUP: {
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.strdup.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.exec, size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.strdup.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.succ, size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.strdup.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.fail, size);
}
break;
}
case E_MP_STAT_LOG_CHUNK_MALLOC:
case E_MP_STAT_LOG_CHUNK_RECYCLE:
case E_MP_STAT_LOG_CHUNK_REUSE:
case E_MP_STAT_LOG_CHUNK_FREE: {
}
default:
uError("Invalid stat item: %d", item);
break;
}
}
void memPoolLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPStatInput* pInput) {
switch (item) {
case E_MP_STAT_LOG_MEM_MALLOC:
case E_MP_STAT_LOG_MEM_CALLOC:
case E_MP_STAT_LOG_MEM_REALLOC:
case E_MP_STAT_LOG_MEM_FREE:
case E_MP_STAT_LOG_MEM_STRDUP: {
if (MP_GET_FLAG(pSession->ctrlInfo.statFlags, MP_STAT_FLAG_LOG_ALL_MEM_STAT)) {
memPoolLogStatDetail(&pSession->stat.statDetail, item, pInput);
}
if (MP_GET_FLAG(pPool->ctrlInfo.statFlags, MP_STAT_FLAG_LOG_ALL_MEM_STAT)) {
memPoolLogStatDetail(&pPool->stat.statDetail, item, pInput);
}
break;
}
case E_MP_STAT_LOG_CHUNK_MALLOC:
case E_MP_STAT_LOG_CHUNK_RECYCLE:
case E_MP_STAT_LOG_CHUNK_REUSE:
case E_MP_STAT_LOG_CHUNK_FREE: {
}
default:
uError("Invalid stat item: %d", item);
break;
}
}
void taosMemPoolModInit(void) {
taosThreadMutexInit(&gMPoolMutex, NULL);
@ -447,16 +597,17 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession) {
int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = NULL;
SMPChunk* pChunk = NULL;
MP_ERR_JRET(memPoolGetIdleNode(pPool, &pPool->sessionCache, (void**)&pSession));
MP_ERR_JRET(memPoolGetChunk(pPool, &pSession->srcChunkHead));
MP_ERR_JRET(memPoolGetChunk(pPool, &pChunk));
pSession->allocChunkNum = 1;
pSession->allocChunkMemSize = pPool->cfg.chunkSize;
MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pSession->srcChunkHead);
MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pSession->srcChunkHead);
MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk);
MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk);
_return:
@ -473,6 +624,7 @@ _return:
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size);
@ -481,11 +633,16 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* f
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
return memPoolMallocImpl(pPool, pSession, size, fileName, lineNo);
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
res = memPoolMallocImpl(pPool, pSession, size, 0);
MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
_return:
return NULL;
return res;
}
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
@ -500,15 +657,17 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
res = memPoolMallocImpl(pPool, pSession, num * size, fileName, lineNo);
int64_t totalSize = num * size;
SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
res = memPoolMallocImpl(pPool, pSession, totalSize, 0);
if (NULL != res) {
memset(res, 0, num * size);
memset(res, 0, totalSize);
}
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input);
_return:
@ -527,28 +686,12 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t s
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
if (NULL == ptr) {
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
} else if (0 == size) {
memPoolFreeImpl(pPool, pSession, ptr, fileName, lineNo);
} else {
int64_t origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo);
if (origSize >= size) {
SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
pHeader->size = size;
} else {
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader));
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
memcpy(res, ptr, origSize);
memset((char*)res + origSize, 0, size - origSize);
}
}
res = memPoolReallocImpl(pPool, pSession, ptr, size, &input.origSize);
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
MP_SET_FLAG(input.procFlags, ((res || 0 == size) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input);
_return:
@ -568,14 +711,15 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
int64_t size = strlen(ptr) + 1;
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size) : memPoolAllocFromChunk(pPool, pSession, size);
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
res = memPoolMallocImpl(pPool, pSession, size, 0);
if (NULL != res) {
strcpy(res, ptr);
}
if (MP_GET_FLAG(pPool->dbgInfo.flags, MP_DBG_FLAG_LOG_MALLOC_FREE)) {
//TODO
}
MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_STRDUP, &input);
_return:
@ -584,15 +728,20 @@ _return:
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p",
__FUNCTION__, poolHandle, session, fileName, ptr);
if (NULL == poolHandle || NULL == session || NULL == fileName) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p",
__FUNCTION__, poolHandle, session, fileName);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
memPoolFreeImpl(pPool, pSession, ptr, fileName, lineNo);
SMPStatInput input = {.file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
memPoolFreeImpl(pPool, pSession, ptr, &input.size);
MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input);
_return:
@ -613,7 +762,7 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
return memPoolGetMemorySizeImpl(pPool, pSession, ptr, fileName, lineNo);
return memPoolGetMemorySizeImpl(pPool, pSession, ptr);
_return:
@ -622,6 +771,7 @@ _return:
void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64,
@ -631,11 +781,16 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
return memPoolMallocImpl(pPool, pSession, size, fileName, lineNo);
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
res = memPoolMallocImpl(pPool, pSession, size, alignment);
MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
_return:
return NULL;
return res;
}
void taosMemPoolClose(void* poolHandle) {