fix: return value issue

dapan1121 2024-07-11 19:28:13 +08:00
parent 0b7f0e7334
commit 761b0a9437
5 changed files with 107 additions and 87 deletions

View File

@@ -42,8 +42,8 @@ typedef enum MemPoolUsageLevel {
typedef void (*mpDecConcSessionNum)(void);
typedef void (*mpIncConcSessionNum)(void);
typedef void (*mpSetConcSessionNum)(int32_t);
typedef bool (*mpRetireCollection)(uint64_t, int64_t, bool);
typedef void (*mpCfgUpdate)(void);
typedef bool (*mpRetireCollection)(int64_t, int64_t, int64_t, bool);
typedef void (*mpCfgUpdate)(void*, SMemPoolCfg*);
typedef struct SMemPoolCallBack {
mpDecConcSessionNum decSessFp;
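For orientation, here is a minimal sketch of how an implementer might satisfy the widened callback signatures; the function names below are hypothetical, and the real query-worker implementations appear in the next file.

static bool demoRetireCb(int64_t collectionId, int64_t collectionAllocSize, int64_t retireSize, bool retireLow) {
  // The added collectionAllocSize parameter lets the callback skip a collection
  // that could not cover the requested retireSize anyway.
  if (retireLow && collectionAllocSize <= retireSize) {
    return false;
  }
  /* ... ask collectionId to release roughly retireSize bytes ... */
  return true;
}

static void demoCfgUpdateCb(void* poolHandle, SMemPoolCfg* pCfg) {
  // The pool handle is now threaded through so the callback can hand it back to
  // taosMemPoolCfgUpdate(poolHandle, pCfg) after refreshing pCfg.
  (void)poolHandle;
  (void)pCfg;
}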

View File

@@ -46,7 +46,6 @@ extern "C" {
#define QW_MIN_MEM_POOL_SIZE (1048576UL)
#define QW_DEFAULT_THREAD_TASK_NUM 3
#define QW_DEFAULT_MIN_MEM_POOL_SIZE 104857600UL
enum {
QW_CONC_TASK_LEVEL_LOW = 1,

View File

@@ -20,7 +20,7 @@ int32_t qwGetMemPoolChunkSize(int64_t totalSize, int32_t threadNum, int32_t* chu
return TSDB_CODE_SUCCESS;
}
void qwSetConcurrentTaskNum(int32_t taskNum) {
void qwSetConcurrentTaskNumCb(int32_t taskNum) {
int32_t finTaskNum = TMIN(taskNum, tsNumOfQueryThreads * QW_DEFAULT_THREAD_TASK_NUM);
if (tsQueryMaxConcurrentTaskNum > 0) {
@@ -33,7 +33,7 @@ void qwSetConcurrentTaskNum(int32_t taskNum) {
atomic_store_32(&gQueryMgmt.concTaskLevel, QW_CONC_TASK_LEVEL_FULL);
}
void qwDecConcurrentTaskNum(void) {
void qwDecConcurrentTaskNumCb(void) {
int32_t concTaskLevel = atomic_load_32(&gQueryMgmt.concTaskLevel);
if (concTaskLevel <= QW_CONC_TASK_LEVEL_LOW) {
qError("Unable to decrease concurrent task num, current task level:%d", concTaskLevel);
@@ -43,7 +43,7 @@ void qwDecConcurrentTaskNum(void) {
//TODO
}
void qwIncConcurrentTaskNum(void) {
void qwIncConcurrentTaskNumCb(void) {
int32_t concTaskLevel = atomic_load_32(&gQueryMgmt.concTaskLevel);
if (concTaskLevel >= QW_CONC_TASK_LEVEL_FULL) {
qError("Unable to increase concurrent task num, current task level:%d", concTaskLevel);
@@ -117,8 +117,8 @@ bool qwLowLevelRetire() {
}
bool qwRetireCollection(uint64_t collectionId, int64_t retireSize, bool retireLow) {
if (retireLow) {
bool qwRetireCollectionCb(int64_t collectionId, int64_t collectionAllocSize, int64_t retireSize, bool retireLow) {
if (retireLow && collectionAllocSize > retireSize) {
return qwLowLevelRetire();
}
@@ -150,7 +150,7 @@ int32_t qwGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
return code;
}
void qwCheckUpateCfg(SMemPoolCfg* pCfg) {
void qwCheckUpateCfgCb(void* pHandle, SMemPoolCfg* pCfg) {
int64_t newCollectionQuota = tsSingleQueryMaxMemorySize * 1048576UL;
if (pCfg->collectionQuota != newCollectionQuota) {
atomic_store_64(&pCfg->collectionQuota, newCollectionQuota);
@@ -160,7 +160,7 @@ void qwCheckUpateCfg(SMemPoolCfg* pCfg) {
bool autoMaxSize = false;
int32_t code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize);
if (TSDB_CODE_SUCCESS != code) {
pCfg->maxSize = QW_DEFAULT_MIN_MEM_POOL_SIZE;
pCfg->maxSize = 0;
qError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize);
return;
}
@@ -168,7 +168,7 @@ void qwCheckUpateCfg(SMemPoolCfg* pCfg) {
if (pCfg->autoMaxSize != autoMaxSize || pCfg->maxSize != maxSize) {
pCfg->autoMaxSize = autoMaxSize;
atomic_store_64(&pCfg->maxSize, maxSize);
taosMemPoolCfgUpdate();
taosMemPoolCfgUpdate(pHandle, pCfg);
}
}
@@ -191,11 +191,11 @@ int32_t qwInitQueryPool(void) {
cfg.threadNum = 10; //TODO
cfg.evicPolicy = E_EVICT_AUTO; //TODO
cfg.collectionQuota = tsSingleQueryMaxMemorySize * 1048576UL;
cfg.cb.setSessFp = qwSetConcurrentTaskNum;
cfg.cb.decSessFp = qwDecConcurrentTaskNum;
cfg.cb.incSessFp = qwIncConcurrentTaskNum;
cfg.cb.retireFp = qwRetireCollection;
cfg.cb.cfgUpdateFp = qwCheckUpateCfg;
cfg.cb.setSessFp = qwSetConcurrentTaskNumCb;
cfg.cb.decSessFp = qwDecConcurrentTaskNumCb;
cfg.cb.incSessFp = qwIncConcurrentTaskNumCb;
cfg.cb.retireFp = qwRetireCollectionCb;
cfg.cb.cfgUpdateFp = qwCheckUpateCfgCb;
code = qwGetMemPoolChunkSize(cfg.maxSize, cfg.threadNum, &cfg.chunkSize);
if (TSDB_CODE_SUCCESS != code) {

View File

@@ -37,6 +37,7 @@ extern "C" {
#define MP_RETIRE_MID_THRESHOLD_PERCENT (0.9)
#define MP_RETIRE_LOW_THRESHOLD_PERCENT (0.85)
#define MP_RETIRE_UNIT_PERCENT (0.1)
#define MP_RETIRE_UNIT_MIN_SIZE (50 * 1048576UL)
// FLAGS AREA

View File

@@ -188,10 +188,11 @@ int32_t memPoolEnsureChunks(SMemPool* pPool) {
}
int32_t memPoolApplyCfgUpdate(SMemPool* pPool) {
pPool->memRetireThreshold[0] = pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT;
pPool->memRetireThreshold[1] = pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT;
pPool->memRetireThreshold[2] = pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT;
pPool->memRetireUnit = pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT;
atomic_store_64(&pPool->memRetireThreshold[0], pPool->cfg.maxSize * MP_RETIRE_LOW_THRESHOLD_PERCENT);
atomic_store_64(&pPool->memRetireThreshold[1], pPool->cfg.maxSize * MP_RETIRE_MID_THRESHOLD_PERCENT);
atomic_store_64(&pPool->memRetireThreshold[2], pPool->cfg.maxSize * MP_RETIRE_HIGH_THRESHOLD_PERCENT);
atomic_store_64(&pPool->memRetireUnit, TMAX(pPool->cfg.maxSize * MP_RETIRE_UNIT_PERCENT, MP_RETIRE_UNIT_MIN_SIZE));
if (E_MP_STRATEGY_CHUNK == gMPMgmt.strategy) {
pPool->maxChunkNum = pPool->cfg.maxSize / pPool->cfg.chunkSize;
@@ -204,7 +205,7 @@ int32_t memPoolApplyCfgUpdate(SMemPool* pPool) {
pPool->chunkCache.groupNum = TMAX(pPool->maxChunkNum / 10, MP_CHUNK_CACHE_ALLOC_BATCH_SIZE);
}
return TSDB_CODE_SUCCESS;
}
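For a concrete sense of the new sizing, consider a pool with cfg.maxSize of 1 GiB (the high-threshold percent is defined outside this hunk): the low and middle retire thresholds work out to roughly 1073741824 * 0.85 ≈ 870 MiB and 1073741824 * 0.90 ≈ 921 MiB, while the retire unit becomes TMAX(1073741824 * 0.1, 50 MiB) ≈ 102 MiB, so the new MP_RETIRE_UNIT_MIN_SIZE floor only takes effect for pools smaller than about 500 MiB.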
@@ -391,54 +392,66 @@ _return:
return pRes;
}
int32_t memPoolPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) {
bool memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) {
int64_t allocSize = atomic_add_fetch_64(&pSession->pMpCollection->allocMemSize, size);
}
int32_t memPoolMemQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) {
SMPCollection* pCollection = pSession->pMpCollection;
int64_t cAllocSize = atomic_add_fetch_64(&pCollection->allocMemSize, size);
int64_t quota = atomic_load_64(&pPool->cfg.collectionQuota);
if (quota > 0 && allocSize > quota) {
uWarn("collection %" PRIx64 " allocSize %" PRId64 " exceeds quota %" PRId64, pSession->pMpCollection->collectionId, allocSize, quota);
atomic_sub_fetch_64(&pSession->pMpCollection->allocMemSize, size);
return true;
if (quota > 0 && cAllocSize > quota) {
uWarn("collection %" PRIx64 " allocSize %" PRId64 " exceeds quota %" PRId64, pCollection->collectionId, cAllocSize, quota);
atomic_sub_fetch_64(&pCollection->allocMemSize, size);
MP_RET(TSDB_CODE_QRY_REACH_QMEM_THRESHOLD);
}
allocSize = atomic_add_fetch_64(&pPool->allocMemSize, size);
int64_t pAllocSize = atomic_add_fetch_64(&pPool->allocMemSize, size);
quota = atomic_load_64(&pPool->memRetireThreshold[2]);
if (allocSize >= quota) {
uWarn("pool allocSize %" PRId64 " reaches the high quota %" PRId64, allocSize, quota);
if (pAllocSize >= quota) {
uWarn("pool allocSize %" PRId64 " reaches the high quota %" PRId64, pAllocSize, quota);
atomic_sub_fetch_64(&pCollection->allocMemSize, size);
atomic_sub_fetch_64(&pPool->allocMemSize, size);
return true;
MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
}
quota = atomic_load_64(&pPool->memRetireThreshold[1]);
if (allocSize >= quota) {
uInfo("pool allocSize %" PRId64 " reaches the middle quota %" PRId64, allocSize, quota);
bool retire = (*pPool->cfg.cb.retireFp)(pSession->pMpCollection->collectionId, pPool->memRetireUnit, false);
if (retire) {
uWarn("session retired in middle quota retire choice, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize));
if (pAllocSize >= quota) {
uInfo("pool allocSize %" PRId64 " reaches the middle quota %" PRId64, pAllocSize, quota);
if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) {
uWarn("session retired in middle quota retire choice, sessionAllocSize: %" PRId64 ", collectionSize: %" PRId64,
atomic_load_64(&pSession->allocMemSize), cAllocSize);
atomic_sub_fetch_64(&pCollection->allocMemSize, size);
atomic_sub_fetch_64(&pPool->allocMemSize, size);
MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, false));
MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
}
return retire;
return TSDB_CODE_SUCCESS;
}
quota = atomic_load_64(&pPool->memRetireThreshold[0]);
if (allocSize >= quota) {
uInfo("pool allocSize %" PRId64 " reaches the low quota %" PRId64, allocSize, quota);
bool retire = (*pPool->cfg.cb.retireFp)(pSession->pMpCollection->collectionId, pPool->memRetireUnit, true);
if (retire) {
if (pAllocSize >= quota) {
uInfo("pool allocSize %" PRId64 " reaches the low quota %" PRId64, pAllocSize, quota);
if (cAllocSize >= atomic_load_64(&pPool->memRetireUnit)) {
uWarn("session retired in low quota retire choice, sessionAllocSize: %" PRId64, atomic_load_64(&pSession->allocMemSize));
atomic_sub_fetch_64(&pCollection->allocMemSize, size);
atomic_sub_fetch_64(&pPool->allocMemSize, size);
MP_ERR_RET(memPoolPutRetireMsgToQueue(pPool, true));
MP_RET(TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
}
return retire;
}
return false;
return TSDB_CODE_SUCCESS;
}
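The rewritten quota check reports failures through error codes instead of a bool, relying on the MP_RET / MP_ERR_RET / MP_ERR_JRET helpers. Their definitions are not part of this diff; assuming they follow the codebase's usual *_RET / *_ERR_RET / *_ERR_JRET pattern, they would look roughly like this sketch (an assumption, not the committed definitions):

// Assumed shapes only; the real macros live outside this diff and may differ.
#define MP_RET(c)      do { int32_t _code = (c); if (TSDB_CODE_SUCCESS != _code) { terrno = _code; } return _code; } while (0)
#define MP_ERR_RET(c)  do { int32_t _code = (c); if (TSDB_CODE_SUCCESS != _code) { terrno = _code; return _code; } } while (0)
#define MP_ERR_JRET(c) do { code = (c); if (TSDB_CODE_SUCCESS != code) { terrno = code; goto _return; } } while (0)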
void* memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) {
if (memPoolMemQuotaOverflow(pPool, pSession, size)) {
uInfo("session needs to retire memory");
return NULL;
}
int32_t memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
MP_ERR_RET(memPoolMemQuotaOverflow(pPool, pSession, size));
void* res = alignment ? taosMemMallocAlign(alignment, size) : taosMemMalloc(size);
if (NULL != res) {
@@ -447,9 +460,11 @@ void* memPoolAllocDirect(SMemPool* pPool, SMPSession* pSession, int64_t size, ui
atomic_sub_fetch_64(&pSession->pMpCollection->allocMemSize, size);
atomic_sub_fetch_64(&pPool->allocMemSize, size);
uError("malloc %" PRId64 " alignment %d failed", size, alignment);
code = TAOS_SYSTEM_ERROR(errno);
}
return res;
MP_RET(code);
}
int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr) {
@@ -467,24 +482,21 @@ int64_t memPoolGetMemorySizeImpl(SMemPool* pPool, SMPSession* pSession, void *pt
return 0;
}
void *memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment) {
int32_t memPoolMallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t size, uint32_t alignment, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
switch (gMPMgmt.strategy) {
case E_MP_STRATEGY_DIRECT:
res = memPoolAllocDirect(pPool, pSession, size, alignment);
MP_ERR_RET(memPoolAllocDirect(pPool, pSession, size, alignment, ppRes));
break;
case E_MP_STRATEGY_CHUNK:
res = (size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size, alignment) : memPoolAllocFromChunk(pPool, pSession, size, alignment);
MP_ERR_RET((size > pPool->cfg.chunkSize) ? memPoolAllocFromNSChunk(pPool, pSession, size, alignment, ppRes) : memPoolAllocFromChunk(pPool, pSession, size, alignment, ppRes));
break;
default:
break;
}
_return:
return res;
return code;
}
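With the allocation internals now returning int32_t and handing the buffer back through an out parameter, callers follow a check-the-code pattern; a minimal illustrative wrapper (the function name is hypothetical, the types come from this file):

static int32_t demoPoolAlloc(SMemPool* pPool, SMPSession* pSession, int64_t size, void** ppBuf) {
  *ppBuf = NULL;
  // The returned code now distinguishes quota rejection from allocator failure,
  // which a bare NULL return value could not.
  int32_t code = memPoolMallocImpl(pPool, pSession, size, 0, ppBuf);
  if (TSDB_CODE_SUCCESS != code) {
    return code;  // *ppBuf is still NULL here
  }
  return TSDB_CODE_SUCCESS;
}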
void *memPoolCallocImpl(SMemPool* pPool, SMPSession* pSession, int64_t num, int64_t size) {
@@ -535,56 +547,60 @@ void memPoolFreeImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t*
return;
}
void *memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t size, int64_t* origSize) {
void *res = NULL;
int32_t memPoolReallocImpl(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t size, int64_t* origSize) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == ptr) {
if (NULL == *pPtr) {
*origSize = 0;
res = memPoolMallocImpl(pPool, pSession, size, 0);
return res;
MP_RET(memPoolMallocImpl(pPool, pSession, size, 0, pPtr));
}
if (0 == size) {
memPoolFreeImpl(pPool, pSession, ptr, origSize);
return res;
memPoolFreeImpl(pPool, pSession, *pPtr, origSize);
return TSDB_CODE_SUCCESS;
}
*origSize = memPoolGetMemorySizeImpl(pPool, pSession, ptr);
*origSize = memPoolGetMemorySizeImpl(pPool, pSession, *pPtr);
switch (gMPMgmt.strategy) {
case E_MP_STRATEGY_DIRECT: {
if (memPoolMemQuotaOverflow(pPool, pSession, size)) {
uInfo("session needs to retire memory");
return NULL;
}
MP_ERR_JRET(memPoolMemQuotaOverflow(pPool, pSession, size));
res = taosMemRealloc(ptr, size);
if (NULL != res) {
*pPtr = taosMemRealloc(*pPtr, size);
if (NULL != *pPtr) {
memPoolUpdateAllocMemSize(pPool, pSession, size - *origSize);
} else {
MP_ERR_JRET(TAOS_SYSTEM_ERROR(errno));
}
return res;
break;
}
case E_MP_STRATEGY_CHUNK: {
if (*origSize >= size) {
SMPMemHeader* pHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
SMPMemHeader* pHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader));
pHeader->size = size;
return ptr;
return TSDB_CODE_SUCCESS;
}
res = memPoolMallocImpl(pPool, pSession, size, 0);
SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)ptr - sizeof(SMPMemHeader));
void* res = NULL;
MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, 0, &res));
SMPMemHeader* pOrigHeader = (SMPMemHeader*)((char*)*pPtr - sizeof(SMPMemHeader));
SMPMemHeader* pNewHeader = (SMPMemHeader*)((char*)res - sizeof(SMPMemHeader));
memcpy(res, ptr, *origSize);
memPoolFreeImpl(pPool, pSession, ptr, NULL);
memcpy(res, *pPtr, *origSize);
memPoolFreeImpl(pPool, pSession, *pPtr, NULL);
*pPtr = res;
return res;
return TSDB_CODE_SUCCESS;
}
default:
break;
}
_return:
memPoolFreeImpl(pPool, pSession, *pPtr, NULL);
return NULL;
return code;
}
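memPoolReallocImpl now takes the pointer by address and returns a code, and its _return path frees *pPtr on failure. A minimal caller sketch under that assumption (the wrapper name is hypothetical):

static int32_t demoPoolGrow(SMemPool* pPool, SMPSession* pSession, void** ppBuf, int64_t newSize) {
  int64_t origSize = 0;
  int32_t code = memPoolReallocImpl(pPool, pSession, ppBuf, newSize, &origSize);
  if (TSDB_CODE_SUCCESS != code) {
    // the error path above releases the old block, so drop the stale pointer
    *ppBuf = NULL;
  }
  return code;
}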
void memPoolPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName, int64_t maxAllocSize) {
@@ -799,12 +815,14 @@ void* memPoolMgmtThreadFunc(void* param) {
while (0 == atomic_load_8(&gMPMgmt.modExit)) {
tsem2_timewait(&gMPMgmt.threadSem, gMPMgmt.waitMs);
(*pPool->cfg.cb.retireFp)(pCollection->collectionId, cAllocSize, atomic_load_64(&pPool->memRetireUnit), true);
taosRLockLatch(&gMPMgmt.poolLock);
int32_t poolNum = taosArrayGetSize(gMPMgmt.poolList);
for (int32_t i = 0; i < poolNum; ++i) {
SMemPool* pPool = (SMemPool*)taosArrayGetP(gMPMgmt.poolList, i);
if (pPool->cfg.cb.cfgUpdateFp) {
(*pPool->cfg.cb.cfgUpdateFp)(&pPool->cfg);
(*pPool->cfg.cb.cfgUpdateFp)((void*)pPool, &pPool->cfg);
}
}
taosRUnLockLatch(&gMPMgmt.poolLock);
@@ -882,8 +900,10 @@ _return:
return code;
}
void taosMemPoolCfgUpdate() {
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) {
SMemPool* pPool = (SMemPool*)poolHandle;
MP_ERR_RET(memPoolApplyCfgUpdate(pPool));
}
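Taken together with the query-worker change above, the configuration-update path now threads the pool handle end to end: the management thread invokes cfgUpdateFp with the pool and its config, the worker's qwCheckUpateCfgCb adjusts the config and calls taosMemPoolCfgUpdate(poolHandle, pCfg), and the pool reapplies it via memPoolApplyCfgUpdate. A condensed sketch of the trigger side (the wrapper function is hypothetical):

static void demoTriggerCfgUpdate(SMemPool* pPool) {
  if (pPool->cfg.cb.cfgUpdateFp) {
    // same call shape as in the management thread loop above
    (*pPool->cfg.cb.cfgUpdateFp)((void*)pPool, &pPool->cfg);
  }
}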
void taosMemPoolDestroySession(void* poolHandle, void* session) {
@@ -932,7 +952,7 @@ _return:
}
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
@@ -945,7 +965,7 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* f
SMPSession* pSession = (SMPSession*)session;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
res = memPoolMallocImpl(pPool, pSession, size, 0);
MP_ERR_JRET(memPoolMallocImpl(pPool, pSession, size, 0, &res));
MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_MALLOC, &input);
@@ -984,7 +1004,7 @@ _return:
return res;
}
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) {
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) {
int32_t code = TSDB_CODE_SUCCESS;
void *res = NULL;
@@ -998,14 +1018,14 @@ void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t s
SMPSession* pSession = (SMPSession*)session;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC};
res = memPoolReallocImpl(pPool, pSession, ptr, size, &input.origSize);
MP_ERR_JRET(memPoolReallocImpl(pPool, pSession, &ptr, size, &input.origSize));
MP_SET_FLAG(input.procFlags, ((res || 0 == size) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL));
memPoolLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_REALLOC, &input);
_return:
return res;
return ptr;
}
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) {