enh: adjust mode and retire policy

This commit is contained in:
dapan1121 2024-11-06 19:26:14 +08:00
parent 4a4351d4ec
commit c5c85efeb5
20 changed files with 467 additions and 712 deletions

View File

@ -74,7 +74,9 @@ extern int32_t tsQueryMaxConcurrentTaskNum;
extern int32_t tsQueryConcurrentTaskNum;
extern int32_t tsSingleQueryMaxMemorySize;
extern int8_t tsQueryUseMemoryPool;
extern int32_t tsQueryBufferPoolSize;
//extern int32_t tsQueryBufferPoolSize;
extern int32_t tsMinReservedMemorySize;
extern int64_t tsCurrentAvailMemorySize;
extern int32_t tsNumOfQueryThreads;
extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions;

View File

@ -58,8 +58,8 @@ typedef struct SMPMemoryStat {
SMPStatItem memMalloc;
SMPStatItem memCalloc;
SMPStatItemExt memRealloc;
SMPStatItem strdup;
SMPStatItem strndup;
SMPStatItem memStrdup;
SMPStatItem memStrndup;
SMPStatItem memFree;
SMPStatItem memTrim;
@ -78,33 +78,33 @@ typedef struct SMPStatDetail {
typedef void (*mpDecConcSessionNum)(void);
typedef void (*mpIncConcSessionNum)(void);
typedef void (*mpSetConcSessionNum)(int32_t);
typedef void (*mpRetireJobs)(void*, int64_t, bool, int32_t);
typedef void (*mpRetireJob)(SMemPoolJob*, int32_t);
typedef void (*mpReserveFailFp)(int64_t, int32_t);
typedef void (*mpReserveReachFp)(uint64_t, int32_t);
typedef void (*mpCfgUpdate)(void*, void*);
typedef struct SMemPoolCallBack {
mpDecConcSessionNum decSessFp;
mpIncConcSessionNum incSessFp;
mpSetConcSessionNum setSessFp;
mpRetireJobs retireJobsFp;
mpRetireJob retireJobFp;
mpCfgUpdate cfgUpdateFp;
//mpDecConcSessionNum decSessFp;
//mpIncConcSessionNum incSessFp;
//mpSetConcSessionNum setSessFp;
mpReserveFailFp failFp;
mpReserveReachFp reachFp;
//mpCfgUpdate cfgUpdateFp;
} SMemPoolCallBack;
typedef struct SMemPoolCfg {
bool autoMaxSize;
int64_t reserveSize;
int64_t retireUnitSize;
int64_t freeSize;
int64_t jobQuota;
//bool reserveMode;
int32_t *reserveSize; //MB
//int32_t *upperLimitSize; //MB
//int64_t retireUnitSize;
int32_t *jobQuota; //MB
int32_t chunkSize;
int32_t threadNum;
MemPoolEvictPolicy evicPolicy;
SMemPoolCallBack cb;
} SMemPoolCfg;
#define MEMPOOL_GET_ALLOC_SIZE(_dstat) ((_dstat)->bytes.memMalloc.succ + (_dstat)->bytes.memCalloc.succ + (_dstat)->bytes.memRealloc.succ + (_dstat)->bytes.strdup.succ + (_dstat)->bytes.strndup.succ)
#define MEMPOOL_GET_ALLOC_SIZE(_dstat) ((_dstat)->bytes.memMalloc.succ + (_dstat)->bytes.memCalloc.succ + (_dstat)->bytes.memRealloc.succ + (_dstat)->bytes.memStrdup.succ + (_dstat)->bytes.memStrndup.succ)
#define MEMPOOL_GET_FREE_SIZE(_dstat) ((_dstat)->bytes.memRealloc.origSucc + (_dstat)->bytes.memFree.succ)
#define MEMPOOL_GET_USED_SIZE(_dstat) (MEMPOOL_GET_ALLOC_SIZE(_dstat) - MEMPOOL_GET_FREE_SIZE(_dstat))
@ -129,8 +129,8 @@ void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg);
void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName);
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd);
void taosMemPoolGetUsedSizeEnd(void* poolHandle);
bool taosMemPoolNeedRetireJob(void* poolHandle);
int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize);
int32_t taosMemoryPoolInit(mpReserveFailFp, mpReserveReachFp);
#define taosMemPoolFreeClear(ptr) \
@ -143,29 +143,27 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t
#ifndef BUILD_TEST
extern threadlocal void* threadPoolHandle;
extern void* gMemPoolHandle;
extern threadlocal void* threadPoolSession;
#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolHandle = _pool; threadPoolSession = _session; tsEnableRandErr = true;} while (0)
#define taosDisableMemoryPoolUsage() (threadPoolHandle = NULL, tsEnableRandErr = false)
#define taosSaveDisableMemoryPoolUsage(_handle) do { (_handle) = threadPoolHandle; threadPoolHandle = NULL; } while (0)
#define taosRestoreEnableMemoryPoolUsage(_handle) (threadPoolHandle = (_handle))
#define taosEnableMemoryPoolUsage(_pool, _session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0)
#define taosDisableMemoryPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0)
#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size)))
#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
#define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
#define taosStrdup(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolStrdup(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr)))
#define taosStrndup(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolStrndup(threadPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size)))
#define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr)))
#define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr)))
#define taosMemoryTrim(_size, _trimed) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed)))
#define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
#define taosMemoryMalloc(_size) ((NULL != gMemPoolHandle) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size)))
#define taosMemoryCalloc(_num, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
#define taosMemoryRealloc(_ptr, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
#define taosStrdup(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr)))
#define taosStrndup(_ptr, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size)))
#define taosMemoryFree(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr)))
#define taosMemorySize(_ptr) ((NULL != gMemPoolHandle) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr)))
#define taosMemoryTrim(_size, _trimed) ((NULL != gMemPoolHandle) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed)))
#define taosMemoryMallocAlign(_alignment, _size) ((NULL != gMemPoolHandle) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
#else
#define taosEnableMemoryPoolUsage(_pool, _session)
#define taosDisableMemoryPoolUsage()
#define taosSaveDisableMemoryPoolUsage(_handle)
#define taosRestoreEnableMemoryPoolUsage(_handle)
#define taosSaveDisableMemoryPoolUsage()
#define taosRestoreEnableMemoryPoolUsage()
#define taosMemoryMalloc(_size) taosMemMalloc(_size)
#define taosMemoryCalloc(_num, _size) taosMemCalloc(_num, _size)

View File

@ -616,6 +616,8 @@ typedef enum {
ANAL_ALGO_TYPE_END,
} EAnalAlgoType;
#define MIN_RESERVE_MEM_SIZE 1024 // MB
#ifdef __cplusplus
}
#endif

View File

@ -57,6 +57,8 @@ int32_t tsShellActivityTimer = 3; // second
int8_t tsQueryUseMemoryPool = 1;
int32_t tsQueryBufferPoolSize = 0; //MB
int32_t tsSingleQueryMaxMemorySize = 0; //MB
int32_t tsMinReservedMemorySize = MIN_RESERVE_MEM_SIZE; //MB
int64_t tsCurrentAvailMemorySize = 0;
// queue & threads
int32_t tsQueryMinConcurrentTaskNum = 1;
@ -744,8 +746,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferPoolSize", tsQueryBufferPoolSize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0);
//TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferPoolSize", tsQueryBufferPoolSize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minReservedMemorySize", tsMinReservedMemorySize, 1024, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE));
@ -1699,8 +1702,11 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "singleQueryMaxMemorySize");
tsSingleQueryMaxMemorySize = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "queryBufferPoolSize");
tsQueryBufferPoolSize = pItem->i32;
//TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "queryBufferPoolSize");
//tsQueryBufferPoolSize = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "minReservedMemorySize");
tsMinReservedMemorySize = pItem->i32;
// GRANT_CFG_GET;
TAOS_RETURN(TSDB_CODE_SUCCESS);
@ -2070,7 +2076,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental},
{"maxTsmaNum", &tsMaxTsmaNum},
{"queryBufferPoolSize", &tsQueryBufferPoolSize},
{"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize}};
if ((code = taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true)) != TSDB_CODE_SUCCESS) {

View File

@ -442,6 +442,13 @@ int mainWindows(int argc, char **argv) {
taosCleanupArgs();
return code;
}
if ((code = taosMemoryPoolInit(qWorkerRetireJobs, qWorkerRetireJob)) != 0) {
dError("failed to init conv");
taosCloseLog();
taosCleanupArgs();
return code;
}
if ((code = taosConvInit()) != 0) {
dError("failed to init conv");

View File

@ -432,7 +432,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
(void)atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
taosArrayDestroy(pInserter->pDataBlocks);
taosMemoryFree(pInserter->pSchema);
taosMemFree(pInserter->pSchema);
taosMemoryFree(pInserter->pParam);
taosHashCleanup(pInserter->pCols);
(void)taosThreadMutexDestroy(&pInserter->mutex);

View File

@ -116,7 +116,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pDataInfo->pRsp);
taosMemFreeClear(pDataInfo->pRsp);
goto _error;
}
} else {
@ -125,7 +125,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
", totalRows:%" PRIu64 ", try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows,
pExchangeInfo->loadInfo.totalRows, i + 1, totalSources);
taosMemoryFreeClear(pDataInfo->pRsp);
taosMemFreeClear(pDataInfo->pRsp);
}
break;
}
@ -154,13 +154,13 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
}
taosMemoryFreeClear(pDataInfo->pRsp);
taosMemFreeClear(pDataInfo->pRsp);
if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) {
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pDataInfo->pRsp);
taosMemFreeClear(pDataInfo->pRsp);
goto _error;
}
}
@ -633,7 +633,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
pDataInfo->startTime = taosGetTimestampUs();
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
SFetchRspHandleWrapper* pWrapper = taosMemCalloc(1, sizeof(SFetchRspHandleWrapper));
SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper));
QUERY_CHECK_NULL(pWrapper, code, lino, _end, terrno);
pWrapper->exchangeId = pExchangeInfo->self;
pWrapper->sourceIndex = sourceIndex;
@ -673,7 +673,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
return pTaskInfo->code;
}
void* msg = taosMemCalloc(1, msgSize);
void* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
taosMemFree(pWrapper);
@ -696,7 +696,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
pSource->execId, pExchangeInfo, sourceIndex, totalSources);
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemCalloc(1, sizeof(SMsgSendInfo));
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
taosMemFreeClear(msg);
taosMemFree(pWrapper);
@ -714,9 +714,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
int64_t transporterId = 0;
void* poolHandle = NULL;
taosSaveDisableMemoryPoolUsage(poolHandle);
code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo);
taosRestoreEnableMemoryPoolUsage(poolHandle);
QUERY_CHECK_CODE(code, lino, _end);
int64_t* pRpcHandle = taosArrayGet(pExchangeInfo->pFetchRpcHandles, sourceIndex);
*pRpcHandle = transporterId;

View File

@ -1704,6 +1704,7 @@ int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode,
nodesDestroyList(groupNew);
pAPI->metaReaderFn.clearReader(&mr);
return TSDB_CODE_SUCCESS;
}
@ -3090,3 +3091,4 @@ _end:
}
return code;
}

View File

@ -441,6 +441,7 @@ static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
}
_end:
pAPI->metaReaderFn.clearReader(&mr);
(*ppArrayRes) = qa;

View File

@ -102,7 +102,6 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
int32_t code = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api, pTaskInfo);
if (*pTaskInfo == NULL || code != 0) {
nodesDestroyNode((SNode*)pPlan);
taosMemoryFree(sql);
return code;
}
@ -112,7 +111,14 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
}
}
TSWAP((*pTaskInfo)->sql, sql);
(*pTaskInfo)->sql = taosStrdup(sql);
if (NULL == (*pTaskInfo)->sql) {
code = terrno;
nodesDestroyNode((SNode*)pPlan);
doDestroyTask(*pTaskInfo);
(*pTaskInfo) = NULL;
return code;
}
(*pTaskInfo)->pSubplan = pPlan;
(*pTaskInfo)->pWorkerCb = pHandle->pWorkerCb;

View File

@ -1413,7 +1413,7 @@ static int32_t doSetUserTableMetaInfo(SStoreMetaReader* pMetaReaderFn, SStoreMet
SMetaReader mr1 = {0};
pMetaReaderFn->initReader(&mr1, pVnode, META_READER_NOLOCK, pMetaFn);
int64_t suid = pMReader->me.ctbEntry.suid;
code = pMetaReaderFn->getTableEntryByUid(&mr1, suid);
if (code != TSDB_CODE_SUCCESS) {
@ -1576,6 +1576,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);
code = doSetUserTableMetaInfo(&pAPI->metaReaderFn, &pAPI->metaFn, pInfo->readHandle.vnode, &mr, *uid, dbname, vgId,
p, numOfRows, GET_TASKID(pTaskInfo));
@ -1723,7 +1724,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
SMetaReader mr = {0};
pAPI->metaReaderFn.initReader(&mr, pInfo->readHandle.vnode, META_READER_NOLOCK, &pAPI->metaFn);
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
code = pAPI->metaReaderFn.getTableEntryByUid(&mr, suid);
if (code != TSDB_CODE_SUCCESS) {
@ -2147,7 +2148,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
tstrncpy(pInfo->req.user, pInfo->pUser, tListLen(pInfo->req.user));
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
char* buf1 = taosMemCalloc(1, contLen);
char* buf1 = taosMemoryCalloc(1, contLen);
if (!buf1) {
return NULL;
}
@ -2159,7 +2160,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
}
// send the fetch remote task result reques
SMsgSendInfo* pMsgSendInfo = taosMemCalloc(1, sizeof(SMsgSendInfo));
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
@ -2177,10 +2178,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
pMsgSendInfo->fp = loadSysTableCallback;
pMsgSendInfo->requestId = pTaskInfo->id.queryId;
void* poolHandle = NULL;
taosSaveDisableMemoryPoolUsage(poolHandle);
code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, NULL, pMsgSendInfo);
taosRestoreEnableMemoryPoolUsage(poolHandle);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
pTaskInfo->code = code;

View File

@ -41,9 +41,6 @@ extern "C" {
#define QW_THREAD_MAX_SCHED_TASK_NUM 10
#define QW_QUERY_MEM_POOL_NAME "Query"
#define QW_DEFAULT_RESERVE_MEM_PERCENT 20
#define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576UL)
#define QW_MIN_MEM_POOL_SIZE (1048576UL)
#define QW_MAX_RETIRE_JOB_NUM 10000
#define QW_DEFAULT_THREAD_TASK_NUM 3
@ -252,9 +249,6 @@ typedef struct SQueryMgmt {
SRWLatch taskMgmtLock;
int32_t concTaskLevel;
SHashObj* pJobInfo;
void* memPoolHandle;
int8_t memPoolInited;
SQWRetireCtx retireCtx;
} SQueryMgmt;
#define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST)

View File

@ -1,45 +1,6 @@
#include "qwInt.h"
#include "qworker.h"
int32_t qwGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) {
int64_t freeSize = 0;
int64_t usedSize = 0;
bool needEnd = false;
taosMemPoolGetUsedSizeBegin(pHandle, &usedSize, &needEnd);
int32_t code = taosGetSysAvailMemory(&freeSize);
if (needEnd) {
taosMemPoolGetUsedSizeEnd(pHandle);
}
if (TSDB_CODE_SUCCESS != code) {
qError("get system avaiable memory size failed, error: 0x%x", code);
return code;
}
int64_t totalSize = freeSize + usedSize;
int64_t reserveSize = TMAX(tsTotalMemoryKB * 1024 * QW_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, QW_MIN_RESERVE_MEM_SIZE);
int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL;
if (availSize < QW_MIN_MEM_POOL_SIZE) {
qError("too little available query memory, totalAvailable: %" PRId64 ", reserveSize: %" PRId64, totalSize, reserveSize);
return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM;
}
uDebug("new pool maxSize:%" PRId64 ", usedSize:%" PRId64 ", freeSize:%" PRId64, availSize, usedSize, freeSize);
*maxSize = availSize;
return TSDB_CODE_SUCCESS;
}
int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chunkSize) {
//TODO
*chunkSize = 2 * 1048576;
return TSDB_CODE_SUCCESS;
}
void qwSetConcurrentTaskNumCb(int32_t taskNum) {
int32_t finTaskNum = TMIN(taskNum, tsNumOfQueryThreads * QW_DEFAULT_THREAD_TASK_NUM);
@ -90,9 +51,34 @@ int32_t qwInitJobInfo(uint64_t qId, SQWJobInfo* pJob) {
return code;
}
int32_t qwInitJobHash(void) {
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pHash = NULL;
if (NULL == atomic_load_ptr(&gQueryMgmt.pJobInfo)) {
pHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pHash) {
qError("init job hash failed, error:0x%x", terrno);
return terrno;
}
if (NULL != atomic_val_compare_exchange_ptr(&gQueryMgmt.pJobInfo, NULL, pHash)) {
taosHashCleanup(pHash);
}
}
return code;
}
int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) {
int32_t code = TSDB_CODE_SUCCESS;
SQWJobInfo* pJob = NULL;
if (NULL == gQueryMgmt.pJobInfo) {
QW_ERR_RET(qwInitJobHash());
}
while (true) {
pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, &qId, sizeof(qId));
@ -146,202 +132,4 @@ _return:
return code;
}
void qwRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashGet(gQueryMgmt.pJobInfo, &mpJob->jobId, sizeof(mpJob->jobId));
if (NULL == pJob) {
qError("QID:0x%" PRIx64 " fail to get job from job hash", mpJob->jobId);
return;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
qInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
} else {
qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
}
}
void qwLowLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
while (pJob) {
if (!taosMemPoolNeedRetireJob(pHandle)) {
taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob);
return;
}
uint64_t jobId = pJob->memInfo->jobId;
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
qwRetireJob(pJob);
qDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
jobId, aSize, retireSize);
taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob);
break;
}
pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
}
}
void qwMidLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
PriorityQueueNode qNode;
while (NULL != pJob) {
if (0 == atomic_load_8(&pJob->retired)) {
qNode.data = pJob;
(void)taosBQPush(gQueryMgmt.retireCtx.pJobQueue, &qNode);
}
pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
}
PriorityQueueNode* pNode = NULL;
uint64_t jobId = 0;
int64_t retiredSize = 0;
while (retiredSize < retireSize) {
if (!taosMemPoolNeedRetireJob(pHandle)) {
break;
}
pNode = taosBQTop(gQueryMgmt.retireCtx.pJobQueue);
if (NULL == pNode) {
break;
}
pJob = (SQWJobInfo*)pNode->data;
if (atomic_load_8(&pJob->retired)) {
taosBQPop(gQueryMgmt.retireCtx.pJobQueue);
continue;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
jobId = pJob->memInfo->jobId;
qwRetireJob(pJob);
qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
jobId, aSize, retireSize);
retiredSize += aSize;
}
taosBQPop(gQueryMgmt.retireCtx.pJobQueue);
}
taosBQClear(gQueryMgmt.retireCtx.pJobQueue);
}
void qwRetireJobsCb(void* pHandle, int64_t retireSize, bool lowLevelRetire, int32_t errCode) {
(lowLevelRetire) ? qwLowLevelRetire(pHandle, retireSize, errCode) : qwMidLevelRetire(pHandle, retireSize, errCode);
}
int32_t qwUpdateQueryMemPoolCfg(void* pHandle, int64_t* pFreeSize, bool* autoMaxSize, int64_t* pReserveSize, int64_t* pRetireUnitSize) {
if (tsQueryBufferPoolSize > 0) {
*pFreeSize = tsQueryBufferPoolSize * 1048576UL;
*autoMaxSize = false;
return TSDB_CODE_SUCCESS;
}
int32_t code = qwGetMemPoolMaxMemSize(pHandle, pFreeSize);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
*autoMaxSize = true;
return code;
}
void qwCheckUpateCfgCb(void* pHandle, void* cfg) {
SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg;
int64_t newJobQuota = tsSingleQueryMaxMemorySize * 1048576UL;
if (pCfg->jobQuota != newJobQuota) {
atomic_store_64(&pCfg->jobQuota, newJobQuota);
}
int64_t freeSize = 0, reserveSize = 0, retireUnitSize = 0;
bool autoMaxSize = false;
int32_t code = qwUpdateQueryMemPoolCfg(pHandle, &freeSize, &autoMaxSize, &reserveSize, &retireUnitSize);
if (TSDB_CODE_SUCCESS != code) {
pCfg->freeSize = 0;
qError("get query memPool freeSize failed, reset freeSize to %" PRId64, pCfg->freeSize);
}
if (pCfg->autoMaxSize != autoMaxSize || pCfg->freeSize != freeSize) {
pCfg->autoMaxSize = autoMaxSize;
atomic_store_64(&pCfg->freeSize, freeSize);
taosMemPoolCfgUpdate(pHandle, pCfg);
}
}
static bool qwJobMemSizeCompFn(void* l, void* r, void* param) {
SQWJobInfo* left = (SQWJobInfo*)l;
SQWJobInfo* right = (SQWJobInfo*)r;
if (atomic_load_8(&right->retired)) {
return true;
}
return atomic_load_64(&right->memInfo->allocMemSize) < atomic_load_64(&left->memInfo->allocMemSize);
}
void qwDeleteJobQueueData(void* pData) {}
int32_t qwInitQueryPool(void) {
int32_t code = TSDB_CODE_SUCCESS;
#ifdef LINUX
if (!tsQueryUseMemoryPool) {
#endif
qInfo("query memory pool disabled");
return code;
#ifdef LINUX
}
#endif
taosGetTotalMemory(&tsTotalMemoryKB);
SMemPoolCfg cfg = {0};
code = qwUpdateQueryMemPoolCfg(NULL, &cfg.freeSize, &cfg.autoMaxSize, &cfg.reserveSize, &cfg.retireUnitSize);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO
cfg.chunkSize = 1048576;
cfg.jobQuota = tsSingleQueryMaxMemorySize * 1048576UL;
cfg.cb.setSessFp = qwSetConcurrentTaskNumCb;
cfg.cb.decSessFp = qwDecConcurrentTaskNumCb;
cfg.cb.incSessFp = qwIncConcurrentTaskNumCb;
cfg.cb.retireJobsFp = qwRetireJobsCb;
cfg.cb.retireJobFp = qwRetireJobCb;
cfg.cb.cfgUpdateFp = qwCheckUpateCfgCb;
gQueryMgmt.pJobInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == gQueryMgmt.pJobInfo) {
qError("init job hash failed, error:0x%x", terrno);
return terrno;
}
gQueryMgmt.retireCtx.pJobQueue = createBoundedQueue(QW_MAX_RETIRE_JOB_NUM, qwJobMemSizeCompFn, qwDeleteJobQueueData, NULL);
if (NULL == gQueryMgmt.retireCtx.pJobQueue) {
qError("init job bounded queue failed, error:0x%x", terrno);
return terrno;
}
code = taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryMgmt.memPoolHandle);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
qInfo("query memory pool initialized");
return code;
}

View File

@ -321,7 +321,9 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) {
taosArrayDestroy(ctx->tbInfo);
taosMemPoolDestroySession(gQueryMgmt.memPoolHandle, ctx->memPoolSession);
if (gMemPoolHandle && ctx->memPoolSession) {
taosMemPoolDestroySession(gMemPoolHandle, ctx->memPoolSession);
}
}
static void freeExplainExecItem(void *param) {

View File

@ -767,7 +767,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx->sId = sId;
ctx->phase = -1;
if (NULL != gQueryMgmt.memPoolHandle) {
if (NULL != gMemPoolHandle) {
QW_ERR_JRET(qwInitSession(QW_FPARAMS(), ctx, &ctx->memPoolSession));
}
@ -803,7 +803,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
ctx->queryMsgType = qwMsg->msgType;
ctx->localExec = false;
taosEnableMemoryPoolUsage(gQueryMgmt.memPoolHandle, ctx->memPoolSession);
code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
taosDisableMemoryPoolUsage();
if (TSDB_CODE_SUCCESS != code) {
code = TSDB_CODE_INVALID_MSG;
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
@ -814,7 +817,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH);
taosDisableMemoryPoolUsage();
sql = NULL;
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
qDestroyTask(pTaskInfo);
@ -1365,10 +1367,6 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (0 == atomic_val_compare_exchange_8(&gQueryMgmt.memPoolInited, 0, 1)) {
QW_ERR_RET(qwInitQueryPool());
}
int32_t qwNum = atomic_add_fetch_32(&gQwMgmt.qwNum, 1);
if (1 == qwNum) {
@ -1632,3 +1630,48 @@ _return:
QW_RET(code);
}
void qWorkerRetireJob(uint64_t jobId, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashGet(gQueryMgmt.pJobInfo, &jobId, sizeof(jobId));
if (NULL == pJob) {
qError("QID:0x%" PRIx64 " fail to get job from job hash", jobId);
return;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
qInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
qwRetireJob(pJob);
} else {
qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
}
}
void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
uint64_t jobId = 0;
int64_t retiredSize = 0;
while (retiredSize < retireSize && NULL != pJob) {
if (atomic_load_8(&pJob->retired)) {
pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
continue;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
jobId = pJob->memInfo->jobId;
qwRetireJob(pJob);
retiredSize += aSize;
qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
jobId, aSize, retireSize);
}
pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
}
}

View File

@ -31,7 +31,9 @@ extern "C" {
#define MP_MAX_KEEP_FREE_CHUNK_NUM 1000
#define MP_MAX_MALLOC_MEM_SIZE 0xFFFFFFFFFF
#define MP_DEFAULT_MEM_CHK_INTERVAL_MS 100
#define MP_DEFAULT_MEM_CHK_INTERVAL_MS 10
#define MP_MIN_MEM_CHK_INTERVAL_MS 1
#define MP_RETIRE_HIGH_THRESHOLD_PERCENT (0.95)
#define MP_RETIRE_MID_THRESHOLD_PERCENT (0.9)
@ -40,6 +42,10 @@ extern "C" {
#define MP_RETIRE_UNIT_MIN_SIZE (50 * 1048576UL)
#define MP_CFG_UPDATE_MIN_RESERVE_SIZE (50 * 1024 * 1048576UL)
#define MP_DEFAULT_RESERVE_MEM_PERCENT 20
#define MP_MIN_FREE_SIZE_AFTER_RESERVE (4 * 1024 * 1048576UL)
#define MP_MIN_MEM_POOL_SIZE (5 * 1024 * 1048576UL)
// FLAGS AREA
#define MP_CHUNK_FLAG_IN_USE (1 << 0)
@ -273,7 +279,7 @@ typedef struct SMemPool {
int16_t slotId;
SRWLatch cfgLock;
SMemPoolCfg cfg;
int64_t retireThreshold[3];
//int64_t retireThreshold[3];
int64_t retireUnit;
SMPCtrlInfo ctrl;
@ -345,9 +351,6 @@ enum {
#define MP_STAT_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail
#define MP_STAT_ORIG_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail, (_item).origExec, (_item).origSucc, (_item).origFail
#define MP_API_ENTER() void* _pPoolHandle = NULL; taosSaveDisableMemoryPoolUsage(_pPoolHandle)
#define MP_API_LEAVE() taosRestoreEnableMemoryPoolUsage(_pPoolHandle)
#define MP_INIT_MEM_HEADER(_header, _size, _nsChunk) \
do { \
(_header)->size = _size; \

View File

@ -301,6 +301,7 @@ int32_t mpChunkInitSession(SMemPool* pPool, SMPSession* pSession) {
}
int32_t mpChunkUpdateCfg(SMemPool* pPool) {
/*
pPool->chunk.maxChunkNum = pPool->cfg.freeSize / pPool->cfg.chunkSize;
if (pPool->chunk.maxChunkNum <= 0) {
uError("invalid memory pool max chunk num, freeSize:%" PRId64 ", chunkSize:%d", pPool->cfg.freeSize, pPool->cfg.chunkSize);
@ -310,7 +311,7 @@ int32_t mpChunkUpdateCfg(SMemPool* pPool) {
pPool->chunk.readyChunkReserveNum = TMIN(pPool->cfg.threadNum * pPool->chunk.threadChunkReserveNum, pPool->chunk.maxChunkNum);
pPool->chunk.chunkCache.groupNum = TMAX(pPool->chunk.maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE);
*/
return TSDB_CODE_SUCCESS;
}

View File

@ -33,7 +33,10 @@ int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint
nSize = taosMemSize(res);
mpUpdateAllocSize(pPool, pSession, nSize, nSize - *size);
} else {
(void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, *size);
if (NULL != pSession) {
(void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, *size);
}
(void)atomic_sub_fetch_64(&pPool->allocMemSize, *size);
uError("malloc %" PRId64 " alignment %d failed, code: 0x%x", *size, alignment, terrno);
@ -64,9 +67,12 @@ void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* ori
taosRLockLatch(&pPool->cfgLock); // tmp test
taosMemFree(ptr);
if (NULL != pSession) {
(void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize);
(void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize);
}
(void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize);
(void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize);
(void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize);
taosRUnLockLatch(&pPool->cfgLock);

View File

@ -19,9 +19,10 @@
#include "tlog.h"
#include "tutil.h"
#include "taos.h"
#include "tglobal.h"
static TdThreadOnce gMPoolInit = PTHREAD_ONCE_INIT;
threadlocal void* threadPoolHandle = NULL;
void* gMemPoolHandle = NULL;
threadlocal void* threadPoolSession = NULL;
SMemPoolMgmt gMPMgmt = {0};
SMPStrategyFp gMPFps[] = {
@ -64,7 +65,7 @@ void mpFreeCacheGroup(SMPCacheGroup* pGrp) {
int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead) {
SMPCacheGroup* pGrp = NULL;
if (NULL == pInfo->pGrpHead) {
pInfo->pGrpHead = taosMemCalloc(1, sizeof(*pInfo->pGrpHead));
pInfo->pGrpHead = taosMemoryCalloc(1, sizeof(*pInfo->pGrpHead));
if (NULL == pInfo->pGrpHead) {
uError("malloc chunkCache failed");
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
@ -72,12 +73,12 @@ int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup
pGrp = pInfo->pGrpHead;
} else {
pGrp = (SMPCacheGroup*)taosMemCalloc(1, sizeof(SMPCacheGroup));
pGrp = (SMPCacheGroup*)taosMemoryCalloc(1, sizeof(SMPCacheGroup));
pGrp->pNext = pHead;
}
pGrp->nodesNum = pInfo->groupNum;
pGrp->pNodes = taosMemCalloc(pGrp->nodesNum, pInfo->nodeSize);
pGrp->pNodes = taosMemoryCalloc(pGrp->nodesNum, pInfo->nodeSize);
if (NULL == pGrp->pNodes) {
uError("calloc %d %d nodes in cache group failed", pGrp->nodesNum, pInfo->nodeSize);
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
@ -162,19 +163,12 @@ void mpPushIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPListNode* pNod
int32_t mpUpdateCfg(SMemPool* pPool) {
atomic_store_64(&pPool->retireThreshold[0], pPool->cfg.freeSize * MP_RETIRE_LOW_THRESHOLD_PERCENT);
atomic_store_64(&pPool->retireThreshold[1], pPool->cfg.freeSize * MP_RETIRE_MID_THRESHOLD_PERCENT);
atomic_store_64(&pPool->retireThreshold[2], pPool->cfg.freeSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT);
if (gMPFps[gMPMgmt.strategy].updateCfgFp) {
MP_ERR_RET((*gMPFps[gMPMgmt.strategy].updateCfgFp)(pPool));
}
uDebug("memPool %s cfg updated, autoMaxSize:%d, freeSize:%" PRId64
", jobQuota:%" PRId64 ", threadNum:%d, retireThreshold:%" PRId64 "-%" PRId64 "-%" PRId64
", retireUnit:%" PRId64, pPool->name, pPool->cfg.autoMaxSize, pPool->cfg.freeSize,
pPool->cfg.jobQuota, pPool->cfg.threadNum, pPool->retireThreshold[0], pPool->retireThreshold[1],
pPool->retireThreshold[2], pPool->cfg.retireUnitSize);
uDebug("memPool %s cfg updated, reserveSize:%dMB, jobQuota:%dMB, threadNum:%d",
pPool->name, pPool->cfg.reserveSize, pPool->cfg.jobQuota, pPool->cfg.threadNum);
return TSDB_CODE_SUCCESS;
}
@ -261,17 +255,21 @@ FORCE_INLINE void mpUpdateMaxAllocSize(int64_t* pMaxAllocMemSize, int64_t newSiz
void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize) {
if (addSize) {
atomic_add_fetch_64(&pSession->pJob->job.allocMemSize, addSize);
if (NULL != pSession) {
atomic_add_fetch_64(&pSession->pJob->job.allocMemSize, addSize);
}
atomic_add_fetch_64(&pPool->allocMemSize, addSize);
}
if (NULL != pSession) {
int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size);
mpUpdateMaxAllocSize(&pSession->maxAllocMemSize, allocMemSize);
allocMemSize = atomic_load_64(&pSession->pJob->job.allocMemSize);
mpUpdateMaxAllocSize(&pSession->pJob->job.maxAllocMemSize, allocMemSize);
}
int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size);
mpUpdateMaxAllocSize(&pSession->maxAllocMemSize, allocMemSize);
allocMemSize = atomic_load_64(&pSession->pJob->job.allocMemSize);
mpUpdateMaxAllocSize(&pSession->pJob->job.maxAllocMemSize, allocMemSize);
allocMemSize = atomic_load_64(&pPool->allocMemSize);
int64_t allocMemSize = atomic_load_64(&pPool->allocMemSize);
mpUpdateMaxAllocSize(&pPool->maxAllocMemSize, allocMemSize);
}
@ -295,62 +293,45 @@ int32_t mpPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) {
int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) {
SMPJob* pJob = pSession->pJob;
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pSession) {
(void)atomic_add_fetch_64(&pPool->allocMemSize, size);
return code;
}
SMPJob* pJob = pSession->pJob;
int64_t cAllocSize = atomic_add_fetch_64(&pJob->job.allocMemSize, size);
int64_t quota = atomic_load_64(&pPool->cfg.jobQuota);
if (quota > 0 && cAllocSize > quota) {
int64_t quota = atomic_load_32(pPool->cfg.jobQuota);
if (quota > 0 && cAllocSize / 1048576UL > quota) {
code = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD;
uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pJob->job.jobId, cAllocSize, quota);
pPool->cfg.cb.retireJobFp(&pJob->job, code);
pPool->cfg.cb.reachFp(pJob->job.jobId, code);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
MP_RET(code);
}
int64_t pAllocSize = atomic_add_fetch_64(&pPool->allocMemSize, size);
quota = atomic_load_64(&pPool->retireThreshold[2]);
if (pAllocSize >= quota) {
if (atomic_load_64(&tsCurrentAvailMemorySize) <= ((atomic_load_32(pPool->cfg.reserveSize) * 1048576UL) + size)) {
code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
uWarn("%s pool allocSize %" PRId64 " reaches the high quota %" PRId64, pPool->name, pAllocSize, quota);
uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %dMB",
pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, pPool->cfg.reserveSize);
pPool->cfg.cb.reachFp(pJob->job.jobId, code);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
MP_RET(code);
}
(void)atomic_add_fetch_64(&pPool->allocMemSize, size);
/*
int64_t pAllocSize = atomic_add_fetch_64(&pPool->allocMemSize, size);
if (pAllocSize >= atomic_load_32(pPool->cfg.upperLimitSize) * 1048576UL) {
code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
uWarn("%s pool allocSize %" PRId64 " reaches the upperLimit %" PRId64, pPool->name, pAllocSize, atomic_load_32(pPool->cfg.upperLimitSize) * 1048576UL);
pPool->cfg.cb.retireJobFp(&pJob->job, code);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
(void)atomic_sub_fetch_64(&pPool->allocMemSize, size);
MP_RET(code);
}
quota = atomic_load_64(&pPool->retireThreshold[1]);
if (pAllocSize >= quota) {
uInfo("%s pool allocSize %" PRId64 " reaches the middle quota %" PRId64, pPool->name, pAllocSize, quota);
if (cAllocSize >= atomic_load_64(&pPool->cfg.retireUnitSize) / 2) {
code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
pPool->cfg.cb.retireJobFp(&pJob->job, code);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
(void)atomic_sub_fetch_64(&pPool->allocMemSize, size);
MP_RET(code);
} else {
MP_ERR_RET(mpPutRetireMsgToQueue(pPool, false));
}
return TSDB_CODE_SUCCESS;
}
quota = atomic_load_64(&pPool->retireThreshold[0]);
if (pAllocSize >= quota) {
uInfo("%s pool allocSize %" PRId64 " reaches the low quota %" PRId64, pPool->name, pAllocSize, quota);
if (cAllocSize >= atomic_load_64(&pPool->cfg.retireUnitSize)) {
code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
pPool->cfg.cb.retireJobFp(&pJob->job, code);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
(void)atomic_sub_fetch_64(&pPool->allocMemSize, size);
MP_RET(code);
} else {
MP_ERR_RET(mpPutRetireMsgToQueue(pPool, true));
}
}
*/
return TSDB_CODE_SUCCESS;
}
@ -438,8 +419,8 @@ void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailN
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Malloc", pDetail->times.memMalloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Calloc", pDetail->times.memCalloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Realloc", pDetail->times.memRealloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->times.strdup));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->times.strndup));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->times.memStrdup));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->times.memStrndup));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Free", pDetail->times.memFree));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Trim", pDetail->times.memTrim));
break;
@ -459,8 +440,8 @@ void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailN
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Malloc", pDetail->bytes.memMalloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Calloc", pDetail->bytes.memCalloc));
uInfo(MP_STAT_ORIG_FORMAT, MP_STAT_ORIG_VALUE("Realloc", pDetail->bytes.memRealloc));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->bytes.strdup));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->bytes.strndup));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->bytes.memStrdup));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->bytes.memStrndup));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Free", pDetail->bytes.memFree));
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Trim", pDetail->bytes.memTrim));
break;
@ -656,31 +637,31 @@ void mpLogDetailStat(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput*
}
case E_MP_STAT_LOG_MEM_STRDUP: {
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.strdup.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.exec, pInput->size);
atomic_add_fetch_64(&pDetail->times.memStrdup.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memStrdup.exec, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.strdup.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.succ, pInput->size);
atomic_add_fetch_64(&pDetail->times.memStrdup.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memStrdup.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.strdup.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.strdup.fail, pInput->size);
atomic_add_fetch_64(&pDetail->times.memStrdup.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memStrdup.fail, pInput->size);
}
break;
}
case E_MP_STAT_LOG_MEM_STRNDUP: {
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) {
atomic_add_fetch_64(&pDetail->times.strndup.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.strndup.exec, pInput->size);
atomic_add_fetch_64(&pDetail->times.memStrndup.exec, 1);
atomic_add_fetch_64(&pDetail->bytes.memStrndup.exec, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) {
atomic_add_fetch_64(&pDetail->times.strndup.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.strndup.succ, pInput->size);
atomic_add_fetch_64(&pDetail->times.memStrndup.succ, 1);
atomic_add_fetch_64(&pDetail->bytes.memStrndup.succ, pInput->size);
}
if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) {
atomic_add_fetch_64(&pDetail->times.strndup.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.strndup.fail, pInput->size);
atomic_add_fetch_64(&pDetail->times.memStrndup.fail, 1);
atomic_add_fetch_64(&pDetail->bytes.memStrndup.fail, pInput->size);
}
break;
}
@ -890,14 +871,16 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt
case E_MP_STAT_LOG_MEM_STRDUP:
case E_MP_STAT_LOG_MEM_STRNDUP:
case E_MP_STAT_LOG_MEM_TRIM: {
if (MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) {
if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) {
mpLogDetailStat(&pSession->stat.statDetail, item, pInput);
}
if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) {
mpLogDetailStat(&pPool->stat.statDetail, item, pInput);
}
if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
mpLogPosStat(&pSession->stat.posStat, item, pInput, true);
}
if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
mpLogPosStat(&pPool->stat.posStat, item, pInput, false);
}
break;
@ -930,7 +913,8 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) {
if (allocSize != freeSize) {
uError("%s Session in JOB:0x%" PRIx64 " stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64,
detailName, pSession->pJob->job.jobId, allocSize, freeSize);
ASSERT(0);
taosMemPoolPrintStat(NULL, pSession, detailName);
} else {
uDebug("%s Session in JOB:0x%" PRIx64 " stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64,
detailName, pSession->pJob->job.jobId, allocSize, freeSize);
@ -943,13 +927,13 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) {
pDetail = &pPool->stat.statDetail;
int64_t sessInit = atomic_load_64(&pPool->stat.statSession.initFail) + atomic_load_64(&pPool->stat.statSession.initSucc);
if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == atomic_load_64(&pPool->stat.statSession.destroyNum)) {
int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.strdup.succ + pDetail->bytes.strndup.succ;
int64_t allocSize = pDetail->bytes.memMalloc.succ + pDetail->bytes.memCalloc.succ + pDetail->bytes.memRealloc.succ + pDetail->bytes.memStrdup.succ + pDetail->bytes.memStrndup.succ;
int64_t freeSize = pDetail->bytes.memRealloc.origSucc + pDetail->bytes.memFree.succ;
if (allocSize != freeSize) {
uError("%s MemPool %s stat check failed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize);
ASSERT(0);
taosMemPoolPrintStat(poolHandle, NULL, detailName);
} else {
uDebug("%s MemPool %s stat check succeed, allocSize:%" PRId64 ", freeSize:%" PRId64, detailName, pPool->name, allocSize, freeSize);
}
@ -959,6 +943,7 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) {
void mpCheckUpateCfg(void) {
/*
taosRLockLatch(&gMPMgmt.poolLock);
int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList);
for (int32_t i = 0; i < poolNum; ++i) {
@ -968,14 +953,44 @@ void mpCheckUpateCfg(void) {
}
}
taosRUnLockLatch(&gMPMgmt.poolLock);
*/
}
void mpUpdateSystemAvailableMemorySize() {
static int64_t errorTimes = 0;
int64_t sysAvailSize = 0;
int32_t code = taosGetSysAvailMemory(&sysAvailSize);
if (TSDB_CODE_SUCCESS != code) {
errorTimes++;
if (0 == errorTimes % 1000) {
uError("get system available memory size failed, error: %s, errorTimes:%" PRId64, tstrerror(code), errorTimes);
}
return;
}
atomic_store_64(&tsCurrentAvailMemorySize, sysAvailSize);
}
void* mpMgmtThreadFunc(void* param) {
int32_t timeout = 0;
int64_t retireSize = 0;
SMemPool* pPool = (SMemPool*)gMemPoolHandle;
while (0 == atomic_load_8(&gMPMgmt.modExit)) {
mpUpdateSystemAvailableMemorySize();
retireSize = atomic_load_32(pPool->cfg.reserveSize) * 1048576UL - tsCurrentAvailMemorySize;
if (retireSize > 0) {
(*pPool->cfg.cb.failFp)(retireSize, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
}
taosMsleep(MP_DEFAULT_MEM_CHK_INTERVAL_MS);
/*
timeout = tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs);
if (0 != timeout) {
mpCheckUpateCfg();
mpUpdateSystemAvailableMemorySize();
continue;
}
@ -984,13 +999,31 @@ void* mpMgmtThreadFunc(void* param) {
} else if (atomic_load_8(&gMPMgmt.msgQueue.lowLevelRetire)) {
(*gMPMgmt.msgQueue.pPool->cfg.cb.retireJobsFp)(gMPMgmt.msgQueue.pPool, atomic_load_64(&gMPMgmt.msgQueue.pPool->cfg.retireUnitSize), true, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
}
mpCheckUpateCfg();
*/
}
return NULL;
}
int32_t mpCreateMgmtThread() {
int32_t code = TSDB_CODE_SUCCESS;
TdThreadAttr thAttr;
MP_ERR_RET(taosThreadAttrInit(&thAttr));
MP_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
code = taosThreadCreate(&gMPMgmt.poolMgmtThread, &thAttr, mpMgmtThreadFunc, NULL);
if (code != 0) {
uError("failed to create memPool mgmt thread, error: 0x%x", code);
(void)taosThreadAttrDestroy(&thAttr);
MP_ERR_JRET(code);
}
_return:
MP_ERR_RET(taosThreadAttrDestroy(&thAttr));
return code;
}
void mpModInit(void) {
int32_t code = TSDB_CODE_SUCCESS;
@ -1010,18 +1043,6 @@ void mpModInit(void) {
}
gMPMgmt.waitMs = MP_DEFAULT_MEM_CHK_INTERVAL_MS;
TdThreadAttr thAttr;
MP_ERR_JRET(taosThreadAttrInit(&thAttr));
MP_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
code = taosThreadCreate(&gMPMgmt.poolMgmtThread, &thAttr, mpMgmtThreadFunc, NULL);
if (code != 0) {
uError("failed to create memPool mgmt thread, error: 0x%x", code);
(void)taosThreadAttrDestroy(&thAttr);
MP_ERR_JRET(code);
}
MP_ERR_JRET(taosThreadAttrDestroy(&thAttr));
_return:
@ -1029,8 +1050,6 @@ _return:
}
void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
char detailName[128];
@ -1045,26 +1064,24 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
mpPrintPosStat(&pSession->ctrl, &pSession->stat.posStat, detailName);
}
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name);
detailName[sizeof(detailName) - 1] = 0;
mpPrintSessionStat(&pPool->ctrl, &pPool->stat.statSession, detailName);
mpPrintStatDetail(&pPool->ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize);
if (NULL != pPool) {
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name);
detailName[sizeof(detailName) - 1] = 0;
mpPrintSessionStat(&pPool->ctrl, &pPool->stat.statSession, detailName);
mpPrintStatDetail(&pPool->ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize);
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
detailName[sizeof(detailName) - 1] = 0;
mpPrintNodeStat(&pPool->ctrl, pPool->stat.nodeStat, detailName);
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos");
detailName[sizeof(detailName) - 1] = 0;
mpPrintPosStat(&pPool->ctrl, &pPool->stat.posStat, detailName);
MP_API_LEAVE();
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
detailName[sizeof(detailName) - 1] = 0;
mpPrintNodeStat(&pPool->ctrl, pPool->stat.nodeStat, detailName);
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos");
detailName[sizeof(detailName) - 1] = 0;
mpPrintPosStat(&pPool->ctrl, &pPool->stat.posStat, detailName);
}
}
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = NULL;
@ -1104,29 +1121,21 @@ _return:
*poolHandle = pPool;
MP_API_LEAVE();
return code;
}
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
(void)mpUpdateCfg(pPool);
MP_API_LEAVE();
}
void taosMemPoolDestroySession(void* poolHandle, void* session) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
if (NULL == pSession) {
uWarn("null pointer of session");
goto _return;
if (NULL == poolHandle || NULL == pSession) {
uWarn("null pointer of poolHandle %p or session %p", poolHandle, session);
return;
}
(void)atomic_sub_fetch_32(&pSession->pJob->remainSession, 1);
@ -1135,22 +1144,14 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) {
(void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1);
taosMemPoolPrintStat(pPool, pSession, "DestroySession");
mpCheckStatDetail(pPool, pSession, "DestroySession");
TAOS_MEMSET(pSession, 0, sizeof(*pSession));
mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession);
_return:
MP_API_LEAVE();
}
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = NULL;
@ -1180,44 +1181,42 @@ _return:
*ppSession = pSession;
MP_API_LEAVE();
return code;
}
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
if (NULL == poolHandle || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64, __FUNCTION__, poolHandle, session, fileName, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
terrno = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
_return:
MP_API_LEAVE();
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
}
return input.pMem;
}
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
int64_t totalSize = num * size;
SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
if (NULL == poolHandle || NULL == session || NULL == fileName || num < 0 || size < 0) {
if (NULL == poolHandle || NULL == fileName || num < 0 || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, num:%" PRId64 ", size:%" PRId64,
__FUNCTION__, poolHandle, session, fileName, num, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
@ -1225,27 +1224,26 @@ void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
int64_t totalSize = num * size;
SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
terrno = mpCalloc(pPool, pSession, &input.size, &input.pMem);
code = mpCalloc(pPool, pSession, &input.size, &input.pMem);
MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_CALLOC, &input);
_return:
MP_API_LEAVE();
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
}
return input.pMem;
}
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .origSize = 0, .pMem = ptr, .pOrigMem = ptr};
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
if (NULL == poolHandle || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%" PRId64,
__FUNCTION__, poolHandle, session, fileName, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
@ -1253,9 +1251,8 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .origSize = 0, .pMem = ptr, .pOrigMem = ptr};
terrno = mpRealloc(pPool, pSession, &input.pMem, &input.size, &input.origSize);
code = mpRealloc(pPool, pSession, &input.pMem, &input.size, &input.origSize);
if (NULL != input.pMem) {
MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
@ -1278,17 +1275,19 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t siz
_return:
MP_API_LEAVE();
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
}
return input.pMem;
}
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
int64_t size = (ptr ? strlen(ptr) : 0) + 1;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr) {
if (NULL == poolHandle || NULL == fileName || NULL == ptr) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p",
__FUNCTION__, poolHandle, session, fileName, ptr);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
@ -1296,10 +1295,8 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char*
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
int64_t size = strlen(ptr) + 1;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
terrno = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
if (NULL != input.pMem) {
TAOS_STRCPY(input.pMem, ptr);
*((char*)input.pMem + size - 1) = 0;
@ -1310,17 +1307,20 @@ char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char*
_return:
MP_API_LEAVE();
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
}
return input.pMem;
}
char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
int64_t origSize = ptr ? strlen(ptr) : 0;
size = TMIN(size, origSize) + 1;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr || size < 0) {
if (NULL == poolHandle || NULL == fileName || NULL == ptr || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p, size:%" PRId64,
__FUNCTION__, poolHandle, session, fileName, ptr, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
@ -1328,11 +1328,8 @@ char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
int64_t origSize = strlen(ptr);
size = TMIN(size, origSize) + 1;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
terrno = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
code = mpMalloc(pPool, pSession, &input.size, 0, &input.pMem);
if (NULL != input.pMem) {
TAOS_MEMCPY(input.pMem, ptr, size - 1);
*((char*)input.pMem + size - 1) = 0;
@ -1343,24 +1340,23 @@ char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64
_return:
MP_API_LEAVE();
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
}
return input.pMem;
}
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
MP_API_ENTER();
if (NULL == ptr) {
goto _return;
return;
}
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName) {
if (NULL == poolHandle || NULL == fileName) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p",
__FUNCTION__, poolHandle, session, fileName);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMemPool* pPool = (SMemPool*)poolHandle;
@ -1371,45 +1367,31 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName,
MP_SET_FLAG(input.procFlags, MP_STAT_PROC_FLAG_RES_SUCC);
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_FREE, &input);
_return:
MP_API_LEAVE();
return;
}
int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int64_t code = 0;
if (NULL == poolHandle || NULL == session || NULL == fileName) {
if (NULL == poolHandle || NULL == fileName) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p",
__FUNCTION__, poolHandle, session, fileName);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
if (NULL == ptr) {
goto _return;
return 0;
}
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
return mpGetMemorySizeImpl(pPool, pSession, ptr);
_return:
MP_API_LEAVE();
return code;
}
void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) {
if (NULL == poolHandle || NULL == fileName || size < 0 || alignment < POINTER_BYTES || alignment % POINTER_BYTES) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, alignment:%u, size:%" PRId64,
__FUNCTION__, poolHandle, session, fileName, alignment, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
@ -1417,51 +1399,42 @@ void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
terrno = mpMalloc(pPool, pSession, &input.size, alignment, &input.pMem);
code = mpMalloc(pPool, pSession, &input.size, alignment, &input.pMem);
MP_SET_FLAG(input.procFlags, (NULL != input.pMem ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
_return:
MP_API_LEAVE();
if (TSDB_CODE_SUCCESS != code) {
terrno = code;
}
return input.pMem;
}
void taosMemPoolClose(void* poolHandle) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
taosMemPoolPrintStat(poolHandle, NULL, "PoolClose");
mpCheckStatDetail(pPool, NULL, "PoolClose");
taosMemoryFree(pPool->name);
mpDestroyCacheGroup(&pPool->sessionCache);
MP_API_LEAVE();
}
void taosMemPoolModDestroy(void) {
MP_API_ENTER();
MP_API_LEAVE();
}
int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) {
if (NULL == poolHandle || NULL == fileName || size < 0) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%d",
__FUNCTION__, poolHandle, session, fileName, size);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMemPool* pPool = (SMemPool*)poolHandle;
@ -1475,89 +1448,51 @@ int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil
MP_SET_FLAG(input.procFlags, ((0 == code) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_TRIM, &input);
_return:
MP_API_LEAVE();
return code;
}
int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
*ppJob = taosMemoryCalloc(1, sizeof(SMPJob));
if (NULL == *ppJob) {
uError("calloc mp job failed, code: 0x%x", terrno);
MP_ERR_JRET(terrno);
MP_ERR_RET(terrno);
}
SMPJob* pJob = (SMPJob*)*ppJob;
pJob->job.jobId = jobId;
_return:
MP_API_LEAVE();
return code;
}
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) {
MP_API_ENTER();
if (NULL == poolHandle) {
*usedSize = 0;
*needEnd = false;
goto _return;
return;
}
SMemPool* pPool = (SMemPool*)poolHandle;
#if 0
if ((atomic_load_64(&pPool->cfg.maxSize) - atomic_load_64(&pPool->allocMemSize)) <= MP_CFG_UPDATE_MIN_RESERVE_SIZE) {
*needEnd = true;
taosWLockLatch(&pPool->cfgLock);
} else {
*needEnd = false;
}
*usedSize = atomic_load_64(&pPool->allocMemSize);
#else
taosWLockLatch(&pPool->cfgLock);
*needEnd = true;
*usedSize = atomic_load_64(&pPool->allocMemSize);
#endif
_return:
MP_API_LEAVE();
}
void taosMemPoolGetUsedSizeEnd(void* poolHandle) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
if (NULL == pPool) {
return;
}
taosWUnLockLatch(&pPool->cfgLock);
MP_API_LEAVE();
}
bool taosMemPoolNeedRetireJob(void* poolHandle) {
MP_API_ENTER();
SMemPool* pPool = (SMemPool*)poolHandle;
MP_API_LEAVE();
return atomic_load_64(&pPool->allocMemSize) >= atomic_load_64(&pPool->retireThreshold[0]);
}
int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize) {
MP_API_ENTER();
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == session || (NULL == ppStat && NULL == allocSize && NULL == maxAllocSize)) {
uError("%s invalid input param, session:%p, ppStat:%p, allocSize:%p, maxAllocSize:%p", __FUNCTION__, session, ppStat, allocSize, maxAllocSize);
MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
MP_ERR_RET(TSDB_CODE_INVALID_MEM_POOL_PARAM);
}
SMPSession* pSession = (SMPSession*)session;
@ -1571,17 +1506,77 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t
if (maxAllocSize) {
*maxAllocSize = atomic_load_64(&pSession->maxAllocMemSize);
}
_return:
MP_API_LEAVE();
return code;
}
int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) {
int32_t code = TSDB_CODE_SUCCESS;
#ifdef LINUX
if (!tsQueryUseMemoryPool) {
#endif
uInfo("memory pool disabled cause of configured disabled");
return code;
#ifdef LINUX
}
#endif
taosGetTotalMemory(&tsTotalMemoryKB);
if (tsTotalMemoryKB <= 0) {
uInfo("memory pool disabled since no enough system total memory, size: %" PRId64 "KB", tsTotalMemoryKB);
return code;
}
uInfo("total memory size: %" PRId64 "KB", tsTotalMemoryKB);
tsMinReservedMemorySize = TMAX(MIN_RESERVE_MEM_SIZE, tsTotalMemoryKB / 1024 * MP_DEFAULT_RESERVE_MEM_PERCENT / 100);
SMemPoolCfg cfg = {0};
int64_t sysAvailSize = 0;
code = taosGetSysAvailMemory(&sysAvailSize);
if (code || sysAvailSize < MP_MIN_MEM_POOL_SIZE) {
uInfo("memory pool disabled since no enough system available memory, size: %" PRId64, sysAvailSize);
code = TSDB_CODE_SUCCESS;
return code;
}
cfg.reserveSize = &tsMinReservedMemorySize;
int64_t freeSizeAfterRes = sysAvailSize - tsMinReservedMemorySize * 1048576UL;
if (freeSizeAfterRes < MP_MIN_FREE_SIZE_AFTER_RESERVE) {
uInfo("memory pool disabled since no enough system available memory after reservied, size: %" PRId64, freeSizeAfterRes);
return code;
}
atomic_store_64(&tsCurrentAvailMemorySize, sysAvailSize);
cfg.evicPolicy = E_EVICT_AUTO; //TODO
cfg.chunkSize = 1048576;
cfg.jobQuota = &tsSingleQueryMaxMemorySize;
cfg.cb.failFp = failFp;
cfg.cb.reachFp = reachFp;
code = taosMemPoolOpen("taosd", &cfg, &gMemPoolHandle);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
code = mpCreateMgmtThread();
if (TSDB_CODE_SUCCESS != code) {
return code;
}
uInfo("memory pool initialized");
return code;
}
void taosAutoMemoryFree(void *ptr) {
if (NULL != threadPoolHandle) {
taosMemPoolFree(threadPoolHandle, threadPoolSession, ptr, __FILE__, __LINE__);
if (NULL != gMemPoolHandle) {
taosMemPoolFree(gMemPoolHandle, threadPoolSession, ptr, __FILE__, __LINE__);
} else {
taosMemFree(ptr);
}

View File

@ -156,8 +156,8 @@ typedef struct {
typedef struct {
int64_t jobQuota;
bool autoPoolSize;
int64_t poolSize;
bool reserveMode;
int64_t upperLimitSize;
int32_t threadNum;
int32_t randTask;
} SMPTestParam;
@ -219,7 +219,7 @@ void mptInitLogFile() {
tsLogKeepDays = 10;
TAOS_STRCPY(tsLogDir, TD_LOG_DIR_PATH);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum, false) < 0) {
MPT_PRINTF("failed to open log file in directory:%s\n", tsLogDir);
}
}
@ -254,7 +254,7 @@ void mptInit() {
mptCtx.pJobQueue = createBoundedQueue(10000, mptJobMemSizeCompFn, mptDeleteJobQueueData, NULL);
ASSERT_TRUE(NULL != mptCtx.pJobQueue);
mptCtx.jobCtxs = (SMPTestJobCtx*)taosMemCalloc(MPT_MAX_JOB_NUM, sizeof(*mptCtx.jobCtxs));
mptCtx.jobCtxs = (SMPTestJobCtx*)taosMemoryCalloc(MPT_MAX_JOB_NUM, sizeof(*mptCtx.jobCtxs));
ASSERT_TRUE(NULL != mptCtx.jobCtxs);
mptCtrl.pSrcString = (char*)taosMemoryMalloc(mptCtrl.maxSingleAllocSize);
@ -370,10 +370,10 @@ void mptInitTask(int32_t idx, int32_t eId, SMPTestJobCtx* pJob) {
ASSERT_TRUE(0 == mptInitSession(pJob->jobId, pJob->taskCtxs[idx].taskId, eId, pJob, &pJob->pSessions[idx]));
pJob->taskCtxs[idx].pMemList = (SMPTestMemInfo*)taosMemCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].pMemList));
pJob->taskCtxs[idx].pMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].pMemList));
ASSERT_TRUE(NULL != pJob->taskCtxs[idx].pMemList);
pJob->taskCtxs[idx].npMemList = (SMPTestMemInfo*)taosMemCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].npMemList));
pJob->taskCtxs[idx].npMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].npMemList));
ASSERT_TRUE(NULL != pJob->taskCtxs[idx].npMemList);
uDebug("JOB:0x%x TASK:0x%x idx:%d initialized", pJob->jobId, pJob->taskCtxs[idx].taskId, idx);
@ -487,108 +487,19 @@ int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) {
return TSDB_CODE_SUCCESS;
}
int32_t mptGetQueryMemPoolMaxSize(void* pHandle, int64_t* pMaxSize, bool* autoMaxSize) {
if (!mptCtx.param.autoPoolSize && mptCtx.param.poolSize > 0) {
*pMaxSize = mptCtx.param.poolSize * 1048576UL;
*autoMaxSize = false;
return TSDB_CODE_SUCCESS;
}
int32_t code = mptGetMemPoolMaxMemSize(pHandle, pMaxSize);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
*autoMaxSize = true;
return code;
}
void mptCheckUpateCfgCb(void* pHandle, void* cfg) {
SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg;
int64_t newJobQuota = mptCtx.param.jobQuota * 1048576UL;
if (pCfg->jobQuota != newJobQuota) {
atomic_store_64(&pCfg->jobQuota, newJobQuota);
}
int64_t freeSize = 0;
bool autoMaxSize = false;
int32_t code = mptGetQueryMemPoolMaxSize(pHandle, &freeSize, &autoMaxSize);
if (TSDB_CODE_SUCCESS != code) {
pCfg->freeSize = 0;
uError("get query memPool freeSize failed, reset freeSize to %" PRId64, pCfg->freeSize);
return;
}
if (pCfg->autoMaxSize != autoMaxSize || pCfg->freeSize != freeSize) {
pCfg->autoMaxSize = autoMaxSize;
atomic_store_64(&pCfg->freeSize, freeSize);
taosMemPoolCfgUpdate(pHandle, pCfg);
}
}
void mptLowLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) {
void mptRetireJobsCb(void* pHandle, int64_t retireSize, int32_t errCode) {
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
while (pJob) {
if (!taosMemPoolNeedRetireJob(pHandle)) {
taosHashCancelIterate(mptCtx.pJobs, pJob);
return;
}
uint64_t jobId = pJob->memInfo->jobId;
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
mptRetireJob(pJob);
uDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
jobId, aSize, retireSize);
taosHashCancelIterate(mptCtx.pJobs, pJob);
break;
}
pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, pJob);
}
}
void mptMidLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) {
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
PriorityQueueNode qNode;
while (NULL != pJob) {
if (0 == atomic_load_8(&pJob->retired)) {
assert(pJob == taosHashAcquire(mptCtx.pJobs, &pJob->memInfo->jobId, sizeof(pJob->memInfo->jobId)));
qNode.data = pJob;
if (NULL == taosBQPush(mptCtx.pJobQueue, &qNode)) {
taosHashRelease(mptCtx.pJobs, pJob);
}
}
pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, pJob);
}
PriorityQueueNode* pNode = NULL;
uint64_t jobId = 0;
int64_t retiredSize = 0;
while (retiredSize < retireSize) {
if (!taosMemPoolNeedRetireJob(pHandle)) {
break;
}
pNode = taosBQTop(mptCtx.pJobQueue);
if (NULL == pNode) {
break;
}
pJob = (SMPTJobInfo*)pNode->data;
while (retiredSize < retireSize && NULL != pJob) {
if (atomic_load_8(&pJob->retired)) {
taosBQPop(mptCtx.pJobQueue);
pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
continue;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
uint64_t jobId = pJob->memInfo->jobId;
jobId = pJob->memInfo->jobId;
mptRetireJob(pJob);
@ -598,15 +509,8 @@ void mptMidLevelRetire(void* pHandle, int64_t retireSize, int32_t errCode) {
jobId, aSize, retireSize, retiredSize);
}
taosBQPop(mptCtx.pJobQueue);
pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
}
taosBQClear(mptCtx.pJobQueue);
}
void mptRetireJobsCb(void* pHandle, int64_t retireSize, bool lowLevelRetire, int32_t errCode) {
(lowLevelRetire) ? mptLowLevelRetire(pHandle, retireSize, errCode) : mptMidLevelRetire(pHandle, retireSize, errCode);
}
@ -627,13 +531,13 @@ void mptRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) {
void mptInitPool(void) {
SMemPoolCfg cfg = {0};
cfg.autoMaxSize = mptCtx.param.autoPoolSize;
if (!mptCtx.param.autoPoolSize) {
cfg.freeSize = mptCtx.param.poolSize;
cfg.reserveMode = mptCtx.param.reserveMode;
if (!mptCtx.param.reserveMode) {
cfg.upperLimitSize = mptCtx.param.upperLimitSize;
} else {
int64_t memSize = 0;
ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize));
cfg.freeSize = memSize * 0.8;
cfg.reserveSize = memSize / 1048576UL * MP_DEFAULT_RESERVE_MEM_PERCENT / 100;
}
cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO
@ -641,7 +545,6 @@ void mptInitPool(void) {
cfg.jobQuota = mptCtx.param.jobQuota;
cfg.cb.retireJobsFp = mptRetireJobsCb;
cfg.cb.retireJobFp = mptRetireJobCb;
cfg.cb.cfgUpdateFp = mptCheckUpateCfgCb;
ASSERT_TRUE(0 == taosMemPoolOpen("testQMemPool", &cfg, &mptCtx.memPoolHandle));
}
@ -819,20 +722,20 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
mptCtrl.pSrcString[size] = 'W';
if (NULL == pTask->pMemList[pTask->memIdx].p) {
pTask->stat.times.strdup.exec++;
pTask->stat.bytes.strdup.exec+=size + 1;
pTask->stat.times.strdup.fail++;
pTask->stat.bytes.strdup.fail+=size + 1;
pTask->stat.times.memStrdup.exec++;
pTask->stat.bytes.memStrdup.exec+=size + 1;
pTask->stat.times.memStrdup.fail++;
pTask->stat.bytes.memStrdup.fail+=size + 1;
uError("JOB:0x%x TASK:0x%x mpStrdup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p);
pTask->stat.times.strdup.exec++;
pTask->stat.bytes.strdup.exec+= nsize;
pTask->stat.times.memStrdup.exec++;
pTask->stat.bytes.memStrdup.exec+= nsize;
pTask->stat.times.strdup.succ++;
pTask->stat.bytes.strdup.succ+=nsize;
pTask->stat.times.memStrdup.succ++;
pTask->stat.bytes.memStrdup.succ+=nsize;
mptWriteMem(pTask->pMemList[pTask->memIdx].p, size);
@ -851,10 +754,10 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
pTask->pMemList[pTask->memIdx].p = mptStrndup(mptCtrl.pSrcString, size);
if (NULL == pTask->pMemList[pTask->memIdx].p) {
pTask->stat.times.strndup.exec++;
pTask->stat.bytes.strndup.exec+=size + 1;
pTask->stat.times.strndup.fail++;
pTask->stat.bytes.strndup.fail+=size + 1;
pTask->stat.times.memStrndup.exec++;
pTask->stat.bytes.memStrndup.exec+=size + 1;
pTask->stat.times.memStrndup.fail++;
pTask->stat.bytes.memStrndup.fail+=size + 1;
uError("JOB:0x%x TASK:0x%x mpStrndup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
return;
}
@ -862,10 +765,10 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
assert(strlen((char*)pTask->pMemList[pTask->memIdx].p) == size);
nsize = mptMemorySize(pTask->pMemList[pTask->memIdx].p);
pTask->stat.times.strndup.exec++;
pTask->stat.bytes.strndup.exec+=nsize;
pTask->stat.times.strndup.succ++;
pTask->stat.bytes.strndup.succ+=nsize;
pTask->stat.times.memStrndup.exec++;
pTask->stat.bytes.memStrndup.exec+=nsize;
pTask->stat.times.memStrndup.succ++;
pTask->stat.bytes.memStrndup.succ+=nsize;
mptWriteMem(pTask->pMemList[pTask->memIdx].p, size);
@ -1152,8 +1055,8 @@ void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) {
MPT_PRINTF("\t task max act times: %d\n", mptCtrl.taskActTimes ? mptCtrl.taskActTimes : MPT_MAX_MEM_ACT_TIMES);
MPT_PRINTF("\t max single alloc size: %" PRId64 "\n", mptCtrl.maxSingleAllocSize);
MPT_PRINTF("\t job quota size: %" PRId64 "\n", param->jobQuota);
MPT_PRINTF("\t auto pool size: %d\n", param->autoPoolSize);
MPT_PRINTF("\t pool max size: %" PRId64 "\n", param->poolSize);
MPT_PRINTF("\t reserve mode: %d\n", param->reserveMode);
MPT_PRINTF("\t upper limit size: %" PRId64 "\n", param->upperLimitSize);
MPT_PRINTF("\t test thread num: %d\n", param->threadNum);
MPT_PRINTF("\t random exec task: %d\n", param->randTask);
}
@ -1181,6 +1084,7 @@ TEST(FuncTest, SysMemoryPerfTest) {
for (int32_t i = 0; i < loopTimes; ++i) {
code = taosGetSysAvailMemory(&freeSize);
assert(0 == code);
taosMsleep(1);
}
totalUs = taosGetTimestampUs() - st;
@ -1188,11 +1092,11 @@ TEST(FuncTest, SysMemoryPerfTest) {
}
#endif
#if 0
#if 1
TEST(FuncTest, SingleThreadTest) {
char* caseName = "FuncTest:SingleThreadTest";
SMPTestParam param = {0};
param.autoPoolSize = true;
param.reserveMode = true;
param.threadNum = 1;
mptPrintTestBeginInfo(caseName, &param);