enh: add query memory quota
This commit is contained in:
parent
d34112983f
commit
75d24720e7
|
@ -72,6 +72,7 @@ extern int32_t tsTagFilterResCacheSize;
|
|||
extern int32_t tsQueryMinConcurrentTaskNum;
|
||||
extern int32_t tsQueryMaxConcurrentTaskNum;
|
||||
extern int32_t tsQueryConcurrentTaskNum;
|
||||
extern int64_t tsSingleQueryMaxMemorySize;
|
||||
extern int32_t tsNumOfQueryThreads;
|
||||
extern int32_t tsNumOfRpcThreads;
|
||||
extern int32_t tsNumOfRpcSessions;
|
||||
|
|
|
@ -42,7 +42,7 @@ typedef enum MemPoolUsageLevel {
|
|||
typedef void (*decConcSessionNum)(void);
|
||||
typedef void (*incConcSessionNum)(void);
|
||||
typedef void (*setConcSessionNum)(int32_t);
|
||||
typedef void (*retireCollection)(uint64_t, int64_t);
|
||||
typedef bool (*retireCollection)(uint64_t, int64_t);
|
||||
|
||||
typedef struct SMemPoolCallBack {
|
||||
decConcSessionNum decSessFp;
|
||||
|
@ -54,7 +54,7 @@ typedef struct SMemPoolCallBack {
|
|||
typedef struct SMemPoolCfg {
|
||||
int64_t maxSize;
|
||||
int64_t sessionExpectSize;
|
||||
int64_t *collectionQuota;
|
||||
int64_t collectionQuota;
|
||||
int32_t chunkSize;
|
||||
int32_t threadNum;
|
||||
int8_t usageLevel[E_MEM_USAGE_MAX_VALUE];
|
||||
|
@ -75,8 +75,9 @@ void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignme
|
|||
void taosMemPoolClose(void* poolHandle);
|
||||
void taosMemPoolModDestroy(void);
|
||||
void taosAutoMemoryFree(void *ptr);
|
||||
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession);
|
||||
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pCollection);
|
||||
void taosMemPoolDestroySession(void* poolHandle, void* session);
|
||||
int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection);
|
||||
|
||||
|
||||
extern threadlocal void* threadPoolHandle;
|
||||
|
|
|
@ -60,7 +60,7 @@ int8_t tsQueryUseMemoryPool = 1;
|
|||
int32_t tsQueryMinConcurrentTaskNum = 1;
|
||||
int32_t tsQueryMaxConcurrentTaskNum = 0;
|
||||
int32_t tsQueryConcurrentTaskNum = 0;
|
||||
int64_t tsSingleQueryMaxMemorySize = 0;
|
||||
int64_t tsSingleQueryMaxMemorySize = 1000000; //MB
|
||||
|
||||
int32_t tsNumOfRpcThreads = 1;
|
||||
int32_t tsNumOfRpcSessions = 30000;
|
||||
|
@ -678,6 +678,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt64(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 100, 1000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 4, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
@ -1142,6 +1144,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
|
||||
|
||||
tsQueryUseMemoryPool = (bool)cfgGetItem(pCfg, "queryUseMemoryPool")->bval;
|
||||
tsSingleQueryMaxMemorySize = cfgGetItem(pCfg, "singleQueryMaxMemorySize")->i64 * 1048576;
|
||||
|
||||
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
|
||||
tsRetentionSpeedLimitMB = cfgGetItem(pCfg, "retentionSpeedLimitMB")->i32;
|
||||
|
|
|
@ -28,6 +28,7 @@ extern "C" {
|
|||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
#include "ttimer.h"
|
||||
#include "theap.h"
|
||||
|
||||
#define QW_DEFAULT_SCHEDULER_NUMBER 100
|
||||
#define QW_DEFAULT_TASK_NUMBER 10000
|
||||
|
@ -232,10 +233,17 @@ typedef struct SQWorkerMgmt {
|
|||
} SQWorkerMgmt;
|
||||
|
||||
typedef struct SQWQueryInfo {
|
||||
bool retired;
|
||||
void* pCollection;
|
||||
SHashObj* pSessions;
|
||||
} SQWQueryInfo;
|
||||
|
||||
typedef struct SQWRetireCtx {
|
||||
int8_t retireBegin;
|
||||
TdThreadCond retired;
|
||||
BoundedQueue* collectionQueue;
|
||||
} SQWRetireCtx;
|
||||
|
||||
typedef struct SQueryMgmt {
|
||||
SRWLatch taskMgmtLock;
|
||||
int32_t concTaskLevel;
|
||||
|
@ -452,6 +460,8 @@ void qwDbgSimulateSleep(void);
|
|||
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped);
|
||||
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
|
||||
int32_t qwInitQueryPool(void);
|
||||
void qwDestroyQueryInfo(SQWQueryInfo* pQuery);
|
||||
int32_t qwInitSession(uint64_t qId, void** ppSession);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -104,11 +104,16 @@ int32_t qwInitSession(uint64_t qId, void** ppSession) {
|
|||
break;
|
||||
}
|
||||
|
||||
QW_ERR_RET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, qId, ppSession, pQuery->pCollection));
|
||||
QW_ERR_RET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pQuery->pCollection));
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
bool qwRetireCollection(uint64_t collectionId, int64_t retireSize) {
|
||||
//TODO
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t qwInitQueryPool(void) {
|
||||
int64_t memSize = 0;
|
||||
int32_t code = taosGetSysAvailMemory(&memSize);
|
||||
|
@ -124,10 +129,11 @@ int32_t qwInitQueryPool(void) {
|
|||
|
||||
cfg.threadNum = 10; //TODO
|
||||
cfg.evicPolicy = E_EVICT_AUTO; //TODO
|
||||
cfg.collectionQuota = &tsSingleQueryMaxMemorySize;
|
||||
cfg.collectionQuota = tsSingleQueryMaxMemorySize * 1048576;
|
||||
cfg.cb.setSessFp = qwSetConcurrentTaskNum;
|
||||
cfg.cb.decSessFp = qwDecConcurrentTaskNum;
|
||||
cfg.cb.incSessFp = qwIncConcurrentTaskNum;
|
||||
cfg.cb.retireFp = qwRetireCollection;
|
||||
|
||||
code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
|
|
|
@ -1312,7 +1312,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
|
|||
memset(gQwMgmt.param, 0, sizeof(gQwMgmt.param));
|
||||
}
|
||||
|
||||
int32_t code = qwOpenRef();
|
||||
code = qwOpenRef();
|
||||
if (code) {
|
||||
atomic_sub_fetch_32(&gQwMgmt.qwNum, 1);
|
||||
QW_RET(code);
|
||||
|
|
|
@ -31,7 +31,10 @@ extern "C" {
|
|||
#define MP_MAX_KEEP_FREE_CHUNK_NUM 1000
|
||||
#define MP_MAX_MALLOC_MEM_SIZE 0xFFFFFFFFFF
|
||||
|
||||
#define MP_RETIRE_THRESHOLD_PERCENT (0.9)
|
||||
|
||||
#define MP_RETIRE_HIGH_THRESHOLD_PERCENT (0.95)
|
||||
#define MP_RETIRE_MID_THRESHOLD_PERCENT (0.9)
|
||||
#define MP_RETIRE_LOW_THRESHOLD_PERCENT (0.85)
|
||||
#define MP_RETIRE_UNIT_PERCENT (0.1)
|
||||
|
||||
|
||||
|
@ -187,10 +190,21 @@ typedef struct SMPStatInfo {
|
|||
SHashObj* lineStat;
|
||||
} SMPStatInfo;
|
||||
|
||||
|
||||
typedef struct SMPCollection {
|
||||
int64_t collectionId;
|
||||
int64_t allocMemSize;
|
||||
int64_t maxAllocMemSize;
|
||||
|
||||
SMPStatInfo stat;
|
||||
} SMPCollection;
|
||||
|
||||
|
||||
typedef struct SMPSession {
|
||||
SMPListNode list;
|
||||
|
||||
int64_t sessionId;
|
||||
|
||||
SMPCollection* pCollection;
|
||||
bool needRetire;
|
||||
SMPCtrlInfo ctrlInfo;
|
||||
|
@ -230,19 +244,12 @@ typedef struct SMPCacheGroupInfo {
|
|||
void *pIdleList;
|
||||
} SMPCacheGroupInfo;
|
||||
|
||||
typedef struct SMPCollection {
|
||||
int64_t collectionId;
|
||||
int64_t allocMemSize;
|
||||
int64_t maxAllocMemSize;
|
||||
|
||||
SMPStatInfo stat;
|
||||
} SMPCollection;
|
||||
|
||||
typedef struct SMemPool {
|
||||
char *name;
|
||||
int16_t slotId;
|
||||
SMemPoolCfg cfg;
|
||||
int64_t memRetireThreshold;
|
||||
int64_t memRetireThreshold[3];
|
||||
int64_t memRetireUnit;
|
||||
int32_t maxChunkNum;
|
||||
SMPCtrlInfo ctrlInfo;
|
||||
|
|
|
@ -198,8 +198,10 @@ int32_t memPoolInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
|
|||
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pPool->memRetireThreshold = pPool->cfg.maxSize * MP_RETIRE_THRESHOLD_PERCENT;
|
||||
pPool->memRetireUnit = pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT;
|
||||
pPool->memRetireThreshold[0] = pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT;
|
||||
pPool->memRetireThreshold[1] = pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT;
|
||||
pPool->memRetireThreshold[2] = pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT;
|
||||
pPool->memRetireUnit = pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT;
|
||||
pPool->maxChunkNum = cfg->maxSize / cfg->chunkSize;
|
||||
if (pPool->maxChunkNum <= 0) {
|
||||
uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", cfg->maxSize, cfg->chunkSize);
|
||||
|
@ -278,6 +280,34 @@ int32_t memPoolGetChunkFromSession(SMemPool* pPool, SMPSession* pSession, int64_
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void memPoolUpdateMaxAllocMemSize(int64_t* pMaxAllocMemSize, int64_t newSize) {
|
||||
int64_t maxAllocMemSize = atomic_load_64(pMaxAllocMemSize);
|
||||
while (true) {
|
||||
if (newSize <= maxAllocMemSize) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (maxAllocMemSize == atomic_val_compare_exchange_64(pMaxAllocMemSize, maxAllocMemSize, newSize)) {
|
||||
break;
|
||||
}
|
||||
|
||||
maxAllocMemSize = atomic_load_64(pMaxAllocMemSize);
|
||||
}
|
||||
}
|
||||
|
||||
void memPoolUpdateAllocMemSize(SMemPool* pPool, SMPSession* pSession, int64_t size) {
|
||||
int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size);
|
||||
memPoolUpdateMaxAllocMemSize(&pSession->maxAllocMemSize, allocMemSize);
|
||||
|
||||
allocMemSize = atomic_add_fetch_64(&pSession->pCollection->allocMemSize, size);
|
||||
memPoolUpdateMaxAllocMemSize(&pSession->pCollection->maxAllocMemSize, allocMemSize);
|
||||
|
||||
allocMemSize = atomic_add_fetch_64(&pPool->allocMemSize, size);
|
||||
memPoolUpdateMaxAllocMemSize(&pPool->maxAllocMemSize, allocMemSize);
|
||||
}
|
||||
|
||||
|
||||
void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SMPChunk* pChunk = NULL, *preSrcChunk = NULL;
|
||||
|
@ -293,7 +323,7 @@ void* memPoolAllocFromChunk(SMemPool* pPool, SMPSession* pSession, int64_t size,
|
|||
|
||||
pSession->allocChunkNum++;
|
||||
pSession->allocChunkMemSize += pPool->cfg.chunkSize;
|
||||
memPoolAddSessionAllocMemSize(pSession, totalSize);
|
||||
memPoolUpdateAllocMemSize(pPool, pSession, totalSize);
|
||||
|
||||
MP_ADD_TO_CHUNK_LIST(pSession->srcChunkHead, pSession->srcChunkTail, pSession->srcChunkNum, pChunk);
|
||||
MP_ADD_TO_CHUNK_LIST(pSession->inUseChunkHead, pSession->inUseChunkTail, pSession->inUseChunkNum, pChunk);
|
||||
|
@ -336,7 +366,7 @@ void* memPoolAllocFromNSChunk(SMemPool* pPool, SMPSession* pSession, int64_t siz
|
|||
|
||||
pSession->allocChunkNum++;
|
||||
pSession->allocChunkMemSize += totalSize;
|
||||
memPoolAddSessionAllocMemSize(pSession, totalSize);
|
||||
memPoolUpdateAllocMemSize(pPool, pSession, totalSize);
|
||||
|
||||
if (NULL == pSession->inUseNSChunkHead) {
|
||||
pSession->inUseNSChunkHead = pChunk;
|
||||
|
@ -350,37 +380,12 @@ _return:
|
|||
return pRes;
|
||||
}
|
||||
|
||||
void memPoolUpdateMaxAllocMemSize(int64_t* pMaxAllocMemSize, int64_t newSize) {
|
||||
int64_t maxAllocMemSize = atomic_load_64(pMaxAllocMemSize);
|
||||
while (true) {
|
||||
if (newSize <= maxAllocMemSize) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (maxAllocMemSize == atomic_val_compare_exchange_64(pMaxAllocMemSize, maxAllocMemSize, newSize)) {
|
||||
break;
|
||||
}
|
||||
|
||||
maxAllocMemSize = atomic_load_64(pMaxAllocMemSize);
|
||||
}
|
||||
}
|
||||
|
||||
void memPoolUpdateAllocMemSize(SMemPool* pPool, SMPSession* pSession, int64_t size) {
|
||||
int64_t allocMemSize = atomic_add_fetch_64(&pSession->allocMemSize, size);
|
||||
memPoolUpdateMaxAllocMemSize(&pSession->maxAllocMemSize, allocMemSize);
|
||||
|
||||
allocMemSize = atomic_add_fetch_64(&pSession->pCollection->allocMemSize, size);
|
||||
memPoolUpdateMaxAllocMemSize(&pSession->pCollection->maxAllocMemSize, allocMemSize);
|
||||
|
||||
allocMemSize = atomic_add_fetch_64(&pPool->allocMemSize, size);
|
||||
memPoolUpdateMaxAllocMemSize(&pPool->maxAllocMemSize, allocMemSize);
|
||||
}
|
||||
|
||||
bool memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) {
|
||||
if (pPool->cfg.collectionQuota > 0 && (atomic_load_64(&pSession->pCollection->allocMemSize) + size) > pPool->cfg.collectionQuota) {
|
||||
if (pPool->cfg.collectionQuota > 0 && (atomic_load_64(&pSession->pCollection->allocMemSize) + size) > atomic_load_64(&pPool->cfg.collectionQuota)) {
|
||||
return true;
|
||||
}
|
||||
if ((atomic_load_64(&pPool->allocMemSize) + size) >= pPool->memRetireThreshold) {
|
||||
if ((atomic_load_64(&pPool->allocMemSize) + size) >= pPool->memRetireThreshold[1]) {
|
||||
return (*pPool->cfg.cb.retireFp)(pSession->pCollection->collectionId, pPool->memRetireUnit);
|
||||
}
|
||||
|
||||
|
@ -502,6 +507,11 @@ void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64
|
|||
|
||||
switch (gMPMgmt.strategy) {
|
||||
case E_MP_STRATEGY_DIRECT: {
|
||||
if (memPoolMemQuotaOverflow(pPool, pSession, size)) {
|
||||
uInfo("session needs to retire memory");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
res = taosMemRealloc(ptr, size);
|
||||
if (NULL != res) {
|
||||
memPoolUpdateAllocMemSize(pPool, pSession, size - *origSize);
|
||||
|
@ -531,12 +541,14 @@ void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void memPoolPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName) {
|
||||
void memPoolPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName, int64_t maxAllocSize) {
|
||||
if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
|
||||
return;
|
||||
}
|
||||
|
||||
uInfo("MemPool [%s] stat detail:", detailName);
|
||||
|
||||
uInfo("Max Used Memory Size: %" PRId64, maxAllocSize);
|
||||
|
||||
uInfo("[times]:");
|
||||
uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("memMalloc", pDetail->times.memMalloc));
|
||||
|
@ -587,7 +599,7 @@ void memPoolPrintStat(SMemPool* pPool, SMPSession* pSession, char* procName) {
|
|||
if (NULL != pSession) {
|
||||
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session");
|
||||
detailName[sizeof(detailName) - 1] = 0;
|
||||
memPoolPrintStatDetail(&pSession->ctrlInfo, &pSession->stat.statDetail, detailName);
|
||||
memPoolPrintStatDetail(&pSession->ctrlInfo, &pSession->stat.statDetail, detailName, pSession->maxAllocMemSize);
|
||||
|
||||
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFile");
|
||||
detailName[sizeof(detailName) - 1] = 0;
|
||||
|
@ -601,7 +613,7 @@ void memPoolPrintStat(SMemPool* pPool, SMPSession* pSession, char* procName) {
|
|||
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name);
|
||||
detailName[sizeof(detailName) - 1] = 0;
|
||||
memPoolPrintSessionStat(&pPool->ctrlInfo, &pPool->stat.statSession, detailName);
|
||||
memPoolPrintStatDetail(&pPool->ctrlInfo, &pPool->stat.statDetail, detailName);
|
||||
memPoolPrintStatDetail(&pPool->ctrlInfo, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize);
|
||||
|
||||
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
|
||||
detailName[sizeof(detailName) - 1] = 0;
|
||||
|
@ -1040,7 +1052,7 @@ void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil
|
|||
}
|
||||
|
||||
int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection) {
|
||||
*ppCollection = taosMemCalloc(1, sizeof(SMPCollection))
|
||||
*ppCollection = taosMemCalloc(1, sizeof(SMPCollection));
|
||||
if (NULL == *ppCollection) {
|
||||
uError("calloc collection failed");
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
Loading…
Reference in New Issue