enh: add memory full func mode

This commit is contained in:
dapan1121 2024-11-25 15:31:29 +08:00
parent 3320f111fd
commit 8455949575
8 changed files with 74 additions and 28 deletions

View File

@ -75,7 +75,7 @@ extern int32_t tsQueryMaxConcurrentTaskNum;
extern int32_t tsQueryConcurrentTaskNum; extern int32_t tsQueryConcurrentTaskNum;
extern int32_t tsSingleQueryMaxMemorySize; extern int32_t tsSingleQueryMaxMemorySize;
extern int8_t tsQueryUseMemoryPool; extern int8_t tsQueryUseMemoryPool;
extern int8_t tsMemPoolDebug; extern int8_t tsMemPoolFullFunc;
//extern int32_t tsQueryBufferPoolSize; //extern int32_t tsQueryBufferPoolSize;
extern int32_t tsMinReservedMemorySize; extern int32_t tsMinReservedMemorySize;
extern int64_t tsCurrentAvailMemorySize; extern int64_t tsCurrentAvailMemorySize;

View File

@ -152,11 +152,11 @@ int32_t taosMemoryPoolInit(mpReserveFailFp, mpReserveReachFp);
extern void* gMemPoolHandle; extern void* gMemPoolHandle;
extern threadlocal void* threadPoolSession; extern threadlocal void* threadPoolSession;
extern threadlocal bool threadPoolEnabled; extern threadlocal bool threadPoolEnabled;
extern int8_t tsMemPoolDebug; extern int8_t tsMemPoolFullFunc;
#define taosEnableFullMemPoolUsage(_session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0) #define taosEnableMemPoolUsage(_session) do { threadPoolSession = _session; tsEnableRandErr = true;} while (0)
#define taosDisableFullMemPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0) #define taosDisableMemPoolUsage() do { threadPoolSession = NULL; tsEnableRandErr = false;} while (0)
#define taosSaveDisableMemPoolUsage(_enable, _randErr) do { (_enable) = threadPoolEnabled; (_randErr) = tsEnableRandErr; threadPoolEnabled = false; tsEnableRandErr = false;} while (0) #define taosSaveDisableMemPoolUsage(_enable, _randErr) do { (_enable) = threadPoolEnabled; (_randErr) = tsEnableRandErr; threadPoolEnabled = false; tsEnableRandErr = false;} while (0)
#define taosRestoreEnableMemPoolUsage(_enable, _randErr) do { threadPoolEnabled = (_enable); tsEnableRandErr = (_randErr);} while (0) #define taosRestoreEnableMemPoolUsage(_enable, _randErr) do { threadPoolEnabled = (_enable); tsEnableRandErr = (_randErr);} while (0)

View File

@ -54,7 +54,7 @@ int32_t tsMaxShellConns = 50000;
int32_t tsShellActivityTimer = 3; // second int32_t tsShellActivityTimer = 3; // second
// memory pool // memory pool
int8_t tsMemPoolDebug = 0; int8_t tsMemPoolFullFunc = 0;
int8_t tsQueryUseMemoryPool = 1; int8_t tsQueryUseMemoryPool = 1;
int32_t tsQueryBufferPoolSize = 0; //MB int32_t tsQueryBufferPoolSize = 0; //MB
int32_t tsSingleQueryMaxMemorySize = 0; //MB int32_t tsSingleQueryMaxMemorySize = 0; //MB

View File

@ -274,11 +274,11 @@ extern SQueryMgmt gQueryMgmt;
#define QW_SINK_ENABLE_MEMPOOL(_ctx) \ #define QW_SINK_ENABLE_MEMPOOL(_ctx) \
do { \ do { \
if ((_ctx)->sinkWithMemPool) { \ if ((_ctx)->sinkWithMemPool) { \
taosEnableFullMemPoolUsage((_ctx)->memPoolSession); \ taosEnableMemPoolUsage((_ctx)->memPoolSession); \
} \ } \
} while (0) } while (0)
#define QW_SINK_DISABLE_MEMPOOL() taosDisableFullMemPoolUsage() #define QW_SINK_DISABLE_MEMPOOL() taosDisableMemPoolUsage()
#define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n) #define QW_STAT_INC(_item, _n) (void)atomic_add_fetch_64(&(_item), _n)
#define QW_STAT_DEC(_item, _n) (void)atomic_sub_fetch_64(&(_item), _n) #define QW_STAT_DEC(_item, _n) (void)atomic_sub_fetch_64(&(_item), _n)

View File

@ -276,9 +276,9 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx) {
// Note: free/kill may in RC // Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(&ctx->taskHandle); qTaskInfo_t otaskHandle = atomic_load_ptr(&ctx->taskHandle);
if (otaskHandle && otaskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, otaskHandle, NULL)) { if (otaskHandle && otaskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, otaskHandle, NULL)) {
taosEnableFullMemPoolUsage(ctx->memPoolSession); taosEnableMemPoolUsage(ctx->memPoolSession);
qDestroyTask(otaskHandle); qDestroyTask(otaskHandle);
taosDisableFullMemPoolUsage(); taosDisableMemPoolUsage();
qDebug("task handle destroyed"); qDebug("task handle destroyed");
} }

View File

@ -141,9 +141,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
if (taskHandle) { if (taskHandle) {
qwDbgSimulateSleep(); qwDbgSimulateSleep();
taosEnableFullMemPoolUsage(ctx->memPoolSession); taosEnableMemPoolUsage(ctx->memPoolSession);
code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch); code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch);
taosDisableFullMemPoolUsage(); taosDisableMemPoolUsage();
if (code) { if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
@ -780,9 +780,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
ctx->queryMsgType = qwMsg->msgType; ctx->queryMsgType = qwMsg->msgType;
ctx->localExec = false; ctx->localExec = false;
taosEnableFullMemPoolUsage(ctx->memPoolSession); taosEnableMemPoolUsage(ctx->memPoolSession);
code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan); code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
taosDisableFullMemPoolUsage(); taosDisableMemPoolUsage();
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
@ -790,9 +790,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
taosEnableFullMemPoolUsage(ctx->memPoolSession); taosEnableMemPoolUsage(ctx->memPoolSession);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH);
taosDisableFullMemPoolUsage(); taosDisableMemPoolUsage();
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));

View File

@ -1252,7 +1252,7 @@ _return:
void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) { void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolFullFunc) {
return mpDirectAlloc(poolHandle, ((SMPSession*)session)->pJob, size); return mpDirectAlloc(poolHandle, ((SMPSession*)session)->pJob, size);
} }
@ -1282,7 +1282,7 @@ _return:
} }
void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) { void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolFullFunc) {
return mpDirectCalloc(poolHandle, ((SMPSession*)session)->pJob, num, size); return mpDirectCalloc(poolHandle, ((SMPSession*)session)->pJob, num, size);
} }
@ -1314,7 +1314,7 @@ _return:
} }
void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) { void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolFullFunc) {
return mpDirectRealloc(poolHandle, ((SMPSession*)session)->pJob, ptr, size); return mpDirectRealloc(poolHandle, ((SMPSession*)session)->pJob, ptr, size);
} }
@ -1365,7 +1365,7 @@ _return:
} }
char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) { char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolFullFunc) {
return mpDirectStrdup(poolHandle, ((SMPSession*)session)->pJob, ptr); return mpDirectStrdup(poolHandle, ((SMPSession*)session)->pJob, ptr);
} }
@ -1401,7 +1401,7 @@ _return:
} }
char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) { char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolFullFunc) {
return mpDirectStrndup(poolHandle, ((SMPSession*)session)->pJob, ptr, size); return mpDirectStrndup(poolHandle, ((SMPSession*)session)->pJob, ptr, size);
} }
@ -1439,7 +1439,7 @@ _return:
void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolFullFunc) {
mpDirectFree(poolHandle, ((SMPSession*)session)->pJob, ptr); mpDirectFree(poolHandle, ((SMPSession*)session)->pJob, ptr);
return; return;
} }
@ -1465,7 +1465,7 @@ void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName,
} }
int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolFullFunc) {
return taosMemSize(ptr); return taosMemSize(ptr);
} }
@ -1487,7 +1487,7 @@ int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, cha
} }
void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) { void* taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolFullFunc) {
return mpDirectAlignAlloc(poolHandle, ((SMPSession*)session)->pJob, alignment, size); return mpDirectAlignAlloc(poolHandle, ((SMPSession*)session)->pJob, alignment, size);
} }
@ -1546,7 +1546,7 @@ void taosMemPoolModDestroy(void) {
int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) { int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) {
if (0 == tsMemPoolDebug) { if (0 == tsMemPoolFullFunc) {
return taosMemTrim(size, trimed); return taosMemTrim(size, trimed);
} }

View File

@ -1576,12 +1576,15 @@ TEST(PerfTest, allocLatency) {
#if 0 #if 0
TEST(FuncTest, SingleThreadTest) { TEST(poolFuncTest, SingleThreadTest) {
char* caseName = "FuncTest:SingleThreadTest"; char* caseName = "poolFuncTest:SingleThreadTest";
SMPTestParam param = {0}; SMPTestParam param = {0};
param.reserveMode = true; param.reserveMode = true;
param.threadNum = 1; param.threadNum = 1;
param.jobQuota = 1024; param.jobQuota = 1024;
param.enableMemPool = true;
tsMemPoolFullFunc = 0;
mptPrintTestBeginInfo(caseName, &param); mptPrintTestBeginInfo(caseName, &param);
@ -1592,8 +1595,8 @@ TEST(FuncTest, SingleThreadTest) {
} }
#endif #endif
#if 0 #if 0
TEST(EnablePoolFuncTest, MultiThreadTest) { TEST(poolFuncTest, MultiThreadTest) {
char* caseName = "FuncTest:MultiThreadTest"; char* caseName = "poolFuncTest:MultiThreadTest";
SMPTestParam param = {0}; SMPTestParam param = {0};
param.reserveMode = true; param.reserveMode = true;
param.threadNum = 6; param.threadNum = 6;
@ -1601,6 +1604,8 @@ TEST(EnablePoolFuncTest, MultiThreadTest) {
param.randTask = true; param.randTask = true;
param.enableMemPool = true; param.enableMemPool = true;
tsMemPoolFullFunc = 0;
mptPrintTestBeginInfo(caseName, &param); mptPrintTestBeginInfo(caseName, &param);
for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) { for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) {
@ -1610,7 +1615,48 @@ TEST(EnablePoolFuncTest, MultiThreadTest) {
} }
#endif #endif
#if 0
TEST(poolFullFuncTest, SingleThreadTest) {
char* caseName = "poolFullFuncTest:SingleThreadTest";
SMPTestParam param = {0};
param.reserveMode = true;
param.threadNum = 1;
param.jobQuota = 1024;
param.enableMemPool = true;
tsMemPoolFullFunc = 1;
mptPrintTestBeginInfo(caseName, &param);
for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) {
mptRunCase(&param, i);
}
}
#endif
#if 1 #if 1
TEST(poolFullFuncTest, MultiThreadTest) {
char* caseName = "poolFullFuncTest:MultiThreadTest";
SMPTestParam param = {0};
param.reserveMode = true;
param.threadNum = 6;
param.jobQuota = 1024;
param.randTask = true;
param.enableMemPool = true;
tsMemPoolFullFunc = 1;
mptPrintTestBeginInfo(caseName, &param);
for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) {
mptRunCase(&param, i);
}
}
#endif
#if 0
TEST(DisablePoolFuncTest, MultiThreadTest) { TEST(DisablePoolFuncTest, MultiThreadTest) {
char* caseName = "FuncTest:MultiThreadTest"; char* caseName = "FuncTest:MultiThreadTest";
SMPTestParam param = {0}; SMPTestParam param = {0};