enh: optimize source code

This commit is contained in:
dapan1121 2024-07-31 19:28:17 +08:00
parent 08ec404200
commit 07034ce28c
12 changed files with 904 additions and 637 deletions

View File

@ -31,27 +31,26 @@ typedef enum MemPoolEvictPolicy {
E_EVICT_MAX_VALUE, // no used E_EVICT_MAX_VALUE, // no used
} MemPoolEvictPolicy; } MemPoolEvictPolicy;
typedef enum MemPoolUsageLevel { typedef struct SMemPoolJob {
E_MEM_USAGE_LOW = 0, uint64_t jobId;
E_MEM_USAGE_MIDDLE, int64_t allocMemSize;
E_MEM_USAGE_HIGH, int64_t maxAllocMemSize;
E_MEM_USAGE_EXTRAME, } SMemPoolJob;
E_MEM_USAGE_MAX_VALUE
} MemPoolUsageLevel;
typedef void (*mpDecConcSessionNum)(void); typedef void (*mpDecConcSessionNum)(void);
typedef void (*mpIncConcSessionNum)(void); typedef void (*mpIncConcSessionNum)(void);
typedef void (*mpSetConcSessionNum)(int32_t); typedef void (*mpSetConcSessionNum)(int32_t);
typedef void (*mpRetireCollections)(int64_t, bool); typedef void (*mpRetireJobs)(int64_t, bool, int32_t);
typedef void (*mpRetireCollection)(uint64_t); typedef void (*mpRetireJob)(SMemPoolJob*, int32_t);
typedef void (*mpCfgUpdate)(void*, void*); typedef void (*mpCfgUpdate)(void*, void*);
typedef struct SMemPoolCallBack { typedef struct SMemPoolCallBack {
mpDecConcSessionNum decSessFp; mpDecConcSessionNum decSessFp;
mpIncConcSessionNum incSessFp; mpIncConcSessionNum incSessFp;
mpSetConcSessionNum setSessFp; mpSetConcSessionNum setSessFp;
mpRetireCollections retiresFp; mpRetireJobs retireJobsFp;
mpRetireCollection retireFp; mpRetireJob retireJobFp;
mpCfgUpdate cfgUpdateFp; mpCfgUpdate cfgUpdateFp;
} SMemPoolCallBack; } SMemPoolCallBack;
@ -59,24 +58,13 @@ typedef struct SMemPoolCallBack {
typedef struct SMemPoolCfg { typedef struct SMemPoolCfg {
bool autoMaxSize; bool autoMaxSize;
int64_t maxSize; int64_t maxSize;
int64_t sessionExpectSize; int64_t jobQuota;
int64_t collectionQuota;
int32_t chunkSize; int32_t chunkSize;
int32_t threadNum; int32_t threadNum;
int8_t usageLevel[E_MEM_USAGE_MAX_VALUE];
MemPoolEvictPolicy evicPolicy; MemPoolEvictPolicy evicPolicy;
SMemPoolCallBack cb; SMemPoolCallBack cb;
} SMemPoolCfg; } SMemPoolCfg;
typedef struct SMemPoolCollection {
uint64_t collectionId;
int64_t allocMemSize;
int64_t maxAllocMemSize;
} SMemPoolCollection;
void taosMemPoolModInit(void);
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle); int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle);
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo); void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo);
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo); void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo);
@ -89,9 +77,9 @@ void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignme
void taosMemPoolClose(void* poolHandle); void taosMemPoolClose(void* poolHandle);
void taosMemPoolModDestroy(void); void taosMemPoolModDestroy(void);
void taosAutoMemoryFree(void *ptr); void taosAutoMemoryFree(void *ptr);
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pCollection); int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob);
void taosMemPoolDestroySession(void* poolHandle, void* session); void taosMemPoolDestroySession(void* poolHandle, void* session);
int32_t taosMemPoolCallocCollection(uint64_t collectionId, void** ppCollection); int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob);
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg); void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg);
#define taosMemPoolFreeClear(ptr) \ #define taosMemPoolFreeClear(ptr) \

View File

@ -87,6 +87,8 @@ BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete delete
void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn); void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn);
void taosBQClear(BoundedQueue* q);
void destroyBoundedQueue(BoundedQueue* q); void destroyBoundedQueue(BoundedQueue* q);
/* /*

View File

@ -672,7 +672,7 @@ static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STime
if (pOperatorInfo->limit == 0) return true; if (pOperatorInfo->limit == 0) return true;
if (pOperatorInfo->pBQ == NULL) { if (pOperatorInfo->pBQ == NULL) {
pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, NULL, pOperatorInfo); pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosAutoMemoryFree, pOperatorInfo);
} }
bool shouldFilter = false; bool shouldFilter = false;

View File

@ -44,6 +44,7 @@ extern "C" {
#define QW_DEFAULT_RESERVE_MEM_PERCENT 20 #define QW_DEFAULT_RESERVE_MEM_PERCENT 20
#define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576UL) #define QW_MIN_RESERVE_MEM_SIZE (512 * 1048576UL)
#define QW_MIN_MEM_POOL_SIZE (1048576UL) #define QW_MIN_MEM_POOL_SIZE (1048576UL)
#define QW_MAX_RETIRE_JOB_NUM 10000
#define QW_DEFAULT_THREAD_TASK_NUM 3 #define QW_DEFAULT_THREAD_TASK_NUM 3
@ -129,6 +130,15 @@ typedef struct SQWTaskStatus {
int8_t status; int8_t status;
} SQWTaskStatus; } SQWTaskStatus;
typedef struct SQWJobInfo {
int8_t retired;
int32_t errCode;
SMemPoolJob* memInfo;
SHashObj* pSessions;
} SQWJobInfo;
typedef struct SQWTaskCtx { typedef struct SQWTaskCtx {
SRWLatch lock; SRWLatch lock;
int8_t phase; int8_t phase;
@ -166,6 +176,7 @@ typedef struct SQWTaskCtx {
SArray *tbInfo; // STbVerInfo SArray *tbInfo; // STbVerInfo
void *memPoolSession; void *memPoolSession;
SQWJobInfo *pJobInfo;
} SQWTaskCtx; } SQWTaskCtx;
typedef struct SQWSchStatus { typedef struct SQWSchStatus {
@ -232,31 +243,15 @@ typedef struct SQWorkerMgmt {
int32_t paramIdx; int32_t paramIdx;
} SQWorkerMgmt; } SQWorkerMgmt;
typedef struct SQWQueryInfo {
int8_t retired;
SMemPoolCollection* pCollection;
SHashObj* pSessions;
} SQWQueryInfo;
typedef struct SQWRetireLowCtx {
int8_t retireBegin;
} SQWRetireLowCtx;
typedef struct SQWRetireMidCtx {
int8_t retireBegin;
TdThreadCond retired;
BoundedQueue* collectionQueue;
} SQWRetireMidCtx;
typedef struct SQWRetireCtx { typedef struct SQWRetireCtx {
SQWRetireLowCtx lowCtx; BoundedQueue* pJobQueue;
SQWRetireMidCtx midCtx;
} SQWRetireCtx; } SQWRetireCtx;
typedef struct SQueryMgmt { typedef struct SQueryMgmt {
SRWLatch taskMgmtLock; SRWLatch taskMgmtLock;
int32_t concTaskLevel; int32_t concTaskLevel;
SHashObj* pQueryInfo; SHashObj* pJobInfo;
void* memPoolHandle; void* memPoolHandle;
SQWRetireCtx retireCtx; SQWRetireCtx retireCtx;
} SQueryMgmt; } SQueryMgmt;
@ -482,8 +477,10 @@ void qwDbgSimulateSleep(void);
void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped);
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
int32_t qwInitQueryPool(void); int32_t qwInitQueryPool(void);
void qwDestroyQueryInfo(SQWQueryInfo* pQuery); void qwDestroyJobInfo(SQWJobInfo* pJob);
int32_t qwInitSession(QW_FPARAMS_DEF, void** ppSession); void qwRetireJob(SQWJobInfo* pJob);
int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -15,6 +15,8 @@ int32_t qwGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) {
} }
int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chunkSize) { int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chunkSize) {
//TODO
*chunkSize = 2 * 1048576; *chunkSize = 2 * 1048576;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -53,101 +55,159 @@ void qwIncConcurrentTaskNumCb(void) {
//TODO //TODO
} }
int32_t qwInitQueryInfo(uint64_t qId, SQWQueryInfo* pQuery) { int32_t qwInitJobInfo(uint64_t qId, SQWJobInfo* pJob) {
pQuery->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pJob->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pQuery->pSessions) { if (NULL == pJob->pSessions) {
qError("fail to init session hash"); qError("fail to init session hash, code: 0x%x", terrno);
return TSDB_CODE_OUT_OF_MEMORY; return terrno;
} }
int32_t code = taosMemPoolCallocCollection(qId, (void**)&pQuery->pCollection); int32_t code = taosMemPoolCallocJob(qId, (void**)&pJob->memInfo);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pQuery->pSessions); taosHashCleanup(pJob->pSessions);
pJob->pSessions = NULL;
return code; return code;
} }
return code; return code;
} }
int32_t qwInitSession(QW_FPARAMS_DEF, void** ppSession) { int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SQWQueryInfo* pQuery = NULL; SQWJobInfo* pJob = NULL;
while (true) { while (true) {
pQuery = (SQWQueryInfo*)taosHashGet(gQueryMgmt.pQueryInfo, &qId, sizeof(qId)); pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, &qId, sizeof(qId));
if (NULL == pQuery) { if (NULL == pJob) {
SQWQueryInfo queryInfo = {0}; SQWJobInfo jobInfo = {0};
code = qwInitQueryInfo(qId, &queryInfo); code = qwInitJobInfo(qId, &jobInfo);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
code = taosHashPut(gQueryMgmt.pQueryInfo, &qId, sizeof(qId), &queryInfo, sizeof(queryInfo)); code = taosHashPut(gQueryMgmt.pJobInfo, &qId, sizeof(qId), &jobInfo, sizeof(jobInfo));
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
qwDestroyQueryInfo(&queryInfo); qwDestroyJobInfo(&jobInfo);
if (-2 == code) { if (TSDB_CODE_DUP_KEY == code) {
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
continue; continue;
} }
return TSDB_CODE_OUT_OF_MEMORY; return code;
} }
pQuery = (SQWQueryInfo*)taosHashGet(gQueryMgmt.pQueryInfo, &qId, sizeof(qId)); pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, &qId, sizeof(qId));
if (NULL == pJob) {
qError("QID:0x%" PRIx64 " not in joj hash, may be dropped", qId);
return TSDB_CODE_QRY_JOB_NOT_EXIST;
}
} }
break; break;
} }
QW_ERR_RET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pQuery->pCollection)); ctx->pJobInfo = pJob;
QW_ERR_JRET(taosMemPoolInitSession(gQueryMgmt.memPoolHandle, ppSession, pJob->memInfo));
char id[sizeof(tId) + sizeof(eId)] = {0}; char id[sizeof(tId) + sizeof(eId)] = {0};
QW_SET_TEID(id, tId, eId); QW_SET_TEID(id, tId, eId);
code = taosHashPut(pQuery->pSessions, id, sizeof(id), ppSession, POINTER_BYTES); code = taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
qError("fail to put session into query session hash, errno:%d", terrno); qError("fail to put session into query session hash, code: 0x%x", code);
return terrno; QW_ERR_JRET(code);
}
_return:
if (NULL != pJob) {
taosHashRelease(gQueryMgmt.pJobInfo, pJob);
} }
return code; return code;
} }
void qwRetireCollectionCb(uint64_t collectionId) { void qwRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashGet(gQueryMgmt.pJobInfo, &mpJob->jobId, sizeof(mpJob->jobId));
if (NULL == pJob) {
qError("QID:0x%" PRIx64 " fail to get job from job hash", mpJob->jobId);
return;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
qwRetireJob(pJob);
qInfo("QID:0x%" PRIx64 " retired directly, errCode: 0x%x", mpJob->jobId, errCode);
} else {
qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x", mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode));
}
} }
void qwLowLevelRetire(int64_t retireSize) { void qwLowLevelRetire(int64_t retireSize, int32_t errCode) {
SQWQueryInfo* pQuery = (SQWQueryInfo*)taosHashIterate(gQueryMgmt.pQueryInfo, NULL); SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
while (pQuery) { while (pJob) {
int64_t aSize = atomic_load_64(&pQuery->pCollection->allocMemSize); int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
if (aSize >= retireSize) { if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
atomic_store_8(&pQuery->retired, 1); qwRetireJob(pJob);
//TODO RETIRE JOB/TASKS DIRECTLY
qDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, qDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
pQuery->pCollection->collectionId, aSize, retireSize); pJob->memInfo->jobId, aSize, retireSize);
taosHashCancelIterate(gQueryMgmt.pQueryInfo, pQuery); taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob);
break; break;
} }
pQuery = (SQWQueryInfo*)taosHashIterate(gQueryMgmt.pQueryInfo, pQuery); pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
} }
} }
void qwMidLevelRetire(int64_t retireSize) { void qwMidLevelRetire(int64_t retireSize, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
PriorityQueueNode qNode;
while (NULL != pJob) {
if (0 == atomic_load_8(&pJob->retired)) {
qNode.data = pJob;
(void)taosBQPush(gQueryMgmt.retireCtx.pJobQueue, &qNode);
}
pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
}
PriorityQueueNode* pNode = NULL;
int64_t retiredSize = 0;
while (retiredSize < retireSize) {
pNode = taosBQTop(gQueryMgmt.retireCtx.pJobQueue);
if (NULL == pNode) {
break;
}
pJob = (SQWJobInfo*)pNode->data;
if (atomic_load_8(&pJob->retired)) {
taosBQPop(gQueryMgmt.retireCtx.pJobQueue);
continue;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
qwRetireJob(pJob);
qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
pJob->memInfo->jobId, aSize, retireSize);
retiredSize += aSize;
}
taosBQPop(gQueryMgmt.retireCtx.pJobQueue);
}
taosBQClear(gQueryMgmt.retireCtx.pJobQueue);
} }
void qwRetireCollectionsCb(int64_t retireSize, bool lowLevelRetire) { void qwRetireJobsCb(int64_t retireSize, bool lowLevelRetire, int32_t errCode) {
if (lowLevelRetire) { (lowLevelRetire) ? qwLowLevelRetire(retireSize, errCode) : qwMidLevelRetire(retireSize, errCode);
qwLowLevelRetire(retireSize);
} else {
qwMidLevelRetire(retireSize);
}
} }
int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
@ -161,8 +221,8 @@ int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
int64_t memSize = 0; int64_t memSize = 0;
int32_t code = taosGetSysAvailMemory(&memSize); int32_t code = taosGetSysAvailMemory(&memSize);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
qError("get system avaiable memory size failed, errno: %d", errno); qError("get system avaiable memory size failed, error: 0x%x", code);
return TAOS_SYSTEM_ERROR(errno); return code;
} }
code = qwGetMemPoolMaxMemSize(memSize, pMaxSize); code = qwGetMemPoolMaxMemSize(memSize, pMaxSize);
@ -177,9 +237,9 @@ int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
void qwCheckUpateCfgCb(void* pHandle, void* cfg) { void qwCheckUpateCfgCb(void* pHandle, void* cfg) {
SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg; SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg;
int64_t newCollectionQuota = tsSingleQueryMaxMemorySize * 1048576UL; int64_t newJobQuota = tsSingleQueryMaxMemorySize * 1048576UL;
if (pCfg->collectionQuota != newCollectionQuota) { if (pCfg->jobQuota != newJobQuota) {
atomic_store_64(&pCfg->collectionQuota, newCollectionQuota); atomic_store_64(&pCfg->jobQuota, newJobQuota);
} }
int64_t maxSize = 0; int64_t maxSize = 0;
@ -198,13 +258,28 @@ void qwCheckUpateCfgCb(void* pHandle, void* cfg) {
} }
} }
static bool qwJobMemSizeCompFn(void* l, void* r, void* param) {
SQWJobInfo* left = (SQWJobInfo*)l;
SQWJobInfo* right = (SQWJobInfo*)r;
if (atomic_load_8(&right->retired)) {
return true;
}
return atomic_load_64(&right->memInfo->allocMemSize) < atomic_load_64(&left->memInfo->allocMemSize);
}
int32_t qwInitQueryPool(void) { int32_t qwInitQueryPool(void) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
#ifdef LINUX
if (!tsQueryUseMemoryPool) { if (!tsQueryUseMemoryPool) {
qDebug("query memory pool disabled"); #endif
qInfo("query memory pool disabled");
return code; return code;
#ifdef LINUX
} }
#endif
SMemPoolCfg cfg = {0}; SMemPoolCfg cfg = {0};
int64_t maxSize = 0; int64_t maxSize = 0;
@ -216,12 +291,12 @@ int32_t qwInitQueryPool(void) {
cfg.threadNum = 10; //TODO cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO cfg.evicPolicy = E_EVICT_AUTO; //TODO
cfg.collectionQuota = tsSingleQueryMaxMemorySize * 1048576UL; cfg.jobQuota = tsSingleQueryMaxMemorySize * 1048576UL;
cfg.cb.setSessFp = qwSetConcurrentTaskNumCb; cfg.cb.setSessFp = qwSetConcurrentTaskNumCb;
cfg.cb.decSessFp = qwDecConcurrentTaskNumCb; cfg.cb.decSessFp = qwDecConcurrentTaskNumCb;
cfg.cb.incSessFp = qwIncConcurrentTaskNumCb; cfg.cb.incSessFp = qwIncConcurrentTaskNumCb;
cfg.cb.retiresFp = qwRetireCollectionsCb; cfg.cb.retireJobsFp = qwRetireJobsCb;
cfg.cb.retireFp = qwRetireCollectionCb; cfg.cb.retireJobFp = qwRetireJobCb;
cfg.cb.cfgUpdateFp = qwCheckUpateCfgCb; cfg.cb.cfgUpdateFp = qwCheckUpateCfgCb;
code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize); code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize);
@ -229,18 +304,24 @@ int32_t qwInitQueryPool(void) {
return code; return code;
} }
gQueryMgmt.pJobInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == gQueryMgmt.pJobInfo) {
qError("init job hash failed, error:0x%x", terrno);
return terrno;
}
gQueryMgmt.retireCtx.pJobQueue = createBoundedQueue(QW_MAX_RETIRE_JOB_NUM, qwJobMemSizeCompFn, NULL, NULL);
if (NULL == gQueryMgmt.retireCtx.pJobQueue) {
qError("init job bounded queue failed, error:0x%x", terrno);
return terrno;
}
code = taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryMgmt.memPoolHandle); code = taosMemPoolOpen(QW_QUERY_MEM_POOL_NAME, &cfg, &gQueryMgmt.memPoolHandle);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
gQueryMgmt.pQueryInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); qInfo("query memory pool initialized");
if (NULL == gQueryMgmt.pQueryInfo) {
qError("init query hash failed");
return TSDB_CODE_OUT_OF_MEMORY;
}
qDebug("query memory pool initialized");
return code; return code;
} }

View File

@ -711,7 +711,11 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) {
} }
} }
void qwDestroyQueryInfo(SQWQueryInfo* pQuery) { void qwDestroyJobInfo(SQWJobInfo* pJob) {
//TODO
}
void qwRetireJob(SQWJobInfo* pJob) {
//TODO //TODO
} }

View File

@ -759,7 +759,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx->phase = -1; ctx->phase = -1;
if (NULL != gQueryMgmt.memPoolHandle) { if (NULL != gQueryMgmt.memPoolHandle) {
QW_ERR_JRET(qwInitSession(QW_FPARAMS(), &ctx->memPoolSession)); QW_ERR_JRET(qwInitSession(QW_FPARAMS(), ctx, &ctx->memPoolSession));
} }
QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT));

View File

@ -193,26 +193,15 @@ typedef struct SMPStatInfo {
} SMPStatInfo; } SMPStatInfo;
typedef struct SMPCollection { typedef struct SMPJob {
SMemPoolCollection collection; // KEEP IT FIRST SMemPoolJob job; // KEEP IT FIRST
int32_t remainSession;
SMPStatInfo stat; SMPStatInfo stat;
} SMPCollection; } SMPJob;
typedef struct SMPSession {
SMPListNode list;
int64_t sessionId;
void* pCollection;
SMPCollection* pMpCollection;
bool needRetire;
SMPCtrlInfo ctrlInfo;
typedef struct SMPSessionChunk {
int64_t allocChunkNum; int64_t allocChunkNum;
int64_t allocChunkMemSize; int64_t allocChunkMemSize;
int64_t allocMemSize;
int64_t maxAllocMemSize;
int64_t reUseChunkNum; int64_t reUseChunkNum;
int32_t srcChunkNum; int32_t srcChunkNum;
@ -231,6 +220,18 @@ typedef struct SMPSession {
SMPNSChunk *reUseNSChunkHead; SMPNSChunk *reUseNSChunkHead;
SMPNSChunk *reUseNSChunkTail; SMPNSChunk *reUseNSChunkTail;
} SMPSessionChunk;
typedef struct SMPSession {
SMPListNode list;
int64_t sessionId;
SMPJob* pJob;
SMPCtrlInfo ctrlInfo;
int64_t allocMemSize;
int64_t maxAllocMemSize;
SMPSessionChunk chunk;
SMPStatInfo stat; SMPStatInfo stat;
} SMPSession; } SMPSession;
@ -240,32 +241,20 @@ typedef struct SMPCacheGroupInfo {
int64_t allocNum; int64_t allocNum;
int32_t groupNum; int32_t groupNum;
SMPCacheGroup *pGrpHead; SMPCacheGroup *pGrpHead;
SMPCacheGroup *pGrpTail;
void *pIdleList; void *pIdleList;
} SMPCacheGroupInfo; } SMPCacheGroupInfo;
typedef struct SMPChunkMgmt {
typedef struct SMemPool {
char *name;
int16_t slotId;
SMemPoolCfg cfg;
int64_t memRetireThreshold[3];
int64_t memRetireUnit;
int32_t maxChunkNum; int32_t maxChunkNum;
SMPCtrlInfo ctrlInfo;
int16_t maxDiscardSize; int16_t maxDiscardSize;
double threadChunkReserveNum; double threadChunkReserveNum;
int64_t allocChunkNum; int64_t allocChunkNum;
int64_t allocChunkSize; int64_t allocChunkSize;
int64_t allocNSChunkNum; int64_t allocNSChunkNum;
int64_t allocNSChunkSize; int64_t allocNSChunkSize;
int64_t allocMemSize;
int64_t maxAllocMemSize;
SMPCacheGroupInfo chunkCache; SMPCacheGroupInfo chunkCache;
SMPCacheGroupInfo NSChunkCache; SMPCacheGroupInfo NSChunkCache;
SMPCacheGroupInfo sessionCache;
int32_t readyChunkNum; int32_t readyChunkNum;
int32_t readyChunkReserveNum; int32_t readyChunkReserveNum;
@ -278,6 +267,23 @@ typedef struct SMemPool {
int64_t readyNSChunkNum; int64_t readyNSChunkNum;
SMPChunk *readyNSChunkHead; SMPChunk *readyNSChunkHead;
SMPChunk *readyNSChunkTail; SMPChunk *readyNSChunkTail;
} SMPChunkMgmt;
typedef struct SMemPool {
char *name;
int16_t slotId;
SMemPoolCfg cfg;
int64_t retireThreshold[3];
int64_t retireUnit;
SMPCtrlInfo ctrlInfo;
int64_t maxAllocMemSize;
int64_t allocMemSize;
SMPCacheGroupInfo sessionCache;
SMPChunkMgmt chunk;
SMPStatInfo stat; SMPStatInfo stat;
} SMemPool; } SMemPool;
@ -305,6 +311,24 @@ typedef struct SMemPoolMgmt {
int32_t code; int32_t code;
} SMemPoolMgmt; } SMemPoolMgmt;
typedef int32_t (*mpAllocFunc)(SMemPool*, SMPSession*, int64_t, uint32_t, void**);
typedef void (*mpFreeFunc)(SMemPool*, SMPSession*, void *, int64_t*);
typedef int64_t (*mpGetSizeFunc)(SMemPool*, SMPSession*, void*);
typedef int32_t (*mpReallocFunc)(SMemPool*, SMPSession*, void **, int64_t, int64_t*);
typedef int32_t (*mpInitSessionFunc)(SMemPool*, SMPSession*);
typedef int32_t (*mpInitFunc)(SMemPool*, char*, SMemPoolCfg*);
typedef int32_t (*mpUpdateCfgFunc)(SMemPool*);
typedef struct SMPStrategyFp {
mpInitFunc initFp;
mpAllocFunc allocFp;
mpFreeFunc freeFp;
mpGetSizeFunc getSizeFp;
mpReallocFunc reallocFp;
mpInitSessionFunc initSessionFp;
mpUpdateCfgFunc updateCfgFp;
} SMPStrategyFp;
#define MP_GET_FLAG(st, f) ((st) & (f)) #define MP_GET_FLAG(st, f) ((st) & (f))
#define MP_SET_FLAG(st, f) (st) |= (f) #define MP_SET_FLAG(st, f) (st) |= (f)
#define MP_CLR_FLAG(st, f) (st) &= (~f) #define MP_CLR_FLAG(st, f) (st) &= (~f)
@ -403,6 +427,28 @@ enum {
} \ } \
} while (0) } while (0)
// direct
int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes);
int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr);
void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize);
int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize);
// chunk
int32_t mpChunkInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg);
int64_t mpChunkGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr);
int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes);
void mpChunkFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize);
int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize);
int32_t mpChunkInitSession(SMemPool* pPool, SMPSession* pSession);
int32_t mpChunkUpdateCfg(SMemPool* pPool);
int32_t mpPopIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes);
int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size);
void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size);
int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup* pHead);
int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes);
#ifdef __cplusplus #ifdef __cplusplus

317
source/util/src/mpChunk.c Executable file
View File

@ -0,0 +1,317 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "osMemPool.h"
#include "tmempoolInt.h"
#include "tlog.h"
#include "tutil.h"
int32_t mpChunkNew(SMemPool* pPool, SMPChunk** ppChunk) {
SMPChunk* pChunk = NULL;
MP_ERR_RET(mpPopIdleNode(pPool, &pPool->chunk.chunkCache, (void**)&pChunk));
pChunk->pMemStart = taosMemMalloc(pPool->cfg.chunkSize);
if (NULL == pChunk->pMemStart) {
uError("add new chunk, memory malloc %d failed, code: 0x%x", pPool->cfg.chunkSize, terrno);
return terrno;
}
pPool->chunk.allocChunkNum++;
pPool->chunk.allocChunkSize += pPool->cfg.chunkSize;
*ppChunk = pChunk;
return TSDB_CODE_SUCCESS;
}
int32_t mpChunkNewNS(SMemPool* pPool, SMPNSChunk** ppChunk, int64_t chunkSize) {
SMPNSChunk* pChunk = NULL;
MP_ERR_RET(mpPopIdleNode(pPool, &pPool->chunk.NSChunkCache, (void**)&pChunk));
pChunk->pMemStart = taosMemMalloc(chunkSize);
if (NULL == pChunk->pMemStart) {
uError("add new NS chunk, memory malloc %" PRId64 " failed, code: 0x%x", chunkSize, terrno);
return terrno;
}
pChunk->memBytes = chunkSize;
MP_SET_FLAG(pChunk->flags, MP_CHUNK_FLAG_NS_CHUNK);
pPool->chunk.allocNSChunkNum++;
pPool->chunk.allocNSChunkSize += pPool->cfg.chunkSize;
*ppChunk = pChunk;
return TSDB_CODE_SUCCESS;
}
int32_t mpChunkPrepare(SMemPool* pPool, int32_t num) {
SMPChunk* pChunk = NULL;
for (int32_t i = 0; i < num; ++i) {
MP_ERR_RET(mpChunkNew(pPool, &pChunk));
pPool->chunk.readyChunkTail->list.pNext = pChunk;
pPool->chunk.readyChunkTail = pChunk;
atomic_add_fetch_32(&pPool->chunk.readyChunkNum, 1);
}
return TSDB_CODE_SUCCESS;
}
int32_t mpChunkEnsureCapacity(SMemPool* pPool, SMPChunkMgmt* pChunk) {
if (E_EVICT_ALL == pPool->cfg.evicPolicy) {
return TSDB_CODE_SUCCESS;
}
int32_t readyMissNum = pChunk->readyChunkReserveNum - atomic_load_32(&pChunk->readyChunkNum);
if (readyMissNum <= 0) {
return TSDB_CODE_SUCCESS;
}
MP_ERR_RET(mpChunkPrepare(pPool, readyMissNum));
return TSDB_CODE_SUCCESS;
}
void mpChunkNotifyLowNum(SMemPool* pPool) {
}
int32_t mpChunkRetrieve(SMemPool* pPool, SMPChunk** ppChunk) {
SMPCacheGroup* pCache = NULL;
SMPChunk* pChunk = NULL;
int32_t readyChunkNum = atomic_sub_fetch_32(&pPool->chunk.readyChunkNum, 1);
if (readyChunkNum >= 0) {
if (atomic_add_fetch_32(&pPool->chunk.readyChunkGotNum, 1) == pPool->chunk.readyChunkLowNum) {
mpChunkNotifyLowNum(pPool);
}
pChunk = (SMPChunk*)atomic_load_ptr(&pPool->chunk.readyChunkHead->list.pNext);
while (atomic_val_compare_exchange_ptr(&pPool->chunk.readyChunkHead->list.pNext, pChunk, pChunk->list.pNext) != pChunk) {
pChunk = (SMPChunk*)atomic_load_ptr(&pPool->chunk.readyChunkHead->list.pNext);
}
*ppChunk = pChunk;
return TSDB_CODE_SUCCESS;
} else {
atomic_add_fetch_32(&pPool->chunk.readyChunkNum, 1);
}
MP_RET(mpChunkNew(pPool, ppChunk));
}
int32_t mpChunkRetrieveFromSession(SMemPool* pPool, SMPSession* pSession, int64_t size, SMPChunk** ppChunk, SMPChunk** ppPreChunk) {
SMPChunk* pChunk = pSession->chunk.srcChunkHead;
while (NULL != pChunk) {
if ((pChunk->offset + size) <= pPool->cfg.chunkSize) {
*ppChunk = pChunk;
break;
}
*ppPreChunk = pChunk;
pChunk = (SMPChunk*)pChunk->list.pNext;
}
if (NULL == *ppChunk) {
*ppPreChunk = NULL;
}
return TSDB_CODE_SUCCESS;
}
int32_t mpChunkAllocMem(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SMPChunk* pChunk = NULL, *preSrcChunk = NULL;
void* pRes = NULL;
int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer);
if (pSession->chunk.srcChunkNum > 0) {
MP_ERR_JRET(mpChunkRetrieveFromSession(pPool, pSession, totalSize, &pChunk, &preSrcChunk));
}
if (NULL == pChunk) {
MP_ERR_JRET(mpChunkNew(pPool, &pChunk));
pSession->chunk.allocChunkNum++;
pSession->chunk.allocChunkMemSize += pPool->cfg.chunkSize;
mpUpdateAllocSize(pPool, pSession, totalSize);
MP_ADD_TO_CHUNK_LIST(pSession->chunk.srcChunkHead, pSession->chunk.srcChunkTail, pSession->chunk.srcChunkNum, pChunk);
MP_ADD_TO_CHUNK_LIST(pSession->chunk.inUseChunkHead, pSession->chunk.inUseChunkTail, pSession->chunk.inUseChunkNum, pChunk);
}
SMPMemHeader* pHeader = (SMPMemHeader*)(pChunk->pMemStart + pChunk->offset);
MP_INIT_MEM_HEADER(pHeader, size, false);
pRes = (void*)(pHeader + 1);
pChunk->offset += totalSize;
if (pChunk->offset >= (pPool->cfg.chunkSize - pPool->chunk.maxDiscardSize)) {
if (NULL == preSrcChunk) {
pSession->chunk.srcChunkHead = NULL;
pSession->chunk.srcChunkTail = NULL;
} else {
preSrcChunk->list.pNext = pChunk->list.pNext;
}
pSession->chunk.srcChunkNum--;
}
_return:
*ppRes = pRes;
return code;
}
int32_t mpChunkNSAllocMem(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SMPNSChunk* pChunk = NULL;
void* pRes = NULL;
int64_t totalSize = size + sizeof(SMPMemHeader) + sizeof(SMPMemTailer) + alignment;
MP_ERR_JRET(mpChunkNewNS(pPool, &pChunk, totalSize));
SMPMemHeader* pHeader = (SMPMemHeader*)pChunk->pMemStart;
MP_INIT_MEM_HEADER(pHeader, size, false);
pRes = (void*)(pHeader + 1);
pSession->chunk.allocChunkNum++;
pSession->chunk.allocChunkMemSize += totalSize;
mpUpdateAllocSize(pPool, pSession, totalSize);
if (NULL == pSession->chunk.inUseNSChunkHead) {
pSession->chunk.inUseNSChunkHead = pChunk;
pSession->chunk.inUseNSChunkTail = pChunk;
} else {
pSession->chunk.inUseNSChunkTail->list.pNext = pChunk;
}
_return:
*ppRes = pRes;
return code;
}
int32_t mpChunkInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
SMPChunkMgmt* pChunk = &pPool->chunk;
pChunk->threadChunkReserveNum = 1;
pChunk->chunkCache.nodeSize = sizeof(SMPChunk);
pChunk->NSChunkCache.groupNum = MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE;
pChunk->NSChunkCache.nodeSize = sizeof(SMPNSChunk);
MP_ERR_RET(mpAddCacheGroup(pPool, &pChunk->chunkCache, NULL));
MP_ERR_RET(mpAddCacheGroup(pPool, &pChunk->NSChunkCache, NULL));
MP_ERR_RET(mpPopIdleNode(pPool, &pChunk->chunkCache, (void**)&pChunk->readyChunkHead));
pChunk->readyChunkTail = pChunk->readyChunkHead;
MP_ERR_RET(mpChunkEnsureCapacity(pPool, pChunk));
return TSDB_CODE_SUCCESS;
}
int64_t mpChunkGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr) {
SMPMemHeader* pHeader = (SMPMemHeader*)ptr - 1;
return pHeader->size;
}
int32_t mpChunkAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
MP_RET((size > pPool->cfg.chunkSize) ? mpChunkNSAllocMem(pPool, pSession, size, alignment, ppRes) : mpChunkAllocMem(pPool, pSession, size, alignment, ppRes));
}
void mpChunkFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) {
int64_t oSize = mpChunkGetMemSize(pPool, pSession, ptr);
if (origSize) {
*origSize = oSize;
}
// TODO
atomic_sub_fetch_64(&pSession->allocMemSize, oSize);
atomic_sub_fetch_64(&pPool->allocMemSize, oSize);
}
int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) {
int32_t code = TSDB_CODE_SUCCESS;
if (*origSize >= size) {
SMPMemHeader* pHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader));
pHeader->size = size;
return TSDB_CODE_SUCCESS;
}
void* res = NULL;
MP_ERR_JRET(mpMalloc(pPool, pSession, size, 0, &res));
SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader));
SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader));
TAOS_MEMCPY(res, *pPtr, *origSize);
mpChunkFree(pPool, pSession, *pPtr, NULL);
*pPtr = res;
return TSDB_CODE_SUCCESS;
_return:
mpChunkFree(pPool, pSession, *pPtr, NULL);
return code;
}
int32_t mpChunkInitSession(SMemPool* pPool, SMPSession* pSession) {
int32_t code = TSDB_CODE_SUCCESS;
SMPChunk* pChunk = NULL;
MP_ERR_RET(mpChunkRetrieve(pPool, &pChunk));
pSession->chunk.allocChunkNum = 1;
pSession->chunk.allocChunkMemSize = pPool->cfg.chunkSize;
MP_ADD_TO_CHUNK_LIST(pSession->chunk.srcChunkHead, pSession->chunk.srcChunkTail, pSession->chunk.srcChunkNum, pChunk);
MP_ADD_TO_CHUNK_LIST(pSession->chunk.inUseChunkHead, pSession->chunk.inUseChunkTail, pSession->chunk.inUseChunkNum, pChunk);
return code;
}
int32_t mpChunkUpdateCfg(SMemPool* pPool) {
pPool->chunk.maxChunkNum = pPool->cfg.maxSize / pPool->cfg.chunkSize;
if (pPool->chunk.maxChunkNum <= 0) {
uError("invalid memory pool max chunk num, maxSize:%" PRId64 ", chunkSize:%d", pPool->cfg.maxSize, pPool->cfg.chunkSize);
return TSDB_CODE_INVALID_MEM_POOL_PARAM;
}
pPool->chunk.readyChunkReserveNum = TMIN(pPool->cfg.threadNum * pPool->chunk.threadChunkReserveNum, pPool->chunk.maxChunkNum);
pPool->chunk.chunkCache.groupNum = TMAX(pPool->chunk.maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE);
return TSDB_CODE_SUCCESS;
}

74
source/util/src/mpDirect.c Executable file
View File

@ -0,0 +1,74 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "osMemPool.h"
#include "tmempoolInt.h"
#include "tlog.h"
#include "tutil.h"
int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
MP_ERR_RET(mpChkQuotaOverflow(pPool, pSession, size));
void* res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size);
if (NULL != res) {
mpUpdateAllocSize(pPool, pSession, size);
} else {
(void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, size);
(void)atomic_sub_fetch_64(&pPool->allocMemSize, size);
uError("malloc %" PRId64 " alignment %d failed, code: 0x%x", size, alignment, terrno);
code = terrno;
}
*ppRes = res;
MP_RET(code);
}
int64_t mpDirectGetMemSize(SMemPool* pPool, SMPSession* pSession, void *ptr) {
return taosMemSize(ptr);
}
void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* origSize) {
int64_t oSize = taosMemSize(ptr);
if (origSize) {
*origSize = oSize;
}
taosMemFree(ptr);
(void)atomic_sub_fetch_64(&pSession->allocMemSize, oSize);
(void)atomic_sub_fetch_64(&pSession->pJob->job.allocMemSize, oSize);
(void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize);
}
int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) {
int32_t code = TSDB_CODE_SUCCESS;
MP_ERR_RET(mpChkQuotaOverflow(pPool, pSession, size));
*pPtr = taosMemRealloc(*pPtr, size);
if (NULL != *pPtr) {
mpUpdateAllocSize(pPool, pSession, size - *origSize);
} else {
MP_ERR_RET(terrno);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -343,6 +343,13 @@ BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete delete
void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) { taosPQSetFn(q->queue, fn); } void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) { taosPQSetFn(q->queue, fn); }
void taosBQClear(BoundedQueue* q) {
if (q->queue->deleteFn)
taosArrayClearEx(q->queue->container, q->queue->deleteFn);
else
taosArrayClear(q->queue->container);
}
void destroyBoundedQueue(BoundedQueue* q) { void destroyBoundedQueue(BoundedQueue* q) {
if (!q) return; if (!q) return;
destroyPriorityQueue(q->queue); destroyPriorityQueue(q->queue);
@ -357,11 +364,9 @@ PriorityQueueNode* taosBQPush(BoundedQueue* q, PriorityQueueNode* n) {
} else { } else {
void* p = top->data; void* p = top->data;
top->data = n->data; top->data = n->data;
n->data = p;
if (q->queue->deleteFn) { if (q->queue->deleteFn) {
n->data = p;
q->queue->deleteFn(n->data); q->queue->deleteFn(n->data);
} else {
taosMemoryFree(n->data);
} }
} }
return pqHeapify(q->queue, 0, taosBQSize(q)); return pqHeapify(q->queue, 0, taosBQSize(q));

File diff suppressed because it is too large Load Diff