diff --git a/include/common/tglobal.h b/include/common/tglobal.h index af5d38198a..a88fb6b8cc 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -74,7 +74,7 @@ extern int32_t tsQueryMaxConcurrentTaskNum; extern int32_t tsQueryConcurrentTaskNum; extern int32_t tsSingleQueryMaxMemorySize; extern int8_t tsQueryUseMemoryPool; -extern int32 tsQueryBufferPoolSize; +extern int32_t tsQueryBufferPoolSize; extern int32_t tsNumOfQueryThreads; extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcSessions; diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index b300502b5c..a126f01481 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -44,7 +44,7 @@ typedef void (*mpIncConcSessionNum)(void); typedef void (*mpSetConcSessionNum)(int32_t); typedef void (*mpRetireCollections)(int64_t, bool); typedef void (*mpRetireCollection)(uint64_t); -typedef void (*mpCfgUpdate)(void*, SMemPoolCfg*); +typedef void (*mpCfgUpdate)(void*, void*); typedef struct SMemPoolCallBack { mpDecConcSessionNum decSessFp; @@ -55,6 +55,7 @@ typedef struct SMemPoolCallBack { mpCfgUpdate cfgUpdateFp; } SMemPoolCallBack; + typedef struct SMemPoolCfg { bool autoMaxSize; int64_t maxSize; @@ -67,6 +68,14 @@ typedef struct SMemPoolCfg { SMemPoolCallBack cb; } SMemPoolCfg; +typedef struct SMemPoolCollection { + uint64_t collectionId; + int64_t allocMemSize; + int64_t maxAllocMemSize; +} SMemPoolCollection; + + + void taosMemPoolModInit(void); int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle); void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo); @@ -83,6 +92,7 @@ void taosAutoMemoryFree(void *ptr); int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pCollection); void taosMemPoolDestroySession(void* poolHandle, void* session); int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection); +void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg); #define taosMemPoolFreeClear(ptr) \ do { \ diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 4a81b11a18..281088060f 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -52,7 +52,7 @@ void *taosMemMallocAlign(uint32_t alignment, int64_t size); #define TAOS_MEMCPY(_d, _s, _n) ((void)memcpy(_d, _s, _n)) #define TAOS_MEMMOVE(_d, _s, _n) ((void)memmove(_d, _s, _n)) -#define taosMemoryFreeClear(ptr) \ +#define taosMemFreeClear(ptr) \ do { \ if (ptr) { \ taosMemFree((void *)ptr); \ diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 50c070b6c1..c63b31ddbe 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1947,7 +1947,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p sendInfo->requestId = req.reqId; sendInfo->requestObjRefId = 0; sendInfo->param = pParam; - sendInfo->paramFreeFp = taosMemoryFree; + sendInfo->paramFreeFp = taosMemFree; sendInfo->fp = tmqPollCb; sendInfo->msgType = TDMT_VND_TMQ_CONSUME; @@ -2885,7 +2885,7 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; sendInfo->param = pParam; - sendInfo->paramFreeFp = taosMemoryFree; + sendInfo->paramFreeFp = taosMemFree; sendInfo->fp = askEpCb; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 84d1580838..f4f3dafcf0 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -55,7 +55,7 @@ int32_t tsShellActivityTimer = 3; // second // memory pool int8_t tsQueryUseMemoryPool = 1; -int32 tsQueryBufferPoolSize = 0; //MB +int32_t tsQueryBufferPoolSize = 0; //MB int32_t tsSingleQueryMaxMemorySize = 0; //MB // queue & threads diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 06d46622b3..b66c8fa09a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9504,7 +9504,7 @@ static void tDeleteMqDataRspCommon(void *rsp) { SMqDataRspCommon *pRsp = rsp; taosArrayDestroy(pRsp->blockDataLen); pRsp->blockDataLen = NULL; - taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); + taosArrayDestroyP(pRsp->blockData, NULL); pRsp->blockData = NULL; taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper); pRsp->blockSchema = NULL; @@ -9558,7 +9558,7 @@ void tDeleteSTaosxRsp(void *rsp) { STaosxRsp *pRsp = (STaosxRsp *)rsp; taosArrayDestroy(pRsp->createTableLen); pRsp->createTableLen = NULL; - taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); + taosArrayDestroyP(pRsp->createTableReq, NULL); pRsp->createTableReq = NULL; } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 2095f80f0f..945d29f7ab 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -213,7 +213,7 @@ int32_t ipWhiteMgtUpdate(SMnode *pMnode, char *user, SIpWhiteList *pNew) { _OVER: (void)taosThreadRwlockUnlock(&ipWhiteMgt.rw); - taosArrayDestroyP(fqdns, (FDelete)taosMemoryFree); + taosArrayDestroyP(fqdns, NULL); if (code < 0) { mError("failed to update ip white list for user: %s at line %d since %s", user, lino, tstrerror(code)); } @@ -633,8 +633,8 @@ int32_t mndFetchAllIpWhite(SMnode *pMnode, SHashObj **ppIpWhiteTab) { } _OVER: - taosArrayDestroyP(fqdns, taosMemoryFree); - taosArrayDestroyP(pUserNames, taosMemoryFree); + taosArrayDestroyP(fqdns, NULL); + taosArrayDestroyP(pUserNames, NULL); if (code < 0) { mError("failed to fetch all ip white list at line %d since %s", lino, tstrerror(code)); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 592e61e51c..1a6056b344 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -233,9 +233,9 @@ typedef struct SQWorkerMgmt { } SQWorkerMgmt; typedef struct SQWQueryInfo { - bool retired; - void* pCollection; - SHashObj* pSessions; + int8_t retired; + SMemPoolCollection* pCollection; + SHashObj* pSessions; } SQWQueryInfo; typedef struct SQWRetireLowCtx { @@ -483,7 +483,7 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwInitQueryPool(void); void qwDestroyQueryInfo(SQWQueryInfo* pQuery); -int32_t qwInitSession(uint64_t qId, void** ppSession); +int32_t qwInitSession(QW_FPARAMS_DEF, void** ppSession); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index 5d4d108921..b8c466dbca 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -60,7 +60,7 @@ int32_t qwInitQueryInfo(uint64_t qId, SQWQueryInfo* pQuery) { return TSDB_CODE_OUT_OF_MEMORY; } - int32_t code = taosMemPoolCallocCollection(qId, &pQuery->pCollection); + int32_t code = taosMemPoolCallocCollection(qId, (void**)&pQuery->pCollection); if (TSDB_CODE_SUCCESS != code) { taosHashCleanup(pQuery->pSessions); return code; @@ -118,17 +118,22 @@ void qwRetireCollectionCb(uint64_t collectionId) { } void qwLowLevelRetire(int64_t retireSize) { - void* pIter = taosHashIterate(gQueryMgmt.pQueryInfo, NULL); - while (pIter) { - vgInfo = pIter; + SQWQueryInfo* pQuery = (SQWQueryInfo*)taosHashIterate(gQueryMgmt.pQueryInfo, NULL); + while (pQuery) { + int64_t aSize = atomic_load_64(&pQuery->pCollection->allocMemSize); + if (aSize >= retireSize) { + atomic_store_8(&pQuery->retired, 1); + + //TODO RETIRE JOB/TASKS DIRECTLY - pInfo->vgHash[i].vgId = vgInfo->vgId; - pInfo->vgHash[i].hashBegin = vgInfo->hashBegin; - pInfo->vgHash[i].hashEnd = vgInfo->hashEnd; + qDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, + pQuery->pCollection->collectionId, aSize, retireSize); + + taosHashCancelIterate(gQueryMgmt.pQueryInfo, pQuery); + break; + } - pIter = taosHashIterate(gQueryMgmt.pQueryInfo, pIter); - vgInfo = NULL; - ++i; + pQuery = (SQWQueryInfo*)taosHashIterate(gQueryMgmt.pQueryInfo, pQuery); } } @@ -170,7 +175,8 @@ int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { return code; } -void qwCheckUpateCfgCb(void* pHandle, SMemPoolCfg* pCfg) { +void qwCheckUpateCfgCb(void* pHandle, void* cfg) { + SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg; int64_t newCollectionQuota = tsSingleQueryMaxMemorySize * 1048576UL; if (pCfg->collectionQuota != newCollectionQuota) { atomic_store_64(&pCfg->collectionQuota, newCollectionQuota); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 4a599a09e8..e48f894fb4 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -759,7 +759,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ctx->phase = -1; if (NULL != gQueryMgmt.memPoolHandle) { - QW_ERR_JRET(qwInitSession(QW_IDS(), &ctx->memPoolSession)); + QW_ERR_JRET(qwInitSession(QW_FPARAMS(), &ctx->memPoolSession)); } QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 30f503d11c..38091dbdd0 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -757,11 +757,11 @@ int32_t taosGetSysAvailMemory(int64_t *availSize) { #else TdFilePtr pFile = taosOpenFile("/proc/meminfo", TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { - return -1; + return terrno; } ssize_t bytes = 0; - char line[1024] = {0}; + char line[128] = {0}; int32_t expectedSize = 13; //"MemAvailable:" while (!taosEOFFile(pFile)) { bytes = taosGetsFile(pFile, sizeof(line), line); @@ -769,8 +769,8 @@ int32_t taosGetSysAvailMemory(int64_t *availSize) { break; } if (line[0] != 'M' && line[3] != 'A') { - continue; line[0] = 0; + continue; } if (0 == strncmp(line, "MemAvailable:", expectedSize)) { break; @@ -778,15 +778,15 @@ int32_t taosGetSysAvailMemory(int64_t *availSize) { } if (0 == line[0]) { - return -1; + return TSDB_CODE_SYSTEM_ERROR; } char tmp[32]; - sscanf(line, "%s %" PRId64, tmp, availSize); + (void)sscanf(line, "%s %" PRId64, tmp, availSize); *availSize *= 1024; - taosCloseFile(&pFile); + (void)taosCloseFile(&pFile); return 0; #endif } diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 2561c21a08..ec00cc896e 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -194,10 +194,7 @@ typedef struct SMPStatInfo { typedef struct SMPCollection { - uint64_t collectionId; - int64_t allocMemSize; - int64_t maxAllocMemSize; - + SMemPoolCollection collection; // KEEP IT FIRST SMPStatInfo stat; } SMPCollection; @@ -292,8 +289,8 @@ typedef enum EMPMemStrategy { typedef struct SMPMsgQueue { SMemPool* pPool; - bool lowLevelRetire; - bool midLevelRetire; + int8_t lowLevelRetire; + int8_t midLevelRetire; } SMPMsgQueue; typedef struct SMemPoolMgmt { diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 81d5c6d2cc..a7d7b30959 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -312,15 +312,15 @@ void memPoolUpdateAllocMemSize(SMemPool* pPool, SMPSession* pSession, int64_t si int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size); memPoolUpdateMaxAllocMemSize(&pSession->maxAllocMemSize, allocMemSize); - allocMemSize = atomic_load_64(&pSession->pMpCollection->allocMemSize); - memPoolUpdateMaxAllocMemSize(&pSession->pMpCollection->maxAllocMemSize, allocMemSize); + allocMemSize = atomic_load_64(&pSession->pMpCollection->collection.allocMemSize); + memPoolUpdateMaxAllocMemSize(&pSession->pMpCollection->collection.maxAllocMemSize, allocMemSize); allocMemSize = atomic_load_64(&pPool->allocMemSize); memPoolUpdateMaxAllocMemSize(&pPool->maxAllocMemSize, allocMemSize); } -void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { +int32_t memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SMPChunk* pChunk = NULL, *preSrcChunk = NULL; void* pRes = NULL; @@ -361,10 +361,12 @@ void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, _return: - return pRes; + *ppRes = pRes; + + return code; } -void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) { +int32_t memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SMPNSChunk* pChunk = NULL; void* pRes = NULL; @@ -389,7 +391,9 @@ void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t siz _return: - return pRes; + *ppRes = pRes; + + return code; } int32_t memPoolPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { @@ -413,34 +417,34 @@ int32_t memPoolPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) { SMPCollection* pCollection = pSession->pMpCollection; - int64_t cAllocSize = atomic_add_fetch_64(&pCollection->allocMemSize, size); + int64_t cAllocSize = atomic_add_fetch_64(&pCollection->collection.allocMemSize, size); int64_t quota = atomic_load_64(&pPool->cfg.collectionQuota); if (quota > 0 && cAllocSize > quota) { - uWarn("collection %" PRIx64 " allocSize " PRId64 " is over than quota %" PRId64, pCollection->collectionId, cAllocSize, quota); - pPool->cfg.cb.retireFp(pCollection->collectionId); - atomic_sub_fetch_64(&pCollection->allocMemSize, size); + uWarn("collection %" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pCollection->collection.collectionId, cAllocSize, quota); + pPool->cfg.cb.retireFp(pCollection->collection.collectionId); + atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size); MP_RET(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); } int64_t pAllocSize = atomic_add_fetch_64(&pPool->allocMemSize, size); quota = atomic_load_64(&pPool->memRetireThreshold[2]); if (pAllocSize >= quota) { - uWarn("pool allocSize " PRId64 " reaches the high quota %" PRId64, pAllocSize, quota); - pPool->cfg.cb.retireFp(pCollection->collectionId); - atomic_sub_fetch_64(&pCollection->allocMemSize, size); + uWarn("pool allocSize %" PRId64 " reaches the high quota %" PRId64, pAllocSize, quota); + pPool->cfg.cb.retireFp(pCollection->collection.collectionId); + atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size); MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); } quota = atomic_load_64(&pPool->memRetireThreshold[1]); if (pAllocSize >= quota) { - uInfo("pool allocSize " PRId64 " reaches the middle quota %" PRId64, pAllocSize, quota); + uInfo("pool allocSize %" PRId64 " reaches the middle quota %" PRId64, pAllocSize, quota); if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit) / 2) { uWarn("session retired in middle quota retire choise, sessionAllocSize: %" PRId64 ", collectionSize: %" PRId64, atomic_load_64(&pSession->allocMemSize), cAllocSize); - pPool->cfg.cb.retireFp(pCollection->collectionId); - atomic_sub_fetch_64(&pCollection->allocMemSize, size); + pPool->cfg.cb.retireFp(pCollection->collection.collectionId); + atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size); MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, false)); @@ -452,12 +456,12 @@ int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t s quota = atomic_load_64(&pPool->memRetireThreshold[0]); if (pAllocSize >= quota) { - uInfo("pool allocSize " PRId64 " reaches the low quota %" PRId64, pAllocSize, quota); + uInfo("pool allocSize %" PRId64 " reaches the low quota %" PRId64, pAllocSize, quota); if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) { uWarn("session retired in low quota retire choise, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize)); - pPool->cfg.cb.retireFp(pCollection->collectionId); - atomic_sub_fetch_64(&pCollection->allocMemSize, size); + pPool->cfg.cb.retireFp(pCollection->collection.collectionId); + atomic_sub_fetch_64(&pCollection->collection.allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size); MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, true)); @@ -476,7 +480,7 @@ int32_t memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size, if (NULL != res) { memPoolUpdateAllocMemSize(pPool, pSession, size); } else { - atomic_sub_fetch_64(&pSession->pMpCollection->allocMemSize, size); + atomic_sub_fetch_64(&pSession->pMpCollection->collection.allocMemSize, size); atomic_sub_fetch_64(&pPool->allocMemSize, size); uError("malloc %" PRId64 " alighment %d failed", size, alignment); @@ -518,10 +522,12 @@ int32_t memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, u return code; } -void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size) { +int32_t memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size, void** ppRes) { int32_t code = TSDB_CODE_SUCCESS; int64_t totalSize = num * size; - void *res = memPoolMallocImpl(pPool, pSession, totalSize, 0); + void *res = NULL; + + MP_ERR_RET(memPoolMallocImpl(pPool, pSession, totalSize, 0, &res)); if (NULL != res) { memset(res, 0, totalSize); @@ -529,7 +535,9 @@ void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int6 _return: - return res; + *ppRes = res; + + return code; } @@ -571,7 +579,7 @@ int32_t memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void **pPtr, i if (NULL == *pPtr) { *origSize = 0; - MP_RET(memPoolMallocImpl(pPool, pSession, size, 0)); + MP_RET(memPoolMallocImpl(pPool, pSession, size, 0, pPtr)); } if (0 == size) { @@ -851,9 +859,9 @@ void* memPoolMgmtThreadFunc(void* param) { continue; } - if (gMPMgmt.msgQueue.midLevelRetire) { + if (atomic_load_8(&gMPMgmt.msgQueue.midLevelRetire)) { (*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), false); - } else if (gMPMgmt.msgQueue.lowLevelRetire) { + } else if (atomic_load_8(&gMPMgmt.msgQueue.lowLevelRetire)) { (*gMPMgmt.msgQueue.pPool->cfg.cb.retiresFp)(atomic_load_64(&gMPMgmt.msgQueue.pPool->memRetireUnit), true); } @@ -876,8 +884,7 @@ void taosMemPoolModInit(void) { gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0); if (TSDB_CODE_SUCCESS != gMPMgmt.code) { - gMPMgmt.code = TAOS_SYSTEM_ERROR(errno); - qError("failed to init sem2, error: %x", gMPMgmt.code); + uError("failed to init sem2, error: %x", gMPMgmt.code); return; } @@ -935,7 +942,7 @@ _return: void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) { SMemPool* pPool = (SMemPool*)poolHandle; - MP_ERR_RET(memPoolApplyCfgUpdate(pPool)); + (void)memPoolApplyCfgUpdate(pPool); } void taosMemPoolDestroySession(void* poolHandle, void* session) { @@ -1022,11 +1029,7 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t int64_t totalSize = num * size; SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - res = memPoolMallocImpl(pPool, pSession, totalSize, 0); - - if (NULL != res) { - memset(res, 0, totalSize); - } + MP_ERR_JRET(memPoolCallocImpl(pPool, pSession, num, size, &res)); MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input); @@ -1075,7 +1078,7 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char int64_t size = strlen(ptr) + 1; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - res = memPoolMallocImpl(pPool, pSession, size, 0); + MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, 0, &res)); if (NULL != res) { strcpy(res, ptr); } @@ -1145,7 +1148,7 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment SMPSession* pSession = (SMPSession*)session; SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; - res = memPoolMallocImpl(pPool, pSession, size, alignment); + MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, alignment, &res)); MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); @@ -1183,7 +1186,7 @@ int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection) } SMPCollection* pCollection = (SMPCollection*)*ppCollection; - pCollection->collectionId = collectionId; + pCollection->collection.collectionId = collectionId; return TSDB_CODE_SUCCESS; }