From c5c85efeb55072ab382a009e1de833190234ebca Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 6 Nov 2024 19:26:14 +0800 Subject: [PATCH] enh: adjust mode and retire policy --- include/common/tglobal.h | 4 +- include/os/osMemPool.h | 64 ++- include/util/tdef.h | 2 + source/common/src/tglobal.c | 15 +- source/dnode/mgmt/exe/dmMain.c | 7 + source/libs/executor/src/dataInserter.c | 2 +- source/libs/executor/src/exchangeoperator.c | 16 +- source/libs/executor/src/executil.c | 2 + source/libs/executor/src/executor.c | 1 + source/libs/executor/src/querytask.c | 10 +- source/libs/executor/src/sysscanoperator.c | 12 +- source/libs/qworker/inc/qwInt.h | 6 - source/libs/qworker/src/qwMem.c | 262 +--------- source/libs/qworker/src/qwUtil.c | 4 +- source/libs/qworker/src/qworker.c | 55 ++- source/util/inc/tmempoolInt.h | 13 +- source/util/src/mpChunk.c | 3 +- source/util/src/mpDirect.c | 12 +- source/util/src/tmempool.c | 519 ++++++++++---------- source/util/test/memPoolTest.cpp | 170 ++----- 20 files changed, 467 insertions(+), 712 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 436b4e3276..9b6d4eae5e 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -74,7 +74,9 @@ extern int32_t tsQueryMaxConcurrentTaskNum; extern int32_t tsQueryConcurrentTaskNum; extern int32_t tsSingleQueryMaxMemorySize; extern int8_t tsQueryUseMemoryPool; -extern int32_t tsQueryBufferPoolSize; +//extern int32_t tsQueryBufferPoolSize; +extern int32_t tsMinReservedMemorySize; +extern int64_t tsCurrentAvailMemorySize; extern int32_t tsNumOfQueryThreads; extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcSessions; diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index b40a1e8db9..5c926c9b4a 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -58,8 +58,8 @@ typedef struct SMPMemoryStat { SMPStatItem memMalloc; SMPStatItem memCalloc; SMPStatItemExt memRealloc; - SMPStatItem strdup; - SMPStatItem strndup; + SMPStatItem memStrdup; + SMPStatItem memStrndup; SMPStatItem memFree; SMPStatItem memTrim; @@ -78,33 +78,33 @@ typedef struct SMPStatDetail { typedef void (*mpDecConcSessionNum)(void); typedef void (*mpIncConcSessionNum)(void); typedef void (*mpSetConcSessionNum)(int32_t); -typedef void (*mpRetireJobs)(void*, int64_t, bool, int32_t); -typedef void (*mpRetireJob)(SMemPoolJob*, int32_t); +typedef void (*mpReserveFailFp)(int64_t, int32_t); +typedef void (*mpReserveReachFp)(uint64_t, int32_t); typedef void (*mpCfgUpdate)(void*, void*); typedef struct SMemPoolCallBack { - mpDecConcSessionNum decSessFp; - mpIncConcSessionNum incSessFp; - mpSetConcSessionNum setSessFp; - mpRetireJobs retireJobsFp; - mpRetireJob retireJobFp; - mpCfgUpdate cfgUpdateFp; + //mpDecConcSessionNum decSessFp; + //mpIncConcSessionNum incSessFp; + //mpSetConcSessionNum setSessFp; + mpReserveFailFp failFp; + mpReserveReachFp reachFp; + //mpCfgUpdate cfgUpdateFp; } SMemPoolCallBack; typedef struct SMemPoolCfg { - bool autoMaxSize; - int64_t reserveSize; - int64_t retireUnitSize; - int64_t freeSize; - int64_t jobQuota; + //bool reserveMode; + int32_t *reserveSize; //MB + //int32_t *upperLimitSize; //MB + //int64_t retireUnitSize; + int32_t *jobQuota; //MB int32_t chunkSize; int32_t threadNum; MemPoolEvictPolicy evicPolicy; SMemPoolCallBack cb; } SMemPoolCfg; -#define MEMPOOL_GET_ALLOC_SIZE(_dstat) ((_dstat)->bytes.memMalloc.succ + (_dstat)->bytes.memCalloc.succ + (_dstat)->bytes.memRealloc.succ + (_dstat)->bytes.strdup.succ + (_dstat)->bytes.strndup.succ) +#define MEMPOOL_GET_ALLOC_SIZE(_dstat) ((_dstat)->bytes.memMalloc.succ + (_dstat)->bytes.memCalloc.succ + (_dstat)->bytes.memRealloc.succ + (_dstat)->bytes.memStrdup.succ + (_dstat)->bytes.memStrndup.succ) #define MEMPOOL_GET_FREE_SIZE(_dstat) ((_dstat)->bytes.memRealloc.origSucc + (_dstat)->bytes.memFree.succ) #define MEMPOOL_GET_USED_SIZE(_dstat) (MEMPOOL_GET_ALLOC_SIZE(_dstat) - MEMPOOL_GET_FREE_SIZE(_dstat)) @@ -129,8 +129,8 @@ void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg); void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName); void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd); void taosMemPoolGetUsedSizeEnd(void* poolHandle); -bool taosMemPoolNeedRetireJob(void* poolHandle); int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize); +int32_t taosMemoryPoolInit(mpReserveFailFp, mpReserveReachFp); #define taosMemPoolFreeClear(ptr) \ @@ -143,29 +143,27 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t #ifndef BUILD_TEST -extern threadlocal void* threadPoolHandle; +extern void* gMemPoolHandle; extern threadlocal void* threadPoolSession; -#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolHandle = _pool; threadPoolSession = _session; tsEnableRandErr = true;} while (0) -#define taosDisableMemoryPoolUsage() (threadPoolHandle = NULL, tsEnableRandErr = false) -#define taosSaveDisableMemoryPoolUsage(_handle) do { (_handle) = threadPoolHandle; threadPoolHandle = NULL; } while (0) -#define taosRestoreEnableMemoryPoolUsage(_handle) (threadPoolHandle = (_handle)) +#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0) +#define taosDisableMemoryPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0) -#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size))) -#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) -#define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) -#define taosStrdup(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolStrdup(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr))) -#define taosStrndup(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolStrndup(threadPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size))) -#define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr))) -#define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr))) -#define taosMemoryTrim(_size, _trimed) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed))) -#define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) +#define taosMemoryMalloc(_size) ((NULL != gMemPoolHandle) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size))) +#define taosMemoryCalloc(_num, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) +#define taosMemoryRealloc(_ptr, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) +#define taosStrdup(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr))) +#define taosStrndup(_ptr, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size))) +#define taosMemoryFree(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr))) +#define taosMemorySize(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr))) +#define taosMemoryTrim(_size, _trimed) ((NULL != gMemPoolHandle) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed))) +#define taosMemoryMallocAlign(_alignment, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) #else #define taosEnableMemoryPoolUsage(_pool, _session) #define taosDisableMemoryPoolUsage() -#define taosSaveDisableMemoryPoolUsage(_handle) -#define taosRestoreEnableMemoryPoolUsage(_handle) +#define taosSaveDisableMemoryPoolUsage() +#define taosRestoreEnableMemoryPoolUsage() #define taosMemoryMalloc(_size) taosMemMalloc(_size) #define taosMemoryCalloc(_num, _size) taosMemCalloc(_num, _size) diff --git a/include/util/tdef.h b/include/util/tdef.h index b4cb1bdd1c..486cb19200 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -616,6 +616,8 @@ typedef enum { ANAL_ALGO_TYPE_END, } EAnalAlgoType; +#define MIN_RESERVE_MEM_SIZE 1024 // MB + #ifdef __cplusplus } #endif diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 28b9b05368..aa10d1fd6f 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -57,6 +57,8 @@ int32_t tsShellActivityTimer = 3; // second int8_t tsQueryUseMemoryPool = 1; int32_t tsQueryBufferPoolSize = 0; //MB int32_t tsSingleQueryMaxMemorySize = 0; //MB +int32_t tsMinReservedMemorySize = MIN_RESERVE_MEM_SIZE; //MB +int64_t tsCurrentAvailMemorySize = 0; // queue & threads int32_t tsQueryMinConcurrentTaskNum = 1; @@ -744,8 +746,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0); - TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferPoolSize", tsQueryBufferPoolSize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0); + //TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferPoolSize", tsQueryBufferPoolSize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minReservedMemorySize", tsMinReservedMemorySize, 1024, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE)); @@ -1699,8 +1702,11 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "singleQueryMaxMemorySize"); tsSingleQueryMaxMemorySize = pItem->i32; - TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "queryBufferPoolSize"); - tsQueryBufferPoolSize = pItem->i32; + //TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "queryBufferPoolSize"); + //tsQueryBufferPoolSize = pItem->i32; + + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "minReservedMemorySize"); + tsMinReservedMemorySize = pItem->i32; // GRANT_CFG_GET; TAOS_RETURN(TSDB_CODE_SUCCESS); @@ -2070,7 +2076,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"supportVnodes", &tsNumOfSupportVnodes}, {"experimental", &tsExperimental}, {"maxTsmaNum", &tsMaxTsmaNum}, - {"queryBufferPoolSize", &tsQueryBufferPoolSize}, {"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize}}; if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index f94b9e2d73..3ea1ecd7b4 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -442,6 +442,13 @@ int mainWindows(int argc, char **argv) { taosCleanupArgs(); return code; } + + if ((code = taosMemoryPoolInit(qWorkerRetireJobs, qWorkerRetireJob)) != 0) { + dError("failed to init conv"); + taosCloseLog(); + taosCleanupArgs(); + return code; + } if ((code = taosConvInit()) != 0) { dError("failed to init conv"); diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 6bbd74e8df..a6939b2132 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -432,7 +432,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; (void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize); taosArrayDestroy(pInserter->pDataBlocks); - taosMemoryFree(pInserter->pSchema); + taosMemFree(pInserter->pSchema); taosMemoryFree(pInserter->pParam); taosHashCleanup(pInserter->pCols); (void)taosThreadMutexDestroy(&pInserter->mutex); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index e74be4393d..20502ffa39 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -116,7 +116,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn pDataInfo->status = EX_SOURCE_DATA_NOT_READY; code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pDataInfo->pRsp); + taosMemFreeClear(pDataInfo->pRsp); goto _error; } } else { @@ -125,7 +125,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); - taosMemoryFreeClear(pDataInfo->pRsp); + taosMemFreeClear(pDataInfo->pRsp); } break; } @@ -154,13 +154,13 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); } - taosMemoryFreeClear(pDataInfo->pRsp); + taosMemFreeClear(pDataInfo->pRsp); if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) { pDataInfo->status = EX_SOURCE_DATA_NOT_READY; code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pDataInfo->pRsp); + taosMemFreeClear(pDataInfo->pRsp); goto _error; } } @@ -633,7 +633,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas pDataInfo->startTime = taosGetTimestampUs(); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - SFetchRspHandleWrapper* pWrapper = taosMemCalloc(1, sizeof(SFetchRspHandleWrapper)); + SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper)); QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno); pWrapper->exchangeId = pExchangeInfo->self; pWrapper->sourceIndex = sourceIndex; @@ -673,7 +673,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas return pTaskInfo->code; } - void* msg = taosMemCalloc(1, msgSize); + void* msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; taosMemFree(pWrapper); @@ -696,7 +696,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas pSource->execId, pExchangeInfo, sourceIndex, totalSources); // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = taosMemCalloc(1, sizeof(SMsgSendInfo)); + SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { taosMemFreeClear(msg); taosMemFree(pWrapper); @@ -714,9 +714,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas int64_t transporterId = 0; void* poolHandle = NULL; - taosSaveDisableMemoryPoolUsage(poolHandle); code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); - taosRestoreEnableMemoryPoolUsage(poolHandle); QUERY_CHECK_CODE(code, lino, _end); int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex); *pRpcHandle = transporterId; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b15cc2ab45..1ea2e72ca3 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1704,6 +1704,7 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, nodesDestroyList(groupNew); pAPI->metaReaderFn.clearReader(&mr); + return TSDB_CODE_SUCCESS; } @@ -3090,3 +3091,4 @@ _end: } return code; } + diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 07e8f31f60..d63664ec3a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -441,6 +441,7 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S } _end: + pAPI->metaReaderFn.clearReader(&mr); (*ppArrayRes) = qa; diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 37461382dd..98e390e443 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -102,7 +102,6 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand int32_t code = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api, pTaskInfo); if (*pTaskInfo == NULL || code != 0) { nodesDestroyNode((SNode*)pPlan); - taosMemoryFree(sql); return code; } @@ -112,7 +111,14 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand } } - TSWAP((*pTaskInfo)->sql, sql); + (*pTaskInfo)->sql = taosStrdup(sql); + if (NULL == (*pTaskInfo)->sql) { + code = terrno; + nodesDestroyNode((SNode*)pPlan); + doDestroyTask(*pTaskInfo); + (*pTaskInfo) = NULL; + return code; + } (*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pWorkerCb = pHandle->pWorkerCb; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 56934afc82..88f4a748e8 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1413,7 +1413,7 @@ static int32_t doSetUserTableMetaInfo(SStoreMetaReader* pMetaReaderFn, SStoreMet SMetaReader mr1 = {0}; pMetaReaderFn->initReader(&mr1, pVnode, META_READER_NOLOCK, pMetaFn); - + int64_t suid = pMReader->me.ctbEntry.suid; code = pMetaReaderFn->getTableEntryByUid(&mr1, suid); if (code != TSDB_CODE_SUCCESS) { @@ -1576,6 +1576,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { SMetaReader mr = {0}; pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn); + code = doSetUserTableMetaInfo(&pAPI->metaReaderFn, &pAPI->metaFn, pInfo->readHandle.vnode, &mr, *uid, dbname, vgId, p, numOfRows, GET_TASKID(pTaskInfo)); @@ -1723,7 +1724,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SMetaReader mr = {0}; pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn); - + uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; code = pAPI->metaReaderFn.getTableEntryByUid(&mr, suid); if (code != TSDB_CODE_SUCCESS) { @@ -2147,7 +2148,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca tstrncpy(pInfo->req.user, pInfo->pUser, tListLen(pInfo->req.user)); int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); - char* buf1 = taosMemCalloc(1, contLen); + char* buf1 = taosMemoryCalloc(1, contLen); if (!buf1) { return NULL; } @@ -2159,7 +2160,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca } // send the fetch remote task result reques - SMsgSendInfo* pMsgSendInfo = taosMemCalloc(1, sizeof(SMsgSendInfo)); + SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; @@ -2177,10 +2178,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca pMsgSendInfo->fp = loadSysTableCallback; pMsgSendInfo->requestId = pTaskInfo->id.queryId; - void* poolHandle = NULL; - taosSaveDisableMemoryPoolUsage(poolHandle); code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, NULL, pMsgSendInfo); - taosRestoreEnableMemoryPoolUsage(poolHandle); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); pTaskInfo->code = code; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 39ca7434e4..7aa0baecd5 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -41,9 +41,6 @@ extern "C" { #define QW_THREAD_MAX_SCHED_TASK_NUM 10 #define QW_QUERY_MEM_POOL_NAME "Query" -#define QW_DEFAULT_RESERVE_MEM_PERCENT 20 -#define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576UL) -#define QW_MIN_MEM_POOL_SIZE (1048576UL) #define QW_MAX_RETIRE_JOB_NUM 10000 #define QW_DEFAULT_THREAD_TASK_NUM 3 @@ -252,9 +249,6 @@ typedef struct SQueryMgmt { SRWLatch taskMgmtLock; int32_t concTaskLevel; SHashObj* pJobInfo; - void* memPoolHandle; - int8_t memPoolInited; - SQWRetireCtx retireCtx; } SQueryMgmt; #define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST) diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index b8248d34f4..e15027cf9b 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -1,45 +1,6 @@ #include "qwInt.h" #include "qworker.h" -int32_t qwGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) { - int64_t freeSize = 0; - int64_t usedSize = 0; - bool needEnd = false; - - taosMemPoolGetUsedSizeBegin(pHandle, &usedSize, &needEnd); - int32_t code = taosGetSysAvailMemory(&freeSize); - if (needEnd) { - taosMemPoolGetUsedSizeEnd(pHandle); - } - - if (TSDB_CODE_SUCCESS != code) { - qError("get system avaiable memory size failed, error: 0x%x", code); - return code; - } - - int64_t totalSize = freeSize + usedSize; - int64_t reserveSize = TMAX(tsTotalMemoryKB * 1024 * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, QW_MIN_RESERVE_MEM_SIZE); - int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL; - if (availSize < QW_MIN_MEM_POOL_SIZE) { - qError("too little available query memory, totalAvailable: %" PRId64 ", reserveSize: %" PRId64, totalSize, reserveSize); - return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM; - } - - uDebug("new pool maxSize:%" PRId64 ", usedSize:%" PRId64 ", freeSize:%" PRId64, availSize, usedSize, freeSize); - - *maxSize = availSize; - - return TSDB_CODE_SUCCESS; -} - -int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chunkSize) { - //TODO - - *chunkSize = 2 * 1048576; - - return TSDB_CODE_SUCCESS; -} - void qwSetConcurrentTaskNumCb(int32_t taskNum) { int32_t finTaskNum = TMIN(taskNum, tsNumOfQueryThreads * QW_DEFAULT_THREAD_TASK_NUM); @@ -90,9 +51,34 @@ int32_t qwInitJobInfo(uint64_t qId, SQWJobInfo* pJob) { return code; } + +int32_t qwInitJobHash(void) { + int32_t code = TSDB_CODE_SUCCESS; + SHashObj* pHash = NULL; + + if (NULL == atomic_load_ptr(&gQueryMgmt.pJobInfo)) { + pHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pHash) { + qError("init job hash failed, error:0x%x", terrno); + return terrno; + } + + if (NULL != atomic_val_compare_exchange_ptr(&gQueryMgmt.pJobInfo, NULL, pHash)) { + taosHashCleanup(pHash); + } + } + + return code; +} + + int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) { int32_t code = TSDB_CODE_SUCCESS; SQWJobInfo* pJob = NULL; + + if (NULL == gQueryMgmt.pJobInfo) { + QW_ERR_RET(qwInitJobHash()); + } while (true) { pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, &qId, sizeof(qId)); @@ -146,202 +132,4 @@ _return: return code; } -void qwRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) { - SQWJobInfo* pJob = (SQWJobInfo*)taosHashGet(gQueryMgmt.pJobInfo, &mpJob->jobId, sizeof(mpJob->jobId)); - if (NULL == pJob) { - qError("QID:0x%" PRIx64 " fail to get job from job hash", mpJob->jobId); - return; - } - - if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - qInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); - } else { - qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); - } -} - -void qwLowLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) { - SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); - while (pJob) { - if (!taosMemPoolNeedRetireJob(pHandle)) { - taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob); - return; - } - - uint64_t jobId = pJob->memInfo->jobId; - int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); - if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - qwRetireJob(pJob); - - qDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, - jobId, aSize, retireSize); - - taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob); - break; - } - - pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); - } -} - -void qwMidLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) { - SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); - PriorityQueueNode qNode; - while (NULL != pJob) { - if (0 == atomic_load_8(&pJob->retired)) { - qNode.data = pJob; - (void)taosBQPush(gQueryMgmt.retireCtx.pJobQueue, &qNode); - } - - pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); - } - - PriorityQueueNode* pNode = NULL; - uint64_t jobId = 0; - int64_t retiredSize = 0; - while (retiredSize < retireSize) { - if (!taosMemPoolNeedRetireJob(pHandle)) { - break; - } - - pNode = taosBQTop(gQueryMgmt.retireCtx.pJobQueue); - if (NULL == pNode) { - break; - } - - pJob = (SQWJobInfo*)pNode->data; - if (atomic_load_8(&pJob->retired)) { - taosBQPop(gQueryMgmt.retireCtx.pJobQueue); - continue; - } - - if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); - jobId = pJob->memInfo->jobId; - - qwRetireJob(pJob); - - qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, - jobId, aSize, retireSize); - - retiredSize += aSize; - } - - taosBQPop(gQueryMgmt.retireCtx.pJobQueue); - } - - taosBQClear(gQueryMgmt.retireCtx.pJobQueue); -} - - -void qwRetireJobsCb(void* pHandle, int64_t retireSize, bool lowLevelRetire, int32_t errCode) { - (lowLevelRetire) ? qwLowLevelRetire(pHandle, retireSize, errCode) : qwMidLevelRetire(pHandle, retireSize, errCode); -} - -int32_t qwUpdateQueryMemPoolCfg(void* pHandle, int64_t* pFreeSize, bool* autoMaxSize, int64_t* pReserveSize, int64_t* pRetireUnitSize) { - if (tsQueryBufferPoolSize > 0) { - *pFreeSize = tsQueryBufferPoolSize * 1048576UL; - *autoMaxSize = false; - - return TSDB_CODE_SUCCESS; - } - - int32_t code = qwGetMemPoolMaxMemSize(pHandle, pFreeSize); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - - *autoMaxSize = true; - - return code; -} - -void qwCheckUpateCfgCb(void* pHandle, void* cfg) { - SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg; - int64_t newJobQuota = tsSingleQueryMaxMemorySize * 1048576UL; - if (pCfg->jobQuota != newJobQuota) { - atomic_store_64(&pCfg->jobQuota, newJobQuota); - } - - int64_t freeSize = 0, reserveSize = 0, retireUnitSize = 0; - bool autoMaxSize = false; - int32_t code = qwUpdateQueryMemPoolCfg(pHandle, &freeSize, &autoMaxSize, &reserveSize, &retireUnitSize); - if (TSDB_CODE_SUCCESS != code) { - pCfg->freeSize = 0; - qError("get query memPool freeSize failed, reset freeSize to %" PRId64, pCfg->freeSize); - } - - if (pCfg->autoMaxSize != autoMaxSize || pCfg->freeSize != freeSize) { - pCfg->autoMaxSize = autoMaxSize; - atomic_store_64(&pCfg->freeSize, freeSize); - taosMemPoolCfgUpdate(pHandle, pCfg); - } -} - -static bool qwJobMemSizeCompFn(void* l, void* r, void* param) { - SQWJobInfo* left = (SQWJobInfo*)l; - SQWJobInfo* right = (SQWJobInfo*)r; - if (atomic_load_8(&right->retired)) { - return true; - } - - return atomic_load_64(&right->memInfo->allocMemSize) < atomic_load_64(&left->memInfo->allocMemSize); -} - -void qwDeleteJobQueueData(void* pData) {} - - -int32_t qwInitQueryPool(void) { - int32_t code = TSDB_CODE_SUCCESS; - -#ifdef LINUX - if (!tsQueryUseMemoryPool) { -#endif - qInfo("query memory pool disabled"); - return code; -#ifdef LINUX - } -#endif - - taosGetTotalMemory(&tsTotalMemoryKB); - - SMemPoolCfg cfg = {0}; - code = qwUpdateQueryMemPoolCfg(NULL, &cfg.freeSize, &cfg.autoMaxSize, &cfg.reserveSize, &cfg.retireUnitSize); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - - cfg.threadNum = 10; //TODO - cfg.evicPolicy = E_EVICT_AUTO; //TODO - cfg.chunkSize = 1048576; - cfg.jobQuota = tsSingleQueryMaxMemorySize * 1048576UL; - cfg.cb.setSessFp = qwSetConcurrentTaskNumCb; - cfg.cb.decSessFp = qwDecConcurrentTaskNumCb; - cfg.cb.incSessFp = qwIncConcurrentTaskNumCb; - cfg.cb.retireJobsFp = qwRetireJobsCb; - cfg.cb.retireJobFp = qwRetireJobCb; - cfg.cb.cfgUpdateFp = qwCheckUpateCfgCb; - - gQueryMgmt.pJobInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (NULL == gQueryMgmt.pJobInfo) { - qError("init job hash failed, error:0x%x", terrno); - return terrno; - } - - gQueryMgmt.retireCtx.pJobQueue = createBoundedQueue(QW_MAX_RETIRE_JOB_NUM, qwJobMemSizeCompFn, qwDeleteJobQueueData, NULL); - if (NULL == gQueryMgmt.retireCtx.pJobQueue) { - qError("init job bounded queue failed, error:0x%x", terrno); - return terrno; - } - - code = taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryMgmt.memPoolHandle); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - - qInfo("query memory pool initialized"); - - return code; -} - diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 34cb734fb1..9fb659f197 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -321,7 +321,9 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { taosArrayDestroy(ctx->tbInfo); - taosMemPoolDestroySession(gQueryMgmt.memPoolHandle, ctx->memPoolSession); + if (gMemPoolHandle && ctx->memPoolSession) { + taosMemPoolDestroySession(gMemPoolHandle, ctx->memPoolSession); + } } static void freeExplainExecItem(void *param) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c38dbdcbbd..4bdd638849 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -767,7 +767,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ctx->sId = sId; ctx->phase = -1; - if (NULL != gQueryMgmt.memPoolHandle) { + if (NULL != gMemPoolHandle) { QW_ERR_JRET(qwInitSession(QW_FPARAMS(), ctx, &ctx->memPoolSession)); } @@ -803,7 +803,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { ctx->queryMsgType = qwMsg->msgType; ctx->localExec = false; + taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession); code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan); + taosDisableMemoryPoolUsage(); + if (TSDB_CODE_SUCCESS != code) { code = TSDB_CODE_INVALID_MSG; QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code)); @@ -814,7 +817,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH); taosDisableMemoryPoolUsage(); - sql = NULL; if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); qDestroyTask(pTaskInfo); @@ -1365,10 +1367,6 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } - - if (0 == atomic_val_compare_exchange_8(&gQueryMgmt.memPoolInited, 0, 1)) { - QW_ERR_RET(qwInitQueryPool()); - } int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1); if (1 == qwNum) { @@ -1632,3 +1630,48 @@ _return: QW_RET(code); } + + +void qWorkerRetireJob(uint64_t jobId, int32_t errCode) { + SQWJobInfo* pJob = (SQWJobInfo*)taosHashGet(gQueryMgmt.pJobInfo, &jobId, sizeof(jobId)); + if (NULL == pJob) { + qError("QID:0x%" PRIx64 " fail to get job from job hash", jobId); + return; + } + + if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { + qInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); + + qwRetireJob(pJob); + } else { + qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); + } +} + +void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) { + SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); + uint64_t jobId = 0; + int64_t retiredSize = 0; + while (retiredSize < retireSize && NULL != pJob) { + if (atomic_load_8(&pJob->retired)) { + pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); + continue; + } + + if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { + int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); + jobId = pJob->memInfo->jobId; + + qwRetireJob(pJob); + + retiredSize += aSize; + + qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, + jobId, aSize, retireSize); + } + + pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); + } +} + + diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 888b9cca98..b2c1beed2f 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -31,7 +31,9 @@ extern "C" { #define MP_MAX_KEEP_FREE_CHUNK_NUM 1000 #define MP_MAX_MALLOC_MEM_SIZE 0xFFFFFFFFFF -#define MP_DEFAULT_MEM_CHK_INTERVAL_MS 100 +#define MP_DEFAULT_MEM_CHK_INTERVAL_MS 10 +#define MP_MIN_MEM_CHK_INTERVAL_MS 1 + #define MP_RETIRE_HIGH_THRESHOLD_PERCENT (0.95) #define MP_RETIRE_MID_THRESHOLD_PERCENT (0.9) @@ -40,6 +42,10 @@ extern "C" { #define MP_RETIRE_UNIT_MIN_SIZE (50 * 1048576UL) #define MP_CFG_UPDATE_MIN_RESERVE_SIZE (50 * 1024 * 1048576UL) +#define MP_DEFAULT_RESERVE_MEM_PERCENT 20 +#define MP_MIN_FREE_SIZE_AFTER_RESERVE (4 * 1024 * 1048576UL) +#define MP_MIN_MEM_POOL_SIZE (5 * 1024 * 1048576UL) + // FLAGS AREA #define MP_CHUNK_FLAG_IN_USE (1 << 0) @@ -273,7 +279,7 @@ typedef struct SMemPool { int16_t slotId; SRWLatch cfgLock; SMemPoolCfg cfg; - int64_t retireThreshold[3]; + //int64_t retireThreshold[3]; int64_t retireUnit; SMPCtrlInfo ctrl; @@ -345,9 +351,6 @@ enum { #define MP_STAT_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail #define MP_STAT_ORIG_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail, (_item).origExec, (_item).origSucc, (_item).origFail -#define MP_API_ENTER() void* _pPoolHandle = NULL; taosSaveDisableMemoryPoolUsage(_pPoolHandle) -#define MP_API_LEAVE() taosRestoreEnableMemoryPoolUsage(_pPoolHandle) - #define MP_INIT_MEM_HEADER(_header, _size, _nsChunk) \ do { \ (_header)->size = _size; \ diff --git a/source/util/src/mpChunk.c b/source/util/src/mpChunk.c index 5b0b58441c..d50c118971 100755 --- a/source/util/src/mpChunk.c +++ b/source/util/src/mpChunk.c @@ -301,6 +301,7 @@ int32_t mpChunkInitSession(SMemPool* pPool, SMPSession* pSession) { } int32_t mpChunkUpdateCfg(SMemPool* pPool) { +/* pPool->chunk.maxChunkNum = pPool->cfg.freeSize / pPool->cfg.chunkSize; if (pPool->chunk.maxChunkNum <= 0) { uError("invalid memory pool max chunk num, freeSize:%" PRId64 ", chunkSize:%d", pPool->cfg.freeSize, pPool->cfg.chunkSize); @@ -310,7 +311,7 @@ int32_t mpChunkUpdateCfg(SMemPool* pPool) { pPool->chunk.readyChunkReserveNum = TMIN(pPool->cfg.threadNum * pPool->chunk.threadChunkReserveNum, pPool->chunk.maxChunkNum); pPool->chunk.chunkCache.groupNum = TMAX(pPool->chunk.maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE); - +*/ return TSDB_CODE_SUCCESS; } diff --git a/source/util/src/mpDirect.c b/source/util/src/mpDirect.c index 5b47d7b01a..2de4d71c56 100755 --- a/source/util/src/mpDirect.c +++ b/source/util/src/mpDirect.c @@ -33,7 +33,10 @@ int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint nSize = taosMemSize(res); mpUpdateAllocSize(pPool, pSession, nSize, nSize - *size); } else { - (void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, *size); + if (NULL != pSession) { + (void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, *size); + } + (void)atomic_sub_fetch_64(&pPool->allocMemSize, *size); uError("malloc %" PRId64 " alignment %d failed, code: 0x%x", *size, alignment, terrno); @@ -64,9 +67,12 @@ void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* ori taosRLockLatch(&pPool->cfgLock); // tmp test taosMemFree(ptr); + + if (NULL != pSession) { + (void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize); + (void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize); + } - (void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize); - (void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize); (void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize); taosRUnLockLatch(&pPool->cfgLock); diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 0199e2c2b8..c1b034042b 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -19,9 +19,10 @@ #include "tlog.h" #include "tutil.h" #include "taos.h" +#include "tglobal.h" static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT; -threadlocal void* threadPoolHandle = NULL; +void* gMemPoolHandle = NULL; threadlocal void* threadPoolSession = NULL; SMemPoolMgmt gMPMgmt = {0}; SMPStrategyFp gMPFps[] = { @@ -64,7 +65,7 @@ void mpFreeCacheGroup(SMPCacheGroup* pGrp) { int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead) { SMPCacheGroup* pGrp = NULL; if (NULL == pInfo->pGrpHead) { - pInfo->pGrpHead = taosMemCalloc(1, sizeof(*pInfo->pGrpHead)); + pInfo->pGrpHead = taosMemoryCalloc(1, sizeof(*pInfo->pGrpHead)); if (NULL == pInfo->pGrpHead) { uError("malloc chunkCache failed"); MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -72,12 +73,12 @@ int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup pGrp = pInfo->pGrpHead; } else { - pGrp = (SMPCacheGroup*)taosMemCalloc(1, sizeof(SMPCacheGroup)); + pGrp = (SMPCacheGroup*)taosMemoryCalloc(1, sizeof(SMPCacheGroup)); pGrp->pNext = pHead; } pGrp->nodesNum = pInfo->groupNum; - pGrp->pNodes = taosMemCalloc(pGrp->nodesNum, pInfo->nodeSize); + pGrp->pNodes = taosMemoryCalloc(pGrp->nodesNum, pInfo->nodeSize); if (NULL == pGrp->pNodes) { uError("calloc %d %d nodes in cache group failed", pGrp->nodesNum, pInfo->nodeSize); MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -162,19 +163,12 @@ void mpPushIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPListNode* pNod int32_t mpUpdateCfg(SMemPool* pPool) { - atomic_store_64(&pPool->retireThreshold[0], pPool->cfg.freeSize * MP_RETIRE_LOW_THRESHOLD_PERCENT); - atomic_store_64(&pPool->retireThreshold[1], pPool->cfg.freeSize * MP_RETIRE_MID_THRESHOLD_PERCENT); - atomic_store_64(&pPool->retireThreshold[2], pPool->cfg.freeSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT); - if (gMPFps[gMPMgmt.strategy].updateCfgFp) { MP_ERR_RET((*gMPFps[gMPMgmt.strategy].updateCfgFp)(pPool)); } - uDebug("memPool %s cfg updated, autoMaxSize:%d, freeSize:%" PRId64 - ", jobQuota:%" PRId64 ", threadNum:%d, retireThreshold:%" PRId64 "-%" PRId64 "-%" PRId64 - ", retireUnit:%" PRId64, pPool->name, pPool->cfg.autoMaxSize, pPool->cfg.freeSize, - pPool->cfg.jobQuota, pPool->cfg.threadNum, pPool->retireThreshold[0], pPool->retireThreshold[1], - pPool->retireThreshold[2], pPool->cfg.retireUnitSize); + uDebug("memPool %s cfg updated, reserveSize:%dMB, jobQuota:%dMB, threadNum:%d", + pPool->name, pPool->cfg.reserveSize, pPool->cfg.jobQuota, pPool->cfg.threadNum); return TSDB_CODE_SUCCESS; } @@ -261,17 +255,21 @@ FORCE_INLINE void mpUpdateMaxAllocSize(int64_t* pMaxAllocMemSize, int64_t newSiz void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize) { if (addSize) { - atomic_add_fetch_64(&pSession->pJob->job.allocMemSize, addSize); + if (NULL != pSession) { + atomic_add_fetch_64(&pSession->pJob->job.allocMemSize, addSize); + } atomic_add_fetch_64(&pPool->allocMemSize, addSize); } + + if (NULL != pSession) { + int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size); + mpUpdateMaxAllocSize(&pSession->maxAllocMemSize, allocMemSize); + + allocMemSize = atomic_load_64(&pSession->pJob->job.allocMemSize); + mpUpdateMaxAllocSize(&pSession->pJob->job.maxAllocMemSize, allocMemSize); + } - int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size); - mpUpdateMaxAllocSize(&pSession->maxAllocMemSize, allocMemSize); - - allocMemSize = atomic_load_64(&pSession->pJob->job.allocMemSize); - mpUpdateMaxAllocSize(&pSession->pJob->job.maxAllocMemSize, allocMemSize); - - allocMemSize = atomic_load_64(&pPool->allocMemSize); + int64_t allocMemSize = atomic_load_64(&pPool->allocMemSize); mpUpdateMaxAllocSize(&pPool->maxAllocMemSize, allocMemSize); } @@ -295,62 +293,45 @@ int32_t mpPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) { - SMPJob* pJob = pSession->pJob; int32_t code = TSDB_CODE_SUCCESS; + if (NULL == pSession) { + (void)atomic_add_fetch_64(&pPool->allocMemSize, size); + return code; + } + SMPJob* pJob = pSession->pJob; int64_t cAllocSize = atomic_add_fetch_64(&pJob->job.allocMemSize, size); - int64_t quota = atomic_load_64(&pPool->cfg.jobQuota); - if (quota > 0 && cAllocSize > quota) { + int64_t quota = atomic_load_32(pPool->cfg.jobQuota); + if (quota > 0 && cAllocSize / 1048576UL > quota) { code = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pJob->job.jobId, cAllocSize, quota); - pPool->cfg.cb.retireJobFp(&pJob->job, code); + pPool->cfg.cb.reachFp(pJob->job.jobId, code); (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); MP_RET(code); } - int64_t pAllocSize = atomic_add_fetch_64(&pPool->allocMemSize, size); - quota = atomic_load_64(&pPool->retireThreshold[2]); - if (pAllocSize >= quota) { + if (atomic_load_64(&tsCurrentAvailMemorySize) <= ((atomic_load_32(pPool->cfg.reserveSize) * 1048576UL) + size)) { code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; - uWarn("%s pool allocSize %" PRId64 " reaches the high quota %" PRId64, pPool->name, pAllocSize, quota); + uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %dMB", + pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, pPool->cfg.reserveSize); + pPool->cfg.cb.reachFp(pJob->job.jobId, code); + (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); + MP_RET(code); + } + + (void)atomic_add_fetch_64(&pPool->allocMemSize, size); + +/* + int64_t pAllocSize = atomic_add_fetch_64(&pPool->allocMemSize, size); + if (pAllocSize >= atomic_load_32(pPool->cfg.upperLimitSize) * 1048576UL) { + code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; + uWarn("%s pool allocSize %" PRId64 " reaches the upperLimit %" PRId64, pPool->name, pAllocSize, atomic_load_32(pPool->cfg.upperLimitSize) * 1048576UL); pPool->cfg.cb.retireJobFp(&pJob->job, code); (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); (void)atomic_sub_fetch_64(&pPool->allocMemSize, size); MP_RET(code); } - - quota = atomic_load_64(&pPool->retireThreshold[1]); - if (pAllocSize >= quota) { - uInfo("%s pool allocSize %" PRId64 " reaches the middle quota %" PRId64, pPool->name, pAllocSize, quota); - if (cAllocSize >= atomic_load_64(&pPool->cfg.retireUnitSize) / 2) { - code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; - pPool->cfg.cb.retireJobFp(&pJob->job, code); - (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); - (void)atomic_sub_fetch_64(&pPool->allocMemSize, size); - - MP_RET(code); - } else { - MP_ERR_RET(mpPutRetireMsgToQueue(pPool, false)); - } - - return TSDB_CODE_SUCCESS; - } - - quota = atomic_load_64(&pPool->retireThreshold[0]); - if (pAllocSize >= quota) { - uInfo("%s pool allocSize %" PRId64 " reaches the low quota %" PRId64, pPool->name, pAllocSize, quota); - if (cAllocSize >= atomic_load_64(&pPool->cfg.retireUnitSize)) { - code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; - pPool->cfg.cb.retireJobFp(&pJob->job, code); - - (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); - (void)atomic_sub_fetch_64(&pPool->allocMemSize, size); - - MP_RET(code); - } else { - MP_ERR_RET(mpPutRetireMsgToQueue(pPool, true)); - } - } +*/ return TSDB_CODE_SUCCESS; } @@ -438,8 +419,8 @@ void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailN uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Malloc", pDetail->times.memMalloc)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Calloc", pDetail->times.memCalloc)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Realloc", pDetail->times.memRealloc)); - uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->times.strdup)); - uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->times.strndup)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->times.memStrdup)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->times.memStrndup)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Free", pDetail->times.memFree)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Trim", pDetail->times.memTrim)); break; @@ -459,8 +440,8 @@ void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailN uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Malloc", pDetail->bytes.memMalloc)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Calloc", pDetail->bytes.memCalloc)); uInfo(MP_STAT_ORIG_FORMAT, MP_STAT_ORIG_VALUE("Realloc", pDetail->bytes.memRealloc)); - uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->bytes.strdup)); - uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->bytes.strndup)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->bytes.memStrdup)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->bytes.memStrndup)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Free", pDetail->bytes.memFree)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Trim", pDetail->bytes.memTrim)); break; @@ -656,31 +637,31 @@ void mpLogDetailStat(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* } case E_MP_STAT_LOG_MEM_STRDUP: { if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { - atomic_add_fetch_64(&pDetail->times.strdup.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.strdup.exec, pInput->size); + atomic_add_fetch_64(&pDetail->times.memStrdup.exec, 1); + atomic_add_fetch_64(&pDetail->bytes.memStrdup.exec, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { - atomic_add_fetch_64(&pDetail->times.strdup.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.strdup.succ, pInput->size); + atomic_add_fetch_64(&pDetail->times.memStrdup.succ, 1); + atomic_add_fetch_64(&pDetail->bytes.memStrdup.succ, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { - atomic_add_fetch_64(&pDetail->times.strdup.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.strdup.fail, pInput->size); + atomic_add_fetch_64(&pDetail->times.memStrdup.fail, 1); + atomic_add_fetch_64(&pDetail->bytes.memStrdup.fail, pInput->size); } break; } case E_MP_STAT_LOG_MEM_STRNDUP: { if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { - atomic_add_fetch_64(&pDetail->times.strndup.exec, 1); - atomic_add_fetch_64(&pDetail->bytes.strndup.exec, pInput->size); + atomic_add_fetch_64(&pDetail->times.memStrndup.exec, 1); + atomic_add_fetch_64(&pDetail->bytes.memStrndup.exec, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { - atomic_add_fetch_64(&pDetail->times.strndup.succ, 1); - atomic_add_fetch_64(&pDetail->bytes.strndup.succ, pInput->size); + atomic_add_fetch_64(&pDetail->times.memStrndup.succ, 1); + atomic_add_fetch_64(&pDetail->bytes.memStrndup.succ, pInput->size); } if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { - atomic_add_fetch_64(&pDetail->times.strndup.fail, 1); - atomic_add_fetch_64(&pDetail->bytes.strndup.fail, pInput->size); + atomic_add_fetch_64(&pDetail->times.memStrndup.fail, 1); + atomic_add_fetch_64(&pDetail->bytes.memStrndup.fail, pInput->size); } break; } @@ -890,14 +871,16 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt case E_MP_STAT_LOG_MEM_STRDUP: case E_MP_STAT_LOG_MEM_STRNDUP: case E_MP_STAT_LOG_MEM_TRIM: { - if (MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) { + if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) { mpLogDetailStat(&pSession->stat.statDetail, item, pInput); } if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) { mpLogDetailStat(&pPool->stat.statDetail, item, pInput); } - if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { + if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { mpLogPosStat(&pSession->stat.posStat, item, pInput, true); + } + if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { mpLogPosStat(&pPool->stat.posStat, item, pInput, false); } break; @@ -930,7 +913,8 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) { if (allocSize != freeSize) { uError("%s Session in JOB:0x%" PRIx64 " stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pSession->pJob->job.jobId, allocSize, freeSize); - ASSERT(0); + + taosMemPoolPrintStat(NULL, pSession, detailName); } else { uDebug("%s Session in JOB:0x%" PRIx64 " stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pSession->pJob->job.jobId, allocSize, freeSize); @@ -943,13 +927,13 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) { pDetail = &pPool->stat.statDetail; int64_t sessInit = atomic_load_64(&pPool->stat.statSession.initFail) + atomic_load_64(&pPool->stat.statSession.initSucc); if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == atomic_load_64(&pPool->stat.statSession.destroyNum)) { - int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.strdup.succ + pDetail->bytes.strndup.succ; + int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.memStrdup.succ + pDetail->bytes.memStrndup.succ; int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ; if (allocSize != freeSize) { uError("%s MemPool %s stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize); - ASSERT(0); + taosMemPoolPrintStat(poolHandle, NULL, detailName); } else { uDebug("%s MemPool %s stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize); } @@ -959,6 +943,7 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) { void mpCheckUpateCfg(void) { +/* taosRLockLatch(&gMPMgmt.poolLock); int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList); for (int32_t i = 0; i < poolNum; ++i) { @@ -968,14 +953,44 @@ void mpCheckUpateCfg(void) { } } taosRUnLockLatch(&gMPMgmt.poolLock); +*/ +} + +void mpUpdateSystemAvailableMemorySize() { + static int64_t errorTimes = 0; + int64_t sysAvailSize = 0; + + int32_t code = taosGetSysAvailMemory(&sysAvailSize); + if (TSDB_CODE_SUCCESS != code) { + errorTimes++; + if (0 == errorTimes % 1000) { + uError("get system available memory size failed, error: %s, errorTimes:%" PRId64, tstrerror(code), errorTimes); + } + + return; + } + + atomic_store_64(&tsCurrentAvailMemorySize, sysAvailSize); } void* mpMgmtThreadFunc(void* param) { int32_t timeout = 0; + int64_t retireSize = 0; + SMemPool* pPool = (SMemPool*)gMemPoolHandle; + while (0 == atomic_load_8(&gMPMgmt.modExit)) { + mpUpdateSystemAvailableMemorySize(); + + retireSize = atomic_load_32(pPool->cfg.reserveSize) * 1048576UL - tsCurrentAvailMemorySize; + if (retireSize > 0) { + (*pPool->cfg.cb.failFp)(retireSize, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); + } + + taosMsleep(MP_DEFAULT_MEM_CHK_INTERVAL_MS); +/* timeout = tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs); if (0 != timeout) { - mpCheckUpateCfg(); + mpUpdateSystemAvailableMemorySize(); continue; } @@ -984,13 +999,31 @@ void* mpMgmtThreadFunc(void* param) { } else if (atomic_load_8(&gMPMgmt.msgQueue.lowLevelRetire)) { (*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->cfg.retireUnitSize), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); } - - mpCheckUpateCfg(); +*/ } return NULL; } +int32_t mpCreateMgmtThread() { + int32_t code = TSDB_CODE_SUCCESS; + TdThreadAttr thAttr; + MP_ERR_RET(taosThreadAttrInit(&thAttr)); + MP_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE)); + code = taosThreadCreate(&gMPMgmt.poolMgmtThread, &thAttr, mpMgmtThreadFunc, NULL); + if (code != 0) { + uError("failed to create memPool mgmt thread, error: 0x%x", code); + (void)taosThreadAttrDestroy(&thAttr); + MP_ERR_JRET(code); + } + +_return: + + MP_ERR_RET(taosThreadAttrDestroy(&thAttr)); + + return code; +} + void mpModInit(void) { int32_t code = TSDB_CODE_SUCCESS; @@ -1010,18 +1043,6 @@ void mpModInit(void) { } gMPMgmt.waitMs = MP_DEFAULT_MEM_CHK_INTERVAL_MS; - - TdThreadAttr thAttr; - MP_ERR_JRET(taosThreadAttrInit(&thAttr)); - MP_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE)); - code = taosThreadCreate(&gMPMgmt.poolMgmtThread, &thAttr, mpMgmtThreadFunc, NULL); - if (code != 0) { - uError("failed to create memPool mgmt thread, error: 0x%x", code); - (void)taosThreadAttrDestroy(&thAttr); - MP_ERR_JRET(code); - } - - MP_ERR_JRET(taosThreadAttrDestroy(&thAttr)); _return: @@ -1029,8 +1050,6 @@ _return: } void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) { - MP_API_ENTER(); - SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; char detailName[128]; @@ -1045,26 +1064,24 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) { mpPrintPosStat(&pSession->ctrl, &pSession->stat.posStat, detailName); } - snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name); - detailName[sizeof(detailName) - 1] = 0; - mpPrintSessionStat(&pPool->ctrl, &pPool->stat.statSession, detailName); - mpPrintStatDetail(&pPool->ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize); + if (NULL != pPool) { + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name); + detailName[sizeof(detailName) - 1] = 0; + mpPrintSessionStat(&pPool->ctrl, &pPool->stat.statSession, detailName); + mpPrintStatDetail(&pPool->ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize); - snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode"); - detailName[sizeof(detailName) - 1] = 0; - mpPrintNodeStat(&pPool->ctrl, pPool->stat.nodeStat, detailName); - - snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos"); - detailName[sizeof(detailName) - 1] = 0; - mpPrintPosStat(&pPool->ctrl, &pPool->stat.posStat, detailName); - - MP_API_LEAVE(); + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode"); + detailName[sizeof(detailName) - 1] = 0; + mpPrintNodeStat(&pPool->ctrl, pPool->stat.nodeStat, detailName); + + snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos"); + detailName[sizeof(detailName) - 1] = 0; + mpPrintPosStat(&pPool->ctrl, &pPool->stat.posStat, detailName); + } } int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) { - MP_API_ENTER(); - int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = NULL; @@ -1104,29 +1121,21 @@ _return: *poolHandle = pPool; - MP_API_LEAVE(); - return code; } void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) { - MP_API_ENTER(); - SMemPool* pPool = (SMemPool*)poolHandle; (void)mpUpdateCfg(pPool); - - MP_API_LEAVE(); } void taosMemPoolDestroySession(void* poolHandle, void* session) { - MP_API_ENTER(); - SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - if (NULL == pSession) { - uWarn("null pointer of session"); - goto _return; + if (NULL == poolHandle || NULL == pSession) { + uWarn("null pointer of poolHandle %p or session %p", poolHandle, session); + return; } (void)atomic_sub_fetch_32(&pSession->pJob->remainSession, 1); @@ -1135,22 +1144,14 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) { (void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1); - taosMemPoolPrintStat(pPool, pSession, "DestroySession"); - mpCheckStatDetail(pPool, pSession, "DestroySession"); TAOS_MEMSET(pSession, 0, sizeof(*pSession)); mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession); - -_return: - - MP_API_LEAVE(); } int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) { - MP_API_ENTER(); - int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = NULL; @@ -1180,44 +1181,42 @@ _return: *ppSession = pSession; - MP_API_LEAVE(); - return code; } void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { - MP_API_ENTER(); - int32_t code = TSDB_CODE_SUCCESS; + SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; - if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { + if (NULL == poolHandle || NULL == fileName || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; - terrno = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem); + code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem); MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); _return: - MP_API_LEAVE(); + if (TSDB_CODE_SUCCESS != code) { + terrno = code; + } return input.pMem; } void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) { - MP_API_ENTER(); - int32_t code = TSDB_CODE_SUCCESS; + int64_t totalSize = num * size; + SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; - if (NULL == poolHandle || NULL == session || NULL == fileName || num < 0 || size < 0) { + if (NULL == poolHandle || NULL == fileName || num < 0 || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, num:%" PRId64 ", size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, num, size); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); @@ -1225,27 +1224,26 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - int64_t totalSize = num * size; - SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; - terrno = mpCalloc(pPool, pSession, &input.size, &input.pMem); + code = mpCalloc(pPool, pSession, &input.size, &input.pMem); MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input); _return: - MP_API_LEAVE(); + if (TSDB_CODE_SUCCESS != code) { + terrno = code; + } return input.pMem; } void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) { - MP_API_ENTER(); - int32_t code = TSDB_CODE_SUCCESS; + SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .origSize = 0, .pMem = ptr, .pOrigMem = ptr}; - if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { + if (NULL == poolHandle || NULL == fileName || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); @@ -1253,9 +1251,8 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .origSize = 0, .pMem = ptr, .pOrigMem = ptr}; - terrno = mpRealloc(pPool, pSession, &input.pMem, &input.size, &input.origSize); + code = mpRealloc(pPool, pSession, &input.pMem, &input.size, &input.origSize); if (NULL != input.pMem) { MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC); @@ -1278,17 +1275,19 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz _return: - MP_API_LEAVE(); + if (TSDB_CODE_SUCCESS != code) { + terrno = code; + } return input.pMem; } char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) { - MP_API_ENTER(); - int32_t code = TSDB_CODE_SUCCESS; + int64_t size = (ptr ? strlen(ptr) : 0) + 1; + SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; - if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) { + if (NULL == poolHandle || NULL == fileName || NULL == ptr) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p", __FUNCTION__, poolHandle, session, fileName, ptr); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); @@ -1296,10 +1295,8 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - int64_t size = strlen(ptr) + 1; - SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; - terrno = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem); + code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem); if (NULL != input.pMem) { TAOS_STRCPY(input.pMem, ptr); *((char*)input.pMem + size - 1) = 0; @@ -1310,17 +1307,20 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* _return: - MP_API_LEAVE(); + if (TSDB_CODE_SUCCESS != code) { + terrno = code; + } return input.pMem; } char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) { - MP_API_ENTER(); - int32_t code = TSDB_CODE_SUCCESS; + int64_t origSize = ptr ? strlen(ptr) : 0; + size = TMIN(size, origSize) + 1; + SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; - if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr || size < 0) { + if (NULL == poolHandle || NULL == fileName || NULL == ptr || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, ptr, size); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); @@ -1328,11 +1328,8 @@ char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64 SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - int64_t origSize = strlen(ptr); - size = TMIN(size, origSize) + 1; - SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; - terrno = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem); + code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem); if (NULL != input.pMem) { TAOS_MEMCPY(input.pMem, ptr, size - 1); *((char*)input.pMem + size - 1) = 0; @@ -1343,24 +1340,23 @@ char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64 _return: - MP_API_LEAVE(); + if (TSDB_CODE_SUCCESS != code) { + terrno = code; + } return input.pMem; } void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { - MP_API_ENTER(); - if (NULL == ptr) { - goto _return; + return; } int32_t code = TSDB_CODE_SUCCESS; - if (NULL == poolHandle || NULL == session || NULL == fileName) { + if (NULL == poolHandle || NULL == fileName) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p", __FUNCTION__, poolHandle, session, fileName); - MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } SMemPool* pPool = (SMemPool*)poolHandle; @@ -1371,45 +1367,31 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input); - -_return: - - MP_API_LEAVE(); - - return; } int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { - MP_API_ENTER(); - int64_t code = 0; - if (NULL == poolHandle || NULL == session || NULL == fileName) { + if (NULL == poolHandle || NULL == fileName) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p", __FUNCTION__, poolHandle, session, fileName); - MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } if (NULL == ptr) { - goto _return; + return 0; } SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; + return mpGetMemorySizeImpl(pPool, pSession, ptr); - -_return: - - MP_API_LEAVE(); - - return code; } void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { - MP_API_ENTER(); - int32_t code = TSDB_CODE_SUCCESS; + SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; - if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) { + if (NULL == poolHandle || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, alignment, size); MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); @@ -1417,51 +1399,42 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; - SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL}; - terrno = mpMalloc(pPool, pSession, &input.size, alignment, &input.pMem); + code = mpMalloc(pPool, pSession, &input.size, alignment, &input.pMem); MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input); _return: - MP_API_LEAVE(); + if (TSDB_CODE_SUCCESS != code) { + terrno = code; + } return input.pMem; } void taosMemPoolClose(void* poolHandle) { - MP_API_ENTER(); - SMemPool* pPool = (SMemPool*)poolHandle; - taosMemPoolPrintStat(poolHandle, NULL, "PoolClose"); - mpCheckStatDetail(pPool, NULL, "PoolClose"); taosMemoryFree(pPool->name); mpDestroyCacheGroup(&pPool->sessionCache); - - MP_API_LEAVE(); } void taosMemPoolModDestroy(void) { - MP_API_ENTER(); - - MP_API_LEAVE(); + } int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) { - MP_API_ENTER(); - int32_t code = TSDB_CODE_SUCCESS; - if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { + if (NULL == poolHandle || NULL == fileName || size < 0) { uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%d", __FUNCTION__, poolHandle, session, fileName, size); - MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } SMemPool* pPool = (SMemPool*)poolHandle; @@ -1475,89 +1448,51 @@ int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil MP_SET_FLAG(input.procFlags, ((0 == code) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_TRIM, &input); -_return: - - MP_API_LEAVE(); - return code; } int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) { - MP_API_ENTER(); - int32_t code = TSDB_CODE_SUCCESS; *ppJob = taosMemoryCalloc(1, sizeof(SMPJob)); if (NULL == *ppJob) { uError("calloc mp job failed, code: 0x%x", terrno); - MP_ERR_JRET(terrno); + MP_ERR_RET(terrno); } SMPJob* pJob = (SMPJob*)*ppJob; pJob->job.jobId = jobId; -_return: - - MP_API_LEAVE(); - return code; } void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) { - MP_API_ENTER(); - if (NULL == poolHandle) { *usedSize = 0; *needEnd = false; - goto _return; + return; } SMemPool* pPool = (SMemPool*)poolHandle; -#if 0 - if ((atomic_load_64(&pPool->cfg.maxSize) - atomic_load_64(&pPool->allocMemSize)) <= MP_CFG_UPDATE_MIN_RESERVE_SIZE) { - *needEnd = true; - taosWLockLatch(&pPool->cfgLock); - } else { - *needEnd = false; - } - *usedSize = atomic_load_64(&pPool->allocMemSize); -#else taosWLockLatch(&pPool->cfgLock); *needEnd = true; *usedSize = atomic_load_64(&pPool->allocMemSize); -#endif - -_return: - - MP_API_LEAVE(); } void taosMemPoolGetUsedSizeEnd(void* poolHandle) { - MP_API_ENTER(); - SMemPool* pPool = (SMemPool*)poolHandle; + if (NULL == pPool) { + return; + } + taosWUnLockLatch(&pPool->cfgLock); - - MP_API_LEAVE(); -} - -bool taosMemPoolNeedRetireJob(void* poolHandle) { - MP_API_ENTER(); - - SMemPool* pPool = (SMemPool*)poolHandle; - - MP_API_LEAVE(); - - return atomic_load_64(&pPool->allocMemSize) >= atomic_load_64(&pPool->retireThreshold[0]); } int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize) { - MP_API_ENTER(); - int32_t code = TSDB_CODE_SUCCESS; if (NULL == session || (NULL == ppStat && NULL == allocSize && NULL == maxAllocSize)) { uError("%s invalid input param, session:%p, ppStat:%p, allocSize:%p, maxAllocSize:%p", __FUNCTION__, session, ppStat, allocSize, maxAllocSize); - MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM); } SMPSession* pSession = (SMPSession*)session; @@ -1571,17 +1506,77 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t if (maxAllocSize) { *maxAllocSize = atomic_load_64(&pSession->maxAllocMemSize); } - -_return: - - MP_API_LEAVE(); return code; } +int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) { + int32_t code = TSDB_CODE_SUCCESS; + +#ifdef LINUX + if (!tsQueryUseMemoryPool) { +#endif + uInfo("memory pool disabled cause of configured disabled"); + return code; +#ifdef LINUX + } +#endif + + taosGetTotalMemory(&tsTotalMemoryKB); + if (tsTotalMemoryKB <= 0) { + uInfo("memory pool disabled since no enough system total memory, size: %" PRId64 "KB", tsTotalMemoryKB); + return code; + } + + uInfo("total memory size: %" PRId64 "KB", tsTotalMemoryKB); + + tsMinReservedMemorySize = TMAX(MIN_RESERVE_MEM_SIZE, tsTotalMemoryKB / 1024 * MP_DEFAULT_RESERVE_MEM_PERCENT / 100); + + SMemPoolCfg cfg = {0}; + int64_t sysAvailSize = 0; + + code = taosGetSysAvailMemory(&sysAvailSize); + if (code || sysAvailSize < MP_MIN_MEM_POOL_SIZE) { + uInfo("memory pool disabled since no enough system available memory, size: %" PRId64, sysAvailSize); + code = TSDB_CODE_SUCCESS; + return code; + } + + cfg.reserveSize = &tsMinReservedMemorySize; + + int64_t freeSizeAfterRes = sysAvailSize - tsMinReservedMemorySize * 1048576UL; + if (freeSizeAfterRes < MP_MIN_FREE_SIZE_AFTER_RESERVE) { + uInfo("memory pool disabled since no enough system available memory after reservied, size: %" PRId64, freeSizeAfterRes); + return code; + } + + atomic_store_64(&tsCurrentAvailMemorySize, sysAvailSize); + + cfg.evicPolicy = E_EVICT_AUTO; //TODO + cfg.chunkSize = 1048576; + cfg.jobQuota = &tsSingleQueryMaxMemorySize; + cfg.cb.failFp = failFp; + cfg.cb.reachFp = reachFp; + + code = taosMemPoolOpen("taosd", &cfg, &gMemPoolHandle); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + code = mpCreateMgmtThread(); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + uInfo("memory pool initialized"); + + return code; +} + + void taosAutoMemoryFree(void *ptr) { - if (NULL != threadPoolHandle) { - taosMemPoolFree(threadPoolHandle, threadPoolSession, ptr, __FILE__, __LINE__); + if (NULL != gMemPoolHandle) { + taosMemPoolFree(gMemPoolHandle, threadPoolSession, ptr, __FILE__, __LINE__); } else { taosMemFree(ptr); } diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 956ec06b6a..dc22c4ee11 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -156,8 +156,8 @@ typedef struct { typedef struct { int64_t jobQuota; - bool autoPoolSize; - int64_t poolSize; + bool reserveMode; + int64_t upperLimitSize; int32_t threadNum; int32_t randTask; } SMPTestParam; @@ -219,7 +219,7 @@ void mptInitLogFile() { tsLogKeepDays = 10; TAOS_STRCPY(tsLogDir, TD_LOG_DIR_PATH); - if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) { + if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum, false) < 0) { MPT_PRINTF("failed to open log file in directory:%s\n", tsLogDir); } } @@ -254,7 +254,7 @@ void mptInit() { mptCtx.pJobQueue = createBoundedQueue(10000, mptJobMemSizeCompFn, mptDeleteJobQueueData, NULL); ASSERT_TRUE(NULL != mptCtx.pJobQueue); - mptCtx.jobCtxs = (SMPTestJobCtx*)taosMemCalloc(MPT_MAX_JOB_NUM, sizeof(*mptCtx.jobCtxs)); + mptCtx.jobCtxs = (SMPTestJobCtx*)taosMemoryCalloc(MPT_MAX_JOB_NUM, sizeof(*mptCtx.jobCtxs)); ASSERT_TRUE(NULL != mptCtx.jobCtxs); mptCtrl.pSrcString = (char*)taosMemoryMalloc(mptCtrl.maxSingleAllocSize); @@ -370,10 +370,10 @@ void mptInitTask(int32_t idx, int32_t eId, SMPTestJobCtx* pJob) { ASSERT_TRUE(0 == mptInitSession(pJob->jobId, pJob->taskCtxs[idx].taskId, eId, pJob, &pJob->pSessions[idx])); - pJob->taskCtxs[idx].pMemList = (SMPTestMemInfo*)taosMemCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].pMemList)); + pJob->taskCtxs[idx].pMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].pMemList)); ASSERT_TRUE(NULL != pJob->taskCtxs[idx].pMemList); - pJob->taskCtxs[idx].npMemList = (SMPTestMemInfo*)taosMemCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].npMemList)); + pJob->taskCtxs[idx].npMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].npMemList)); ASSERT_TRUE(NULL != pJob->taskCtxs[idx].npMemList); uDebug("JOB:0x%x TASK:0x%x idx:%d initialized", pJob->jobId, pJob->taskCtxs[idx].taskId, idx); @@ -487,108 +487,19 @@ int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) { return TSDB_CODE_SUCCESS; } -int32_t mptGetQueryMemPoolMaxSize(void* pHandle, int64_t* pMaxSize, bool* autoMaxSize) { - if (!mptCtx.param.autoPoolSize && mptCtx.param.poolSize > 0) { - *pMaxSize = mptCtx.param.poolSize * 1048576UL; - *autoMaxSize = false; - - return TSDB_CODE_SUCCESS; - } - - int32_t code = mptGetMemPoolMaxMemSize(pHandle, pMaxSize); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - - *autoMaxSize = true; - - return code; -} - - -void mptCheckUpateCfgCb(void* pHandle, void* cfg) { - SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg; - int64_t newJobQuota = mptCtx.param.jobQuota * 1048576UL; - if (pCfg->jobQuota != newJobQuota) { - atomic_store_64(&pCfg->jobQuota, newJobQuota); - } - - int64_t freeSize = 0; - bool autoMaxSize = false; - int32_t code = mptGetQueryMemPoolMaxSize(pHandle, &freeSize, &autoMaxSize); - if (TSDB_CODE_SUCCESS != code) { - pCfg->freeSize = 0; - uError("get query memPool freeSize failed, reset freeSize to %" PRId64, pCfg->freeSize); - return; - } - - if (pCfg->autoMaxSize != autoMaxSize || pCfg->freeSize != freeSize) { - pCfg->autoMaxSize = autoMaxSize; - atomic_store_64(&pCfg->freeSize, freeSize); - taosMemPoolCfgUpdate(pHandle, pCfg); - } -} - -void mptLowLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) { +void mptRetireJobsCb(void* pHandle, int64_t retireSize, int32_t errCode) { SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL); - while (pJob) { - if (!taosMemPoolNeedRetireJob(pHandle)) { - taosHashCancelIterate(mptCtx.pJobs, pJob); - return; - } - - uint64_t jobId = pJob->memInfo->jobId; - int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); - if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - mptRetireJob(pJob); - - uDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, - jobId, aSize, retireSize); - - taosHashCancelIterate(mptCtx.pJobs, pJob); - break; - } - - pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, pJob); - } -} - -void mptMidLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) { - SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL); - PriorityQueueNode qNode; - while (NULL != pJob) { - if (0 == atomic_load_8(&pJob->retired)) { - assert(pJob == taosHashAcquire(mptCtx.pJobs, &pJob->memInfo->jobId, sizeof(pJob->memInfo->jobId))); - qNode.data = pJob; - if (NULL == taosBQPush(mptCtx.pJobQueue, &qNode)) { - taosHashRelease(mptCtx.pJobs, pJob); - } - } - - pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, pJob); - } - - PriorityQueueNode* pNode = NULL; + uint64_t jobId = 0; int64_t retiredSize = 0; - while (retiredSize < retireSize) { - if (!taosMemPoolNeedRetireJob(pHandle)) { - break; - } - - pNode = taosBQTop(mptCtx.pJobQueue); - if (NULL == pNode) { - break; - } - - pJob = (SMPTJobInfo*)pNode->data; + while (retiredSize < retireSize && NULL != pJob) { if (atomic_load_8(&pJob->retired)) { - taosBQPop(mptCtx.pJobQueue); + pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL); continue; } if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); - uint64_t jobId = pJob->memInfo->jobId; + jobId = pJob->memInfo->jobId; mptRetireJob(pJob); @@ -598,15 +509,8 @@ void mptMidLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) { jobId, aSize, retireSize, retiredSize); } - taosBQPop(mptCtx.pJobQueue); + pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL); } - - taosBQClear(mptCtx.pJobQueue); -} - - -void mptRetireJobsCb(void* pHandle, int64_t retireSize, bool lowLevelRetire, int32_t errCode) { - (lowLevelRetire) ? mptLowLevelRetire(pHandle, retireSize, errCode) : mptMidLevelRetire(pHandle, retireSize, errCode); } @@ -627,13 +531,13 @@ void mptRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) { void mptInitPool(void) { SMemPoolCfg cfg = {0}; - cfg.autoMaxSize = mptCtx.param.autoPoolSize; - if (!mptCtx.param.autoPoolSize) { - cfg.freeSize = mptCtx.param.poolSize; + cfg.reserveMode = mptCtx.param.reserveMode; + if (!mptCtx.param.reserveMode) { + cfg.upperLimitSize = mptCtx.param.upperLimitSize; } else { int64_t memSize = 0; ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize)); - cfg.freeSize = memSize * 0.8; + cfg.reserveSize = memSize / 1048576UL * MP_DEFAULT_RESERVE_MEM_PERCENT / 100; } cfg.threadNum = 10; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO @@ -641,7 +545,6 @@ void mptInitPool(void) { cfg.jobQuota = mptCtx.param.jobQuota; cfg.cb.retireJobsFp = mptRetireJobsCb; cfg.cb.retireJobFp = mptRetireJobCb; - cfg.cb.cfgUpdateFp = mptCheckUpateCfgCb; ASSERT_TRUE(0 == taosMemPoolOpen("testQMemPool", &cfg, &mptCtx.memPoolHandle)); } @@ -819,20 +722,20 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { mptCtrl.pSrcString[size] = 'W'; if (NULL == pTask->pMemList[pTask->memIdx].p) { - pTask->stat.times.strdup.exec++; - pTask->stat.bytes.strdup.exec+=size + 1; - pTask->stat.times.strdup.fail++; - pTask->stat.bytes.strdup.fail+=size + 1; + pTask->stat.times.memStrdup.exec++; + pTask->stat.bytes.memStrdup.exec+=size + 1; + pTask->stat.times.memStrdup.fail++; + pTask->stat.bytes.memStrdup.fail+=size + 1; uError("JOB:0x%x TASK:0x%x mpStrdup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p); - pTask->stat.times.strdup.exec++; - pTask->stat.bytes.strdup.exec+= nsize; + pTask->stat.times.memStrdup.exec++; + pTask->stat.bytes.memStrdup.exec+= nsize; - pTask->stat.times.strdup.succ++; - pTask->stat.bytes.strdup.succ+=nsize; + pTask->stat.times.memStrdup.succ++; + pTask->stat.bytes.memStrdup.succ+=nsize; mptWriteMem(pTask->pMemList[pTask->memIdx].p, size); @@ -851,10 +754,10 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { pTask->pMemList[pTask->memIdx].p = mptStrndup(mptCtrl.pSrcString, size); if (NULL == pTask->pMemList[pTask->memIdx].p) { - pTask->stat.times.strndup.exec++; - pTask->stat.bytes.strndup.exec+=size + 1; - pTask->stat.times.strndup.fail++; - pTask->stat.bytes.strndup.fail+=size + 1; + pTask->stat.times.memStrndup.exec++; + pTask->stat.bytes.memStrndup.exec+=size + 1; + pTask->stat.times.memStrndup.fail++; + pTask->stat.bytes.memStrndup.fail+=size + 1; uError("JOB:0x%x TASK:0x%x mpStrndup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); return; } @@ -862,10 +765,10 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { assert(strlen((char*)pTask->pMemList[pTask->memIdx].p) == size); nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p); - pTask->stat.times.strndup.exec++; - pTask->stat.bytes.strndup.exec+=nsize; - pTask->stat.times.strndup.succ++; - pTask->stat.bytes.strndup.succ+=nsize; + pTask->stat.times.memStrndup.exec++; + pTask->stat.bytes.memStrndup.exec+=nsize; + pTask->stat.times.memStrndup.succ++; + pTask->stat.bytes.memStrndup.succ+=nsize; mptWriteMem(pTask->pMemList[pTask->memIdx].p, size); @@ -1152,8 +1055,8 @@ void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) { MPT_PRINTF("\t task max act times: %d\n", mptCtrl.taskActTimes ? mptCtrl.taskActTimes : MPT_MAX_MEM_ACT_TIMES); MPT_PRINTF("\t max single alloc size: %" PRId64 "\n", mptCtrl.maxSingleAllocSize); MPT_PRINTF("\t job quota size: %" PRId64 "\n", param->jobQuota); - MPT_PRINTF("\t auto pool size: %d\n", param->autoPoolSize); - MPT_PRINTF("\t pool max size: %" PRId64 "\n", param->poolSize); + MPT_PRINTF("\t reserve mode: %d\n", param->reserveMode); + MPT_PRINTF("\t upper limit size: %" PRId64 "\n", param->upperLimitSize); MPT_PRINTF("\t test thread num: %d\n", param->threadNum); MPT_PRINTF("\t random exec task: %d\n", param->randTask); } @@ -1181,6 +1084,7 @@ TEST(FuncTest, SysMemoryPerfTest) { for (int32_t i = 0; i < loopTimes; ++i) { code = taosGetSysAvailMemory(&freeSize); assert(0 == code); + taosMsleep(1); } totalUs = taosGetTimestampUs() - st; @@ -1188,11 +1092,11 @@ TEST(FuncTest, SysMemoryPerfTest) { } #endif -#if 0 +#if 1 TEST(FuncTest, SingleThreadTest) { char* caseName = "FuncTest:SingleThreadTest"; SMPTestParam param = {0}; - param.autoPoolSize = true; + param.reserveMode = true; param.threadNum = 1; mptPrintTestBeginInfo(caseName, ¶m);