fix: restore direct alloc

This commit is contained in:
dapan1121 2024-11-22 14:34:37 +08:00
parent 9ffe0bcd8c
commit 3dc61bf0f4
5 changed files with 109 additions and 31 deletions

View File

@ -75,6 +75,7 @@ extern int32_t tsQueryMaxConcurrentTaskNum;
extern int32_t tsQueryConcurrentTaskNum;
extern int32_t tsSingleQueryMaxMemorySize;
extern int8_t tsQueryUseMemoryPool;
extern int8_t tsMemPoolDebug;
//extern int32_t tsQueryBufferPoolSize;
extern int32_t tsMinReservedMemorySize;
extern int64_t tsCurrentAvailMemorySize;

View File

@ -152,24 +152,25 @@ int32_t taosMemoryPoolInit(mpReserveFailFp, mpReserveReachFp);
extern void* gMemPoolHandle;
extern threadlocal void* threadPoolSession;
extern threadlocal bool threadPoolEnabled;
extern int8_t tsMemPoolDebug;
#define taosEnableFullMemPoolUsage(_session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0)
#define taosDisableFullMemPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0)
#define taosEnableMemPoolUsage() do { threadPoolEnabled = true; tsEnableRandErr = true;} while (0)
#define taosDisableMemPoolUsage() do { threadPoolEnabled = false; tsEnableRandErr = false;} while (0)
#define taosSaveDisableMemPoolUsage(_enable, _randErr) do { (_enable) = threadPoolEnabled; (_randErr) = tsEnableRandErr; threadPoolEnabled = false; tsEnableRandErr = false;} while (0)
#define taosRestoreEnableMemPoolUsage(_enable, _randErr) do { threadPoolEnabled = (_enable); tsEnableRandErr = (_randErr);} while (0)
#define taosMemoryMalloc(_size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size)))
#define taosMemoryCalloc(_num, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
#define taosMemoryRealloc(_ptr, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
#define taosStrdup(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr)))
#define taosStrndup(_ptr, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size)))
#define taosMemoryFree(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr)))
#define taosMemorySize(_ptr) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr)))
#define taosMemoryTrim(_size, _trimed) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed)))
#define taosMemoryMallocAlign(_alignment, _size) ((threadPoolEnabled && gMemPoolHandle) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
#define taosMemoryMalloc(_size) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolMalloc(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size)))
#define taosMemoryCalloc(_num, _size) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolCalloc(gMemPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
#define taosMemoryRealloc(_ptr, _size) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolRealloc(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
#define taosStrdup(_ptr) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolStrdup(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr)))
#define taosStrndup(_ptr, _size) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolStrndup(gMemPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size)))
#define taosMemoryFree(_ptr) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolFree(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr)))
#define taosMemorySize(_ptr) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolGetMemorySize(gMemPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr)))
#define taosMemoryTrim(_size, _trimed) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolTrim(gMemPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size, _trimed)))
#define taosMemoryMallocAlign(_alignment, _size) ((threadPoolEnabled && threadPoolSession) ? (taosMemPoolMallocAlign(gMemPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
#else
#define taosEnableMemoryPoolUsage(_pool, _session)
#define taosDisableMemoryPoolUsage()

View File

@ -54,6 +54,7 @@ int32_t tsMaxShellConns = 50000;
int32_t tsShellActivityTimer = 3; // second
// memory pool
int8_t tsMemPoolDebug = 0;
int8_t tsQueryUseMemoryPool = 1;
int32_t tsQueryBufferPoolSize = 0; //MB
int32_t tsSingleQueryMaxMemorySize = 0; //MB

View File

@ -278,8 +278,8 @@ void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int6
mpUpdateMaxAllocSize(&pSession->pJob->job.maxAllocMemSize, allocMemSize);
}
int64_t allocMemSize = atomic_load_64(&pPool->allocMemSize);
if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOG_MAXSIZE)) {
int64_t allocMemSize = atomic_load_64(&pPool->allocMemSize);
mpUpdateMaxAllocSize(&pPool->maxAllocMemSize, allocMemSize);
}
}
@ -902,6 +902,8 @@ _return:
void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPStatInput* pInput) {
bool enablePool = false, randErr = false;
switch (item) {
case E_MP_STAT_LOG_MEM_MALLOC:
case E_MP_STAT_LOG_MEM_CALLOC:
@ -917,14 +919,14 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt
mpLogDetailStat(&pPool->stat.statDetail, item, pInput);
}
if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
taosDisableMemPoolUsage();
taosSaveDisableMemPoolUsage(enablePool, randErr);
mpLogPosStat(&pSession->stat.posStat, item, pInput, true);
taosEnableMemPoolUsage();
taosRestoreEnableMemPoolUsage(enablePool, randErr);
}
if (MP_GET_FLAG(gMPMgmt.ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
taosDisableMemPoolUsage();
taosSaveDisableMemPoolUsage(enablePool, randErr);
mpLogPosStat(&pPool->stat.posStat, item, pInput, false);
taosEnableMemPoolUsage();
taosRestoreEnableMemPoolUsage(enablePool, randErr);
}
break;
}
@ -1250,6 +1252,10 @@ _return:
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) {
return taosMemMalloc(size);
}
int32_t code = TSDB_CODE_SUCCESS;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
@ -1276,6 +1282,10 @@ _return:
}
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) {
return taosMemCalloc(num, size);
}
int32_t code = TSDB_CODE_SUCCESS;
int64_t totalSize = num * size;
SMPStatInput input = {.size = totalSize, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
@ -1304,6 +1314,10 @@ _return:
}
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) {
return taosMemRealloc(ptr, size);
}
int32_t code = TSDB_CODE_SUCCESS;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .origSize = 0, .pMem = ptr, .pOrigMem = ptr};
@ -1351,6 +1365,10 @@ _return:
}
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) {
return taosStrdupi(ptr);
}
int32_t code = TSDB_CODE_SUCCESS;
int64_t size = (ptr ? strlen(ptr) : 0) + 1;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
@ -1383,6 +1401,10 @@ _return:
}
char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) {
return taosStrndupi(ptr, size);
}
int32_t code = TSDB_CODE_SUCCESS;
int64_t origSize = ptr ? strlen(ptr) : 0;
size = TMIN(size, origSize) + 1;
@ -1417,6 +1439,11 @@ _return:
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) {
taosMemFree(ptr);
return;
}
if (NULL == ptr) {
return;
}
@ -1438,6 +1465,10 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName,
}
int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) {
return taosMemSize(ptr);;
}
int64_t code = 0;
if (NULL == poolHandle || NULL == fileName) {
uError("%s invalid input param, handle:%p, session:%p, fileName:%p",
@ -1456,6 +1487,10 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha
}
void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) {
return taosMemMallocAlign(alignment, size);
}
int32_t code = TSDB_CODE_SUCCESS;
SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC, .pMem = NULL};
@ -1511,6 +1546,10 @@ void taosMemPoolModDestroy(void) {
int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) {
if (0 == tsMemPoolDebug) {
return taosMemTrim(size, trimed);
}
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == poolHandle || NULL == fileName || size < 0) {

View File

@ -188,6 +188,7 @@ typedef struct {
typedef struct {
int32_t jobQuota;
bool enableMemPool;
bool reserveMode;
int64_t upperLimitSize;
int32_t reserveSize; //MB
@ -474,7 +475,10 @@ void mptDestroyTaskCtx(SMPTestJobCtx* pJobCtx, int32_t taskIdx) {
SMPTestTaskCtx* pTask = &pJobCtx->taskCtxs[taskIdx];
mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[taskIdx]);
if (mptCtx.param.enableMemPool) {
mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[taskIdx]);
}
for (int32_t i = 0; i < pTask->memIdx; ++i) {
pTask->stat.times.memFree.exec++;
pTask->stat.bytes.memFree.exec+=mptMemorySize(pTask->pMemList[i].p);
@ -482,8 +486,11 @@ void mptDestroyTaskCtx(SMPTestJobCtx* pJobCtx, int32_t taskIdx) {
mptMemoryFree(pTask->pMemList[i].p);
pTask->pMemList[i].p = NULL;
}
mptDisableMemoryPoolUsage();
if (mptCtx.param.enableMemPool) {
mptDisableMemoryPoolUsage();
}
mptDestroySession(pJobCtx->jobId, pJobCtx->taskCtxs[taskIdx].taskId, 0, taskIdx, pJobCtx, pJobCtx->pSessions[taskIdx]);
pJobCtx->pSessions[taskIdx] = NULL;
@ -614,13 +621,15 @@ void mptInitJob(int32_t idx) {
void mptDestroyTask(SMPTestJobCtx* pJobCtx, int32_t taskIdx) {
SMPStatDetail* pStat = NULL;
int64_t allocSize = 0;
taosMemPoolGetSessionStat(pJobCtx->pSessions[taskIdx], &pStat, &allocSize, NULL);
int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat);
assert(allocSize == usedSize);
assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[taskIdx].stat, sizeof(*pStat)));
if (mptCtx.param.enableMemPool) {
SMPStatDetail* pStat = NULL;
int64_t allocSize = 0;
taosMemPoolGetSessionStat(pJobCtx->pSessions[taskIdx], &pStat, &allocSize, NULL);
int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat);
assert(allocSize == usedSize);
assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[taskIdx].stat, sizeof(*pStat)));
}
mptDestroyTaskCtx(pJobCtx, taskIdx);
}
@ -1147,10 +1156,16 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx, int32
}
atomic_add_fetch_32(&pJobCtx->taskRunningNum, 1);
if (mptCtx.param.enableMemPool) {
mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[idx]);
}
mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[idx]);
mptSimulateTask(pJobCtx, pCtx, actTimes);
mptDisableMemoryPoolUsage();
if (mptCtx.param.enableMemPool) {
mptDisableMemoryPoolUsage();
}
// if (!atomic_load_8(&pJobCtx->pJob->retired)) {
mptSimulateOutTask(pJobCtx, pCtx);
@ -1345,7 +1360,7 @@ void* mptRunThreadFunc(void* param) {
MPT_PRINTF("Thread %d finish the %dth exection\n", pThread->idx, n);
if (mptCtx.param.threadNum <= 1) {
if (mptCtx.param.threadNum <= 1 && mptCtx.param.enableMemPool) {
mptCheckPoolUsedSize(mptJobNum);
}
}
@ -1576,14 +1591,15 @@ TEST(FuncTest, SingleThreadTest) {
}
#endif
#if 1
TEST(FuncTest, MultiThreadTest) {
#if 0
TEST(EnablePoolFuncTest, MultiThreadTest) {
char* caseName = "FuncTest:MultiThreadTest";
SMPTestParam param = {0};
param.reserveMode = true;
param.threadNum = 6;
param.jobQuota = 1024;
param.randTask = true;
param.enableMemPool = true;
mptPrintTestBeginInfo(caseName, &param);
@ -1594,6 +1610,26 @@ TEST(FuncTest, MultiThreadTest) {
}
#endif
#if 1
TEST(DisablePoolFuncTest, MultiThreadTest) {
char* caseName = "FuncTest:MultiThreadTest";
SMPTestParam param = {0};
param.reserveMode = true;
param.threadNum = 6;
param.jobQuota = 1024;
param.randTask = true;
param.enableMemPool = false;
mptPrintTestBeginInfo(caseName, &param);
for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) {
mptRunCase(&param, i);
}
}
#endif
#endif