chore: remove asserts of mempool

kailixu 2024-12-13 15:56:49 +08:00
parent b58620cedf
commit 1509df2e91
4 changed files with 243 additions and 264 deletions

@@ -505,7 +505,7 @@ pipeline {
       }
     }
     stage('linux test') {
-      agent{label "slave1_47 || slave1_48 || slave1_49 || slave1_50 || slave1_52 || slave1_59 || slave1_63"}
+      agent{label "slave1_47 || slave1_48 || slave1_49 || slave1_50 || slave1_52 || slave1_59 || slave1_63 || worker03 || slave215 || slave217 || slave219 "}
      options { skipDefaultCheckout() }
      when {
        changeRequest()

@@ -21,9 +21,9 @@ extern "C" {
 #endif
 #include "os.h"
-#include "tlockfree.h"
-#include "thash.h"
 #include "tglobal.h"
+#include "thash.h"
+#include "tlockfree.h"
 #define MP_CHUNK_CACHE_ALLOC_BATCH_SIZE 1000
 #define MP_NSCHUNK_CACHE_ALLOC_BATCH_SIZE 500
@@ -36,7 +36,6 @@ extern "C" {
 #define MP_MIN_MEM_CHK_INTERVAL_MS 1
 #define MP_MEMORY_TRIM_INTERVAL_TIMES 500
 #define MP_RETIRE_HIGH_THRESHOLD_PERCENT (0.95)
 #define MP_RETIRE_MID_THRESHOLD_PERCENT (0.9)
 #define MP_RETIRE_LOW_THRESHOLD_PERCENT (0.85)
@@ -48,12 +47,10 @@ extern "C" {
 #define MP_MIN_FREE_SIZE_AFTER_RESERVE (4 * 1024 * 1048576L)
 #define MP_MIN_MEM_POOL_SIZE (5 * 1024 * 1048576L)
 // FLAGS AREA
 #define MP_CHUNK_FLAG_IN_USE (1 << 0)
 #define MP_CHUNK_FLAG_NS_CHUNK (1 << 1)
 // STAT FLAGS
 #define MP_LOG_FLAG_ALL_MEM (1 << 0)
 #define MP_LOG_FLAG_ALL_CHUNK (1 << 1)
@@ -69,7 +66,6 @@ extern "C" {
 #define MP_STAT_FLAG_LOG_ALL (0xFFFFFFFFFFFFFFFF)
 // STAT PROCESURE FLAGS
 #define MP_STAT_PROC_FLAG_EXEC (1 << 0)
 #define MP_STAT_PROC_FLAG_INPUT_ERR (1 << 1)
@@ -82,7 +78,6 @@ extern "C" {
 #define MP_CTRL_FLAG_LOCK_DBG (1 << 2)
 #define MP_CTRL_FLAG_LOG_MAXSIZE (1 << 3)
 typedef enum EMPStatLogItem {
   E_MP_STAT_LOG_MEM_MALLOC = 1,
   E_MP_STAT_LOG_MEM_CALLOC,
@@ -133,7 +128,6 @@ typedef struct SMPNSChunk {
   uint64_t memBytes;
 } SMPNSChunk;
 typedef struct SMPCacheGroup {
   int32_t nodesNum;
   int32_t idleOffset;
@@ -151,7 +145,6 @@ typedef struct SMPStatInput {
   void* pOrigMem;
 } SMPStatInput;
 typedef struct SMPCtrlInfo {
   int64_t statFlags;
   int64_t funcFlags;
@@ -200,7 +193,6 @@ typedef struct SMPStatInfo {
   SMPStatPos posStat;
 } SMPStatInfo;
 typedef struct SMPJob {
   SMemPoolJob job; // KEEP IT FIRST
   SMPStatInfo stat;
@@ -276,7 +268,6 @@ typedef struct SMPChunkMgmt {
   SMPChunk* readyNSChunkTail;
 } SMPChunkMgmt;
 typedef struct SMemPool {
   char* name;
   int16_t slotId;
@@ -322,7 +313,6 @@ typedef struct SMemPoolMgmt {
 extern SMemPoolMgmt gMPMgmt;
 typedef int32_t (*mpAllocFunc)(SMemPool*, SMPSession*, int64_t*, uint32_t, void**);
 typedef void (*mpFreeFunc)(SMemPool*, SMPSession*, void*, int64_t*);
 typedef int64_t (*mpGetSizeFunc)(SMemPool*, SMPSession*, void*);
@@ -353,10 +343,13 @@ enum {
 };
 #define MP_STAT_FORMAT "%-8s => inErr:%10" PRId64 ", exec:%12" PRId64 ", succ:%12" PRId64 ", fail:%12" PRId64
-#define MP_STAT_ORIG_FORMAT "%-8s => inErr:%10" PRId64 ", exec:%12" PRId64 ", succ:%12" PRId64 ", fail:%12" PRId64 ", oExec:%12" PRId64 ", oSucc:%12" PRId64 ", oFail:%12" PRId64
+#define MP_STAT_ORIG_FORMAT \
+  "%-8s => inErr:%10" PRId64 ", exec:%12" PRId64 ", succ:%12" PRId64 ", fail:%12" PRId64 ", oExec:%12" PRId64 \
+  ", oSucc:%12" PRId64 ", oFail:%12" PRId64
 #define MP_STAT_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail
-#define MP_STAT_ORIG_VALUE(_name, _item) _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail, (_item).origExec, (_item).origSucc, (_item).origFail
+#define MP_STAT_ORIG_VALUE(_name, _item) \
+  _name, (_item).inErr, (_item).exec, (_item).succ, (_item).fail, (_item).origExec, (_item).origSucc, (_item).origFail
 #define MP_INIT_MEM_HEADER(_header, _size, _nsChunk) \
   do { \
@@ -384,49 +377,42 @@ enum {
   do { \
     if (MP_READ == (type)) { \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
-       ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try read lock"); \
        uDebug("MP TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      } \
      (_res) = taosRTryLockLatch(_lock); \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
-       uDebug("MP TRY RLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \
-       ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) > 0, "invalid lock value after try read lock"); \
+       uDebug("MP TRY RLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", \
+              __FILE__, __LINE__); \
      } \
    } else { \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
-       ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try write lock"); \
        uDebug("MP TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      } \
      (_res) = taosWTryLockLatch(_lock); \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
-       uDebug("MP TRY WLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \
-       ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after try write lock"); \
+       uDebug("MP TRY WLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", \
+              __FILE__, __LINE__); \
      } \
    } \
  } while (0)
 #define MP_LOCK(type, _lock) \
   do { \
    if (MP_READ == (type)) { \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
-       ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \
        uDebug("MP RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      } \
      taosRLockLatch(_lock); \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
        uDebug("MP RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-       ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \
      } \
    } else { \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
-       ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \
        uDebug("MP WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      } \
      taosWLockLatch(_lock); \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
        uDebug("MP WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-       ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
      } \
    } \
  } while (0)
@@ -435,28 +421,23 @@ enum {
   do { \
     if (MP_READ == (type)) { \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
-       ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
        uDebug("MP RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      } \
      taosRUnLockLatch(_lock); \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
        uDebug("MP RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-       ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
      } \
    } else { \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
-       ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
        uDebug("MP WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
      } \
      taosWUnLockLatch(_lock); \
      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
        uDebug("MP WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-       ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
      } \
    } \
  } while (0)
 #define MP_ERR_RET(c) \
   do { \
     int32_t _code = c; \
@@ -484,17 +465,20 @@ enum {
    } \
  } while (0)
-#define MP_CHECK_QUOTA(_pool, _job, _size) do { \
+#define MP_CHECK_QUOTA(_pool, _job, _size) \
+  do { \
    if (*(_pool)->cfg.jobQuota > 0) { \
      int64_t cAllocSize = atomic_add_fetch_64(&(_job)->job.allocMemSize, (_size)); \
      if (cAllocSize > (*(_pool)->cfg.jobQuota * 1048576L)) { \
-       uWarn("job 0x%" PRIx64 " remainSession:%d allocSize %" PRId64 " is over than quota %dMB", (_job)->job.jobId, (_job)->job.remainSession, cAllocSize, *(_pool)->cfg.jobQuota); \
+       uWarn("job 0x%" PRIx64 " remainSession:%d allocSize %" PRId64 " is over than quota %dMB", (_job)->job.jobId, \
+             (_job)->job.remainSession, cAllocSize, *(_pool)->cfg.jobQuota); \
        (_pool)->cfg.cb.reachFp(pJob->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); \
        mpSchedTrim(NULL); \
        terrno = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; \
        return NULL; \
      } else { \
-       uDebug("job 0x%" PRIx64 " remainSession:%d allocSize %" PRId64 " is lower than quota %dMB", (_job)->job.jobId, (_job)->job.remainSession, cAllocSize, *(_pool)->cfg.jobQuota); \
+       uDebug("job 0x%" PRIx64 " remainSession:%d allocSize %" PRId64 " is lower than quota %dMB", (_job)->job.jobId, \
+              (_job)->job.remainSession, cAllocSize, *(_pool)->cfg.jobQuota); \
      } \
    } \
    if (atomic_load_64(&tsCurrentAvailMemorySize) <= ((_pool)->cfg.reserveSize + (_size))) { \
@@ -507,7 +491,6 @@ enum {
    } \
  } while (0)
 // direct
 void* mpDirectAlloc(SMemPool* pPool, SMPJob* pJob, int64_t size);
 void* mpDirectAlignAlloc(SMemPool* pPool, SMPJob* pJob, uint32_t alignment, int64_t size);
@@ -532,7 +515,6 @@ int32_t mpChunkRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64
 int32_t mpChunkInitSession(SMemPool* pPool, SMPSession* pSession);
 int32_t mpChunkUpdateCfg(SMemPool* pPool);
 int32_t mpPopIdleNode(SMemPool* pPool, SMPCacheGroupInfo* pInfo, void** ppRes);
 int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size);
 void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int64_t addSize);
@@ -540,8 +522,6 @@ int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup
 int32_t mpMalloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint32_t alignment, void** ppRes);
 void mpSchedTrim(int64_t* loopTimes);
 #ifdef __cplusplus
 }
 #endif

@@ -808,7 +808,7 @@ void mpLogPosStat(SMPStatPos* pStat, EMPStatLogItem item, SMPStatInput* pInput,
     MP_ERR_JRET(code);
   }
-  ASSERT((pInput->pOrigMem && pInput->origSize > 0) || (NULL == pInput->pOrigMem && pInput->origSize == 0));
+  // ASSSRT((pInput->pOrigMem && pInput->origSize > 0) || (NULL == pInput->pOrigMem && pInput->origSize == 0));
   if (pInput->pOrigMem && pInput->origSize > 0) {
     code = taosHashRemove(pStat->remainHash, &pInput->pOrigMem, POINTER_BYTES);

@@ -186,8 +186,7 @@ class TDTestCase:
             raise Exception(repr(e))
         finally:
             if infoFile:
-                infoFile.flush()
-                # close()
+                infoFile.close()

     def s3_check_show_grants_granted(self):
         tdLog.printNoPrefix("======== test show grants granted: ")