fix: add ut case
This commit is contained in:
parent
1a2dbe53a1
commit
f0aacc7e4a
|
@ -37,6 +37,41 @@ typedef struct SMemPoolJob {
|
||||||
int64_t maxAllocMemSize;
|
int64_t maxAllocMemSize;
|
||||||
} SMemPoolJob;
|
} SMemPoolJob;
|
||||||
|
|
||||||
|
typedef struct SMPStatItem {
|
||||||
|
int64_t inErr;
|
||||||
|
int64_t exec;
|
||||||
|
int64_t succ;
|
||||||
|
int64_t fail;
|
||||||
|
} SMPStatItem;
|
||||||
|
|
||||||
|
typedef struct SMPStatItemExt {
|
||||||
|
int64_t inErr;
|
||||||
|
int64_t exec;
|
||||||
|
int64_t succ;
|
||||||
|
int64_t fail;
|
||||||
|
int64_t origExec;
|
||||||
|
int64_t origSucc;
|
||||||
|
int64_t origFail;
|
||||||
|
} SMPStatItemExt;
|
||||||
|
|
||||||
|
typedef struct SMPMemoryStat {
|
||||||
|
SMPStatItem memMalloc;
|
||||||
|
SMPStatItem memCalloc;
|
||||||
|
SMPStatItemExt memRealloc;
|
||||||
|
SMPStatItem strdup;
|
||||||
|
SMPStatItem memFree;
|
||||||
|
|
||||||
|
SMPStatItem chunkMalloc;
|
||||||
|
SMPStatItem chunkRecycle;
|
||||||
|
SMPStatItem chunkReUse;
|
||||||
|
SMPStatItem chunkFree;
|
||||||
|
} SMPMemoryStat;
|
||||||
|
|
||||||
|
typedef struct SMPStatDetail {
|
||||||
|
SMPMemoryStat times;
|
||||||
|
SMPMemoryStat bytes;
|
||||||
|
} SMPStatDetail;
|
||||||
|
|
||||||
|
|
||||||
typedef void (*mpDecConcSessionNum)(void);
|
typedef void (*mpDecConcSessionNum)(void);
|
||||||
typedef void (*mpIncConcSessionNum)(void);
|
typedef void (*mpIncConcSessionNum)(void);
|
||||||
|
@ -81,6 +116,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob);
|
||||||
void taosMemPoolDestroySession(void* poolHandle, void* session);
|
void taosMemPoolDestroySession(void* poolHandle, void* session);
|
||||||
int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob);
|
int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob);
|
||||||
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg);
|
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg);
|
||||||
|
void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName);
|
||||||
|
|
||||||
#define taosMemPoolFreeClear(ptr) \
|
#define taosMemPoolFreeClear(ptr) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -101,14 +137,14 @@ extern threadlocal void* threadPoolSession;
|
||||||
#define taosSaveDisableMemoryPoolUsage(_handle) do { (_handle) = threadPoolHandle; threadPoolHandle = NULL; } while (0)
|
#define taosSaveDisableMemoryPoolUsage(_handle) do { (_handle) = threadPoolHandle; threadPoolHandle = NULL; } while (0)
|
||||||
#define taosRestoreEnableMemoryPoolUsage(_handle) (threadPoolHandle = (_handle))
|
#define taosRestoreEnableMemoryPoolUsage(_handle) (threadPoolHandle = (_handle))
|
||||||
|
|
||||||
#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, threadPoolSession, _size, __FILE__, __LINE__)) : (taosMemMalloc(_size)))
|
#define taosMemoryMalloc(_size) ((NULL != threadPoolHandle) ? (taosMemPoolMalloc(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemMalloc(_size)))
|
||||||
#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, __FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
|
#define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size)))
|
||||||
#define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, threadPoolSession, _ptr, _size, __FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
|
#define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size)))
|
||||||
#define taosStrdup(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolStrdup(threadPoolHandle, threadPoolSession, _ptr, __FILE__, __LINE__)) : (taosStrdupi(_ptr)))
|
#define taosStrdup(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolStrdup(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr)))
|
||||||
#define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, threadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemFree(_ptr)))
|
#define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr)))
|
||||||
#define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, threadPoolSession, _ptr, __FILE__, __LINE__)) : (taosMemSize(_ptr)))
|
#define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr)))
|
||||||
#define taosMemoryTrim(_size) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, __FILE__, __LINE__)) : (taosMemTrim(_size)))
|
#define taosMemoryTrim(_size) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemTrim(_size)))
|
||||||
#define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, threadPoolSession, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
|
#define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
|
||||||
#else
|
#else
|
||||||
#define taosEnableMemoryPoolUsage(_pool, _session)
|
#define taosEnableMemoryPoolUsage(_pool, _session)
|
||||||
#define taosDisableMemoryPoolUsage()
|
#define taosDisableMemoryPoolUsage()
|
||||||
|
|
|
@ -75,6 +75,7 @@ void taosInitRWLatch(SRWLatch *pLatch);
|
||||||
void taosWLockLatch(SRWLatch *pLatch);
|
void taosWLockLatch(SRWLatch *pLatch);
|
||||||
void taosWUnLockLatch(SRWLatch *pLatch);
|
void taosWUnLockLatch(SRWLatch *pLatch);
|
||||||
void taosRLockLatch(SRWLatch *pLatch);
|
void taosRLockLatch(SRWLatch *pLatch);
|
||||||
|
int32_t taosRTryLockLatch(SRWLatch *pLatch);
|
||||||
void taosRUnLockLatch(SRWLatch *pLatch);
|
void taosRUnLockLatch(SRWLatch *pLatch);
|
||||||
int32_t taosWTryLockLatch(SRWLatch *pLatch);
|
int32_t taosWTryLockLatch(SRWLatch *pLatch);
|
||||||
|
|
||||||
|
|
|
@ -137,40 +137,6 @@ typedef struct SMPStatInput {
|
||||||
int32_t line;
|
int32_t line;
|
||||||
} SMPStatInput;
|
} SMPStatInput;
|
||||||
|
|
||||||
typedef struct SMPStatItem {
|
|
||||||
int64_t inErr;
|
|
||||||
int64_t exec;
|
|
||||||
int64_t succ;
|
|
||||||
int64_t fail;
|
|
||||||
} SMPStatItem;
|
|
||||||
|
|
||||||
typedef struct SMPStatItemExt {
|
|
||||||
int64_t inErr;
|
|
||||||
int64_t exec;
|
|
||||||
int64_t succ;
|
|
||||||
int64_t fail;
|
|
||||||
int64_t origExec;
|
|
||||||
int64_t origSucc;
|
|
||||||
int64_t origFail;
|
|
||||||
} SMPStatItemExt;
|
|
||||||
|
|
||||||
typedef struct SMPMemoryStat {
|
|
||||||
SMPStatItem memMalloc;
|
|
||||||
SMPStatItem memCalloc;
|
|
||||||
SMPStatItemExt memRealloc;
|
|
||||||
SMPStatItem strdup;
|
|
||||||
SMPStatItem memFree;
|
|
||||||
|
|
||||||
SMPStatItem chunkMalloc;
|
|
||||||
SMPStatItem chunkRecycle;
|
|
||||||
SMPStatItem chunkReUse;
|
|
||||||
SMPStatItem chunkFree;
|
|
||||||
} SMPMemoryStat;
|
|
||||||
|
|
||||||
typedef struct SMPStatDetail {
|
|
||||||
SMPMemoryStat times;
|
|
||||||
SMPMemoryStat bytes;
|
|
||||||
} SMPStatDetail;
|
|
||||||
|
|
||||||
typedef struct SMPCtrlInfo {
|
typedef struct SMPCtrlInfo {
|
||||||
int64_t statFlags;
|
int64_t statFlags;
|
||||||
|
|
|
@ -91,4 +91,21 @@ void taosRLockLatch(SRWLatch *pLatch) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// no reentrant
|
||||||
|
int32_t taosRTryLockLatch(SRWLatch *pLatch) {
|
||||||
|
SRWLatch oLatch, nLatch;
|
||||||
|
oLatch = atomic_load_32(pLatch);
|
||||||
|
if (oLatch) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
nLatch = oLatch + 1;
|
||||||
|
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void taosRUnLockLatch(SRWLatch *pLatch) { (void)atomic_fetch_sub_32(pLatch, 1); }
|
void taosRUnLockLatch(SRWLatch *pLatch) { (void)atomic_fetch_sub_32(pLatch, 1); }
|
||||||
|
|
|
@ -429,40 +429,7 @@ void mpPrintSessionStat(SMPCtrlInfo* pCtrl, SMPStatSession* pSessStat, char* det
|
||||||
uInfo("session destroyed num: %" PRId64, pSessStat->destroyNum);
|
uInfo("session destroyed num: %" PRId64, pSessStat->destroyNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mpPrintStat(SMemPool* pPool, SMPSession* pSession, char* procName) {
|
|
||||||
char detailName[128];
|
|
||||||
|
|
||||||
if (NULL != pSession) {
|
|
||||||
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session");
|
|
||||||
detailName[sizeof(detailName) - 1] = 0;
|
|
||||||
mpPrintStatDetail(&pSession->ctrlInfo, &pSession->stat.statDetail, detailName, pSession->maxAllocMemSize);
|
|
||||||
|
|
||||||
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFile");
|
|
||||||
detailName[sizeof(detailName) - 1] = 0;
|
|
||||||
mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName);
|
|
||||||
|
|
||||||
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFileLine");
|
|
||||||
detailName[sizeof(detailName) - 1] = 0;
|
|
||||||
mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName);
|
|
||||||
}
|
|
||||||
|
|
||||||
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name);
|
|
||||||
detailName[sizeof(detailName) - 1] = 0;
|
|
||||||
mpPrintSessionStat(&pPool->ctrlInfo, &pPool->stat.statSession, detailName);
|
|
||||||
mpPrintStatDetail(&pPool->ctrlInfo, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize);
|
|
||||||
|
|
||||||
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
|
|
||||||
detailName[sizeof(detailName) - 1] = 0;
|
|
||||||
mpPrintNodeStat(&pSession->ctrlInfo, pSession->stat.nodeStat, detailName);
|
|
||||||
|
|
||||||
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFile");
|
|
||||||
detailName[sizeof(detailName) - 1] = 0;
|
|
||||||
mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName);
|
|
||||||
|
|
||||||
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFileLine");
|
|
||||||
detailName[sizeof(detailName) - 1] = 0;
|
|
||||||
mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName);
|
|
||||||
}
|
|
||||||
|
|
||||||
void mpLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* pInput) {
|
void mpLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* pInput) {
|
||||||
switch (item) {
|
switch (item) {
|
||||||
|
@ -653,6 +620,44 @@ _return:
|
||||||
gMPMgmt.code = code;
|
gMPMgmt.code = code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
|
||||||
|
SMemPool* pPool = (SMemPool*)poolHandle;
|
||||||
|
SMPSession* pSession = (SMPSession*)session;
|
||||||
|
char detailName[128];
|
||||||
|
|
||||||
|
if (NULL != pSession) {
|
||||||
|
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session");
|
||||||
|
detailName[sizeof(detailName) - 1] = 0;
|
||||||
|
mpPrintStatDetail(&pSession->ctrlInfo, &pSession->stat.statDetail, detailName, pSession->maxAllocMemSize);
|
||||||
|
|
||||||
|
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFile");
|
||||||
|
detailName[sizeof(detailName) - 1] = 0;
|
||||||
|
mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName);
|
||||||
|
|
||||||
|
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "SessionFileLine");
|
||||||
|
detailName[sizeof(detailName) - 1] = 0;
|
||||||
|
mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName);
|
||||||
|
}
|
||||||
|
|
||||||
|
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name);
|
||||||
|
detailName[sizeof(detailName) - 1] = 0;
|
||||||
|
mpPrintSessionStat(&pPool->ctrlInfo, &pPool->stat.statSession, detailName);
|
||||||
|
mpPrintStatDetail(&pPool->ctrlInfo, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize);
|
||||||
|
|
||||||
|
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
|
||||||
|
detailName[sizeof(detailName) - 1] = 0;
|
||||||
|
mpPrintNodeStat(&pSession->ctrlInfo, pSession->stat.nodeStat, detailName);
|
||||||
|
|
||||||
|
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFile");
|
||||||
|
detailName[sizeof(detailName) - 1] = 0;
|
||||||
|
mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.fileStat, detailName);
|
||||||
|
|
||||||
|
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolFileLine");
|
||||||
|
detailName[sizeof(detailName) - 1] = 0;
|
||||||
|
mpPrintFileLineStat(&pSession->ctrlInfo, pSession->stat.lineStat, detailName);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
|
int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SMemPool* pPool = NULL;
|
SMemPool* pPool = NULL;
|
||||||
|
@ -714,7 +719,7 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) {
|
||||||
|
|
||||||
(void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1);
|
(void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1);
|
||||||
|
|
||||||
mpPrintStat(pPool, pSession, "DestroySession");
|
taosMemPoolPrintStat(pPool, pSession, "DestroySession");
|
||||||
|
|
||||||
TAOS_MEMSET(pSession, 0, sizeof(*pSession));
|
TAOS_MEMSET(pSession, 0, sizeof(*pSession));
|
||||||
|
|
||||||
|
|
|
@ -140,4 +140,13 @@ if (${TD_LINUX})
|
||||||
add_custom_command(TARGET terrorTest POST_BUILD
|
add_custom_command(TARGET terrorTest POST_BUILD
|
||||||
COMMAND ${CMAKE_COMMAND} -E copy_if_different ${ERR_TBL_FILE} $<TARGET_FILE_DIR:terrorTest>
|
COMMAND ${CMAKE_COMMAND} -E copy_if_different ${ERR_TBL_FILE} $<TARGET_FILE_DIR:terrorTest>
|
||||||
)
|
)
|
||||||
endif ()
|
|
||||||
|
# memPoolTest
|
||||||
|
add_executable(memPoolTest "memPoolTest.cpp")
|
||||||
|
target_link_libraries(memPoolTest os util common gtest_main)
|
||||||
|
add_test(
|
||||||
|
NAME memPoolTest
|
||||||
|
COMMAND memPoolTest
|
||||||
|
)
|
||||||
|
|
||||||
|
endif ()
|
||||||
|
|
|
@ -31,27 +31,37 @@
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include "executor.h"
|
#include "thash.h"
|
||||||
#include "executorInt.h"
|
#include "theap.h"
|
||||||
#include "function.h"
|
|
||||||
#include "operator.h"
|
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "tdatablock.h"
|
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
#include "tvariant.h"
|
#include "tvariant.h"
|
||||||
#include "stub.h"
|
#include "stub.h"
|
||||||
#include "querytask.h"
|
|
||||||
|
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
#define MPT_PRINTF (void)printf
|
#define MPT_PRINTF (void)printf
|
||||||
|
#define MPT_MAX_MEM_ACT_TIMES 10000
|
||||||
#define MPT_MAX_SESSION_NUM 256
|
#define MPT_MAX_SESSION_NUM 256
|
||||||
#define MPT_MAX_JOB_NUM 20000
|
#define MPT_MAX_JOB_NUM 200
|
||||||
|
#define MPT_MAX_THREAD_NUM 100
|
||||||
|
|
||||||
|
#define MPT_DEFAULT_RESERVE_MEM_PERCENT 20
|
||||||
|
#define MPT_MIN_RESERVE_MEM_SIZE (512 * 1048576UL)
|
||||||
|
#define MPT_MIN_MEM_POOL_SIZE (1048576UL)
|
||||||
|
#define MPT_MAX_RETIRE_JOB_NUM 10000
|
||||||
|
|
||||||
|
|
||||||
threadlocal void* mptThreadPoolHandle = NULL;
|
threadlocal void* mptThreadPoolHandle = NULL;
|
||||||
threadlocal void* mptThreadPoolSession = NULL;
|
threadlocal void* mptThreadPoolSession = NULL;
|
||||||
|
|
||||||
|
#define MPT_SET_TEID(id, tId, eId) \
|
||||||
|
do { \
|
||||||
|
*(uint64_t *)(id) = (tId); \
|
||||||
|
*(uint32_t *)((char *)(id) + sizeof(tId)) = (eId); \
|
||||||
|
} while (0)
|
||||||
|
|
||||||
|
|
||||||
#define mptEnableMemoryPoolUsage(_pool, _session) do { mptThreadPoolHandle = _pool; mptThreadPoolSession = _session; } while (0)
|
#define mptEnableMemoryPoolUsage(_pool, _session) do { mptThreadPoolHandle = _pool; mptThreadPoolSession = _session; } while (0)
|
||||||
#define mptDisableMemoryPoolUsage() (mptThreadPoolHandle = NULL)
|
#define mptDisableMemoryPoolUsage() (mptThreadPoolHandle = NULL)
|
||||||
|
@ -67,7 +77,7 @@ threadlocal void* mptThreadPoolSession = NULL;
|
||||||
#define mptMemoryTrim(_size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolTrim(mptThreadPoolHandle, mptThreadPoolSession, _size, __FILE__, __LINE__)) : (taosMemTrim(_size)))
|
#define mptMemoryTrim(_size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolTrim(mptThreadPoolHandle, mptThreadPoolSession, _size, __FILE__, __LINE__)) : (taosMemTrim(_size)))
|
||||||
#define mptMemoryMallocAlign(_alignment, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolMallocAlign(mptThreadPoolHandle, mptThreadPoolSession, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
|
#define mptMemoryMallocAlign(_alignment, _size) ((NULL != mptThreadPoolHandle) ? (taosMemPoolMallocAlign(mptThreadPoolHandle, mptThreadPoolSession, _alignment, _size, __FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size)))
|
||||||
|
|
||||||
typedef enum {
|
enum {
|
||||||
MPT_SMALL_MSIZE = 0,
|
MPT_SMALL_MSIZE = 0,
|
||||||
MPT_BIG_MSIZE,
|
MPT_BIG_MSIZE,
|
||||||
};
|
};
|
||||||
|
@ -85,6 +95,7 @@ typedef struct SMPTJobInfo {
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
SMemPoolJob* memInfo;
|
SMemPoolJob* memInfo;
|
||||||
SHashObj* pSessions;
|
SHashObj* pSessions;
|
||||||
|
void* pCtx;
|
||||||
} SMPTJobInfo;
|
} SMPTJobInfo;
|
||||||
|
|
||||||
|
|
||||||
|
@ -94,33 +105,71 @@ typedef struct {
|
||||||
} SMPTestCtrl;
|
} SMPTestCtrl;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
void* p;
|
||||||
|
int64_t size;
|
||||||
|
} SMPTestMemInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int8_t taskExecLock;
|
||||||
|
bool taskFinished;
|
||||||
|
|
||||||
|
int64_t poolMaxUsedSize;
|
||||||
|
int64_t poolTotalUsedSize;
|
||||||
|
|
||||||
|
SMPStatDetail stat;
|
||||||
|
|
||||||
|
int32_t memIdx;
|
||||||
|
SMPTestMemInfo* pMemList[MPT_MAX_MEM_ACT_TIMES];
|
||||||
|
|
||||||
|
|
||||||
|
int64_t npSize;
|
||||||
|
int32_t npMemIdx;
|
||||||
|
SMPTestMemInfo* npMemList[MPT_MAX_MEM_ACT_TIMES];
|
||||||
|
|
||||||
|
bool taskFreed;
|
||||||
|
} SMPTestTaskCtx;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SRWLatch jobExecLock;
|
||||||
|
|
||||||
|
int32_t jobIdx;
|
||||||
uint64_t jobId;
|
uint64_t jobId;
|
||||||
int32_t taskNum;
|
|
||||||
int64_t poolMaxSize;
|
|
||||||
int64_t npoolSize;
|
|
||||||
void* pSessions[MPT_MAX_SESSION_NUM];
|
void* pSessions[MPT_MAX_SESSION_NUM];
|
||||||
|
int32_t taskNum;
|
||||||
SMPTestTaskCtx taskCtxs[MPT_MAX_SESSION_NUM];
|
SMPTestTaskCtx taskCtxs[MPT_MAX_SESSION_NUM];
|
||||||
|
int32_t taskExecIdx;
|
||||||
|
|
||||||
|
SMPTJobInfo* pJob;
|
||||||
|
int32_t jobStatus;
|
||||||
} SMPTestJobCtx;
|
} SMPTestJobCtx;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t poolMaxSize;
|
int64_t jobQuota;
|
||||||
int64_t npoolSize;
|
bool autoPoolSize;
|
||||||
int32_t memActTimes;
|
int32_t poolSize;
|
||||||
} SMPTestTaskCtx;
|
int32_t maxExecJobNum;
|
||||||
|
int32_t threadNum;
|
||||||
|
int32_t randTask;
|
||||||
|
} SMPTestParam;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
bool allJobs;
|
||||||
|
bool autoJob;
|
||||||
|
} SMPTestThread;
|
||||||
|
|
||||||
typedef struct SMPTestCtx {
|
typedef struct SMPTestCtx {
|
||||||
uint64_t qId;
|
uint64_t qId;
|
||||||
uint64_t tId;
|
|
||||||
int32_t eId;
|
|
||||||
SHashObj* pJobs;
|
SHashObj* pJobs;
|
||||||
BoundedQueue* pJobQueue;
|
BoundedQueue* pJobQueue;
|
||||||
void* memPoolHandle;
|
void* memPoolHandle;
|
||||||
|
SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM];
|
||||||
SMPTestJobCtx jobCtxs[MPT_MAX_JOB_NUM];
|
SMPTestJobCtx jobCtxs[MPT_MAX_JOB_NUM];
|
||||||
|
SMPTestParam param;
|
||||||
} SMPTestCtx;
|
} SMPTestCtx;
|
||||||
|
|
||||||
SMPTestCtx mptCtx = {0};
|
SMPTestCtx mptCtx = {0};
|
||||||
|
|
||||||
|
#if 0
|
||||||
void joinTestReplaceRetrieveFp() {
|
void joinTestReplaceRetrieveFp() {
|
||||||
static Stub stub;
|
static Stub stub;
|
||||||
stub.set(getNextBlockFromDownstreamRemain, getDummyInputBlock);
|
stub.set(getNextBlockFromDownstreamRemain, getDummyInputBlock);
|
||||||
|
@ -143,6 +192,7 @@ void joinTestReplaceRetrieveFp() {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void mptInitLogFile() {
|
void mptInitLogFile() {
|
||||||
const char *defaultLogFileNamePrefix = "mplog";
|
const char *defaultLogFileNamePrefix = "mplog";
|
||||||
|
@ -153,13 +203,13 @@ void mptInitLogFile() {
|
||||||
TAOS_STRCPY(tsLogDir, TD_LOG_DIR_PATH);
|
TAOS_STRCPY(tsLogDir, TD_LOG_DIR_PATH);
|
||||||
|
|
||||||
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
|
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
|
||||||
JT_PRINTF("failed to open log file in directory:%s\n", tsLogDir);
|
MPT_PRINTF("failed to open log file in directory:%s\n", tsLogDir);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mptJobMemSizeCompFn(void* l, void* r, void* param) {
|
static bool mptJobMemSizeCompFn(void* l, void* r, void* param) {
|
||||||
SQWJobInfo* left = (SQWJobInfo*)l;
|
SMPTJobInfo* left = (SMPTJobInfo*)l;
|
||||||
SQWJobInfo* right = (SQWJobInfo*)r;
|
SMPTJobInfo* right = (SMPTJobInfo*)r;
|
||||||
if (atomic_load_8(&right->retired)) {
|
if (atomic_load_8(&right->retired)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -174,29 +224,111 @@ void mptInit() {
|
||||||
mptCtx.pJobs = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
mptCtx.pJobs = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
ASSERT_TRUE(NULL != mptCtx.pJobs);
|
ASSERT_TRUE(NULL != mptCtx.pJobs);
|
||||||
|
|
||||||
mptCtx.pJobQueue = createBoundedQueue(10000, qwJobMemSizeCompFn, NULL, NULL);
|
mptCtx.pJobQueue = createBoundedQueue(10000, mptJobMemSizeCompFn, NULL, NULL);
|
||||||
ASSERT_TRUE(NULL != mptCtx.pJobQueue);
|
ASSERT_TRUE(NULL != mptCtx.pJobQueue);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void mptRetireJob(SQWJobInfo* pJob) {
|
void mptDestroyTaskCtx(SMPTestTaskCtx* pTask) {
|
||||||
//TODO
|
for (int32_t i = 0; i < pTask->memIdx; ++i) {
|
||||||
|
taosMemFreeClear(pTask->pMemList[i].p);
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < pTask->npMemIdx; ++i) {
|
||||||
|
taosMemFreeClear(pTask->npMemList[i].p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void mptDestroyJobInfo(SMPTJobInfo* pJob) {
|
||||||
|
taosMemFree(pJob->memInfo);
|
||||||
|
taosHashCleanup(pJob->pSessions);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
|
||||||
|
taosWLockLatch(&pJobCtx->jobExecLock);
|
||||||
|
|
||||||
|
mptDestroyJobInfo(pJobCtx->pJob);
|
||||||
|
(void)taosHashRemove(mptCtx.pJobs, &pJobCtx->jobId, sizeof(pJobCtx->jobId));
|
||||||
|
for (int32_t i = 0; i < pJobCtx->taskNum; ++i) {
|
||||||
|
taosMemPoolDestroySession(mptCtx.memPoolHandle, pJobCtx->pSessions[i]);
|
||||||
|
mptDestroyTaskCtx(&pJobCtx->taskCtxs[i]);
|
||||||
|
}
|
||||||
|
if (reset) {
|
||||||
|
memset((char*)pJobCtx + sizeof(pJobCtx->jobExecLock), 0, sizeof(SMPTestJobCtx) - sizeof(pJobCtx->jobExecLock));
|
||||||
|
mptInitJob(pJobCtx->jobIdx);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWUnLockLatch(&pJobCtx->jobExecLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mptCheckCompareJobInfo(SMPTestJobCtx* pJobCtx) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void mptResetJob(SMPTestJobCtx* pJobCtx) {
|
||||||
|
mptDestroyJob(pJobCtx, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mptRetireJob(SMPTJobInfo* pJob) {
|
||||||
|
SMPTestJobCtx* pCtx = (SMPTestJobCtx*)pJob->pCtx;
|
||||||
|
|
||||||
|
mptCheckCompareJobInfo(pCtx);
|
||||||
|
|
||||||
|
mptResetJob(pCtx);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mptGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) {
|
||||||
|
int64_t reserveSize = TMAX(totalSize * MPT_DEFAULT_RESERVE_MEM_PERCENT / 100 / 1048576UL * 1048576UL, MPT_MIN_RESERVE_MEM_SIZE);
|
||||||
|
int64_t availSize = (totalSize - reserveSize) / 1048576UL * 1048576UL;
|
||||||
|
if (availSize < MPT_MIN_MEM_POOL_SIZE) {
|
||||||
|
uError("too little available query memory, totalAvailable: %" PRId64 ", reserveSize: %" PRId64, totalSize, reserveSize);
|
||||||
|
return TSDB_CODE_QRY_TOO_FEW_AVAILBLE_MEM;
|
||||||
|
}
|
||||||
|
|
||||||
|
*maxSize = availSize;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mptGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) {
|
||||||
|
if (!mptCtx.param.autoPoolSize && mptCtx.param.poolSize > 0) {
|
||||||
|
*pMaxSize = mptCtx.param.poolSize * 1048576UL;
|
||||||
|
*autoMaxSize = false;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t memSize = 0;
|
||||||
|
int32_t code = taosGetSysAvailMemory(&memSize);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
uError("get system avaiable memory size failed, error: 0x%x", code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = mptGetMemPoolMaxMemSize(memSize, pMaxSize);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
*autoMaxSize = true;
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void mptCheckUpateCfgCb(void* pHandle, void* cfg) {
|
void mptCheckUpateCfgCb(void* pHandle, void* cfg) {
|
||||||
SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg;
|
SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg;
|
||||||
int64_t newJobQuota = tsSingleQueryMaxMemorySize * 1048576UL;
|
int64_t newJobQuota = mptCtx.singleQueryMaxSize * 1048576UL;
|
||||||
if (pCfg->jobQuota != newJobQuota) {
|
if (pCfg->jobQuota != newJobQuota) {
|
||||||
atomic_store_64(&pCfg->jobQuota, newJobQuota);
|
atomic_store_64(&pCfg->jobQuota, newJobQuota);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t maxSize = 0;
|
int64_t maxSize = 0;
|
||||||
bool autoMaxSize = false;
|
bool autoMaxSize = false;
|
||||||
int32_t code = qwGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize);
|
int32_t code = mptGetQueryMemPoolMaxSize(&maxSize, &autoMaxSize);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
pCfg->maxSize = 0;
|
pCfg->maxSize = 0;
|
||||||
qError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize);
|
uError("get query memPool maxSize failed, reset maxSize to %" PRId64, pCfg->maxSize);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,7 +346,7 @@ void mptLowLevelRetire(int64_t retireSize, int32_t errCode) {
|
||||||
if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
|
if (aSize >= retireSize && 0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
|
||||||
mptRetireJob(pJob);
|
mptRetireJob(pJob);
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
|
uDebug("QID:0x%" PRIx64 " job retired cause of low level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
|
||||||
pJob->memInfo->jobId, aSize, retireSize);
|
pJob->memInfo->jobId, aSize, retireSize);
|
||||||
|
|
||||||
taosHashCancelIterate(mptCtx.pJobs, pJob);
|
taosHashCancelIterate(mptCtx.pJobs, pJob);
|
||||||
|
@ -226,7 +358,7 @@ void mptLowLevelRetire(int64_t retireSize, int32_t errCode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void mptMidLevelRetire(int64_t retireSize, int32_t errCode) {
|
void mptMidLevelRetire(int64_t retireSize, int32_t errCode) {
|
||||||
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobInfo, NULL);
|
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
|
||||||
PriorityQueueNode qNode;
|
PriorityQueueNode qNode;
|
||||||
while (NULL != pJob) {
|
while (NULL != pJob) {
|
||||||
if (0 == atomic_load_8(&pJob->retired)) {
|
if (0 == atomic_load_8(&pJob->retired)) {
|
||||||
|
@ -256,7 +388,7 @@ void mptMidLevelRetire(int64_t retireSize, int32_t errCode) {
|
||||||
|
|
||||||
mptRetireJob(pJob);
|
mptRetireJob(pJob);
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
|
uDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
|
||||||
pJob->memInfo->jobId, aSize, retireSize);
|
pJob->memInfo->jobId, aSize, retireSize);
|
||||||
|
|
||||||
retiredSize += aSize;
|
retiredSize += aSize;
|
||||||
|
@ -277,23 +409,23 @@ void mptRetireJobsCb(int64_t retireSize, bool lowLevelRetire, int32_t errCode) {
|
||||||
void mptRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) {
|
void mptRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) {
|
||||||
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &mpJob->jobId, sizeof(mpJob->jobId));
|
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &mpJob->jobId, sizeof(mpJob->jobId));
|
||||||
if (NULL == pJob) {
|
if (NULL == pJob) {
|
||||||
qError("QID:0x%" PRIx64 " fail to get job from job hash", mpJob->jobId);
|
uError("QID:0x%" PRIx64 " fail to get job from job hash", mpJob->jobId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
|
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
|
||||||
qwRetireJob(pJob);
|
mptRetireJob(pJob);
|
||||||
|
|
||||||
qInfo("QID:0x%" PRIx64 " retired directly, errCode: 0x%x", mpJob->jobId, errCode);
|
uInfo("QID:0x%" PRIx64 " retired directly, errCode: 0x%x", mpJob->jobId, errCode);
|
||||||
} else {
|
} else {
|
||||||
qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x", mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode));
|
uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x", mpJob->jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mptInitJobInfo(uint64_t qId, SMPTJobInfo* pJob) {
|
int32_t mptInitJobInfo(uint64_t qId, SMPTJobInfo* pJob) {
|
||||||
pJob->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
pJob->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
if (NULL == pJob->pSessions) {
|
if (NULL == pJob->pSessions) {
|
||||||
qError("fail to init session hash, code: 0x%x", terrno);
|
uError("fail to init session hash, code: 0x%x", terrno);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,7 +440,7 @@ int32_t mptInitJobInfo(uint64_t qId, SMPTJobInfo* pJob) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, void** ppSession) {
|
int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJobCtx, void** ppSession) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SMPTJobInfo* pJob = NULL;
|
SMPTJobInfo* pJob = NULL;
|
||||||
|
|
||||||
|
@ -323,7 +455,7 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, void** ppSession
|
||||||
|
|
||||||
code = taosHashPut(mptCtx.pJobs, &qId, sizeof(qId), &jobInfo, sizeof(jobInfo));
|
code = taosHashPut(mptCtx.pJobs, &qId, sizeof(qId), &jobInfo, sizeof(jobInfo));
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
qwDestroyJobInfo(&jobInfo);
|
mptDestroyJobInfo(&jobInfo);
|
||||||
if (TSDB_CODE_DUP_KEY == code) {
|
if (TSDB_CODE_DUP_KEY == code) {
|
||||||
code = TSDB_CODE_SUCCESS;
|
code = TSDB_CODE_SUCCESS;
|
||||||
continue;
|
continue;
|
||||||
|
@ -334,7 +466,7 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, void** ppSession
|
||||||
|
|
||||||
pJob = (SMPTJobInfo*)taosHashAcquire(mptCtx.pJobs, &qId, sizeof(qId));
|
pJob = (SMPTJobInfo*)taosHashAcquire(mptCtx.pJobs, &qId, sizeof(qId));
|
||||||
if (NULL == pJob) {
|
if (NULL == pJob) {
|
||||||
qError("QID:0x%" PRIx64 " not in joj hash, may be dropped", qId);
|
uError("QID:0x%" PRIx64 " not in joj hash, may be dropped", qId);
|
||||||
return TSDB_CODE_QRY_JOB_NOT_EXIST;
|
return TSDB_CODE_QRY_JOB_NOT_EXIST;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -342,16 +474,15 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, void** ppSession
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
QW_ERR_JRET(taosMemPoolInitSession(mptCtx.memPoolHandle, ppSession, pJob->memInfo));
|
pJobCtx->pJob = pJob;
|
||||||
|
pJob->pCtx = pJobCtx;
|
||||||
|
|
||||||
|
assert(0 == taosMemPoolInitSession(mptCtx.memPoolHandle, ppSession, pJob->memInfo));
|
||||||
|
|
||||||
char id[sizeof(tId) + sizeof(eId)] = {0};
|
char id[sizeof(tId) + sizeof(eId)] = {0};
|
||||||
QW_SET_TEID(id, tId, eId);
|
MPT_SET_TEID(id, tId, eId);
|
||||||
|
|
||||||
code = taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES);
|
assert(0 == taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES));
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
qError("fail to put session into query session hash, code: 0x%x", code);
|
|
||||||
QW_ERR_JRET(code);
|
|
||||||
}
|
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
@ -363,12 +494,12 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void mptInitPool(int64_t jobQuota, bool autoMaxSize, int64_t maxSize) {
|
void mptInitPool(void) {
|
||||||
SMemPoolCfg cfg = {0};
|
SMemPoolCfg cfg = {0};
|
||||||
|
|
||||||
cfg.autoMaxSize = autoMaxSize;
|
cfg.autoMaxSize = mptCtx.param.jobQuota;
|
||||||
if (!autoMaxSize) {
|
if (!mptCtx.param.autoPoolSize) {
|
||||||
cfg.maxSize = maxSize;
|
cfg.maxSize = mptCtx.param.poolSize;
|
||||||
} else {
|
} else {
|
||||||
int64_t memSize = 0;
|
int64_t memSize = 0;
|
||||||
ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize));
|
ASSERT_TRUE(0 == taosGetSysAvailMemory(&memSize));
|
||||||
|
@ -376,7 +507,7 @@ void mptInitPool(int64_t jobQuota, bool autoMaxSize, int64_t maxSize) {
|
||||||
}
|
}
|
||||||
cfg.threadNum = 10; //TODO
|
cfg.threadNum = 10; //TODO
|
||||||
cfg.evicPolicy = E_EVICT_AUTO; //TODO
|
cfg.evicPolicy = E_EVICT_AUTO; //TODO
|
||||||
cfg.jobQuota = jobQuota;
|
cfg.jobQuota = mptCtx.param.jobQuota;
|
||||||
cfg.cb.retireJobsFp = mptRetireJobsCb;
|
cfg.cb.retireJobsFp = mptRetireJobsCb;
|
||||||
cfg.cb.retireJobFp = mptRetireJobCb;
|
cfg.cb.retireJobFp = mptRetireJobCb;
|
||||||
cfg.cb.cfgUpdateFp = mptCheckUpateCfgCb;
|
cfg.cb.cfgUpdateFp = mptCheckUpateCfgCb;
|
||||||
|
@ -384,22 +515,77 @@ void mptInitPool(int64_t jobQuota, bool autoMaxSize, int64_t maxSize) {
|
||||||
ASSERT_TRUE(0 == taosMemPoolOpen("SingleThreadTest", &cfg, &mptCtx.memPoolHandle));
|
ASSERT_TRUE(0 == taosMemPoolOpen("SingleThreadTest", &cfg, &mptCtx.memPoolHandle));
|
||||||
}
|
}
|
||||||
|
|
||||||
void mptMemorySimulate(SMPTestTaskCtx* pCtx) {
|
void mptSimulateTask(SMPTestJobCtx* pJob, SMPTestTaskCtx* pCtx, bool finishTask) {
|
||||||
|
int32_t actTimes = 0;
|
||||||
|
if (!finishTask) {
|
||||||
|
actTimes = taosRand() % 100 * ()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void mptSimulateOutTask(SMPTestTaskCtx* pCtx, bool finishTask) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void mptTaskRun(int32_t idx, uint64_t qId, uint64_t tId, int32_t eId) {
|
|
||||||
ASSERT_TRUE(0 == mptInitSession(qId, tId, eId, &mptCtx.pSessions[idx]));
|
|
||||||
|
|
||||||
mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, mptCtx.pSessions[idx]);
|
void mptTaskRun(SMPTestJobCtx* pJob, SMPTestTaskCtx* pCtx, int32_t idx, bool finishTask) {
|
||||||
mptMemorySimulate(&mptCtx.taskCtxs[idx]);
|
taosRLockLatch(&pCtx->taskExecLock);
|
||||||
|
|
||||||
|
mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pJob->pSessions[idx]);
|
||||||
|
mptSimulateTask(pJob, pCtx, finishTask);
|
||||||
mptDisableMemoryPoolUsage();
|
mptDisableMemoryPoolUsage();
|
||||||
|
|
||||||
|
mptSimulateOutTask(pCtx, finishTask);
|
||||||
|
|
||||||
|
taosRUnLockLatch(&pCtx->taskExecLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mptInitTask(int32_t idx, uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJob) {
|
||||||
|
ASSERT_TRUE(0 == mptInitSession(qId, tId, eId, pJob, &pJob->pSessions[idx]));
|
||||||
|
}
|
||||||
|
|
||||||
|
void mptInitJob(int32_t idx) {
|
||||||
|
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[idx];
|
||||||
|
|
||||||
|
pJobCtx->jobIdx = idx;
|
||||||
|
pJobCtx->jobId = atomic_add_fetch_64(&mptCtx.qId, 1);
|
||||||
|
pJobCtx->taskNum = (taosRand() % MPT_MAX_SESSION_NUM) + 1;
|
||||||
|
for (int32_t i = 0; i < pJobCtx->taskNum; ++i) {
|
||||||
|
mptInitTask(i, pJobCtx->jobId, i, 0, pJobCtx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void mptInitJobs() {
|
||||||
|
for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) {
|
||||||
|
mptInitJob(i);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void* mptThreadFunc(void* param) {
|
void* mptThreadFunc(void* param) {
|
||||||
int32_t* threadIdx = (int32_t*)param;
|
SMPTestThread* pThread = (SMPTestThread*)param;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) {
|
||||||
|
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
|
||||||
|
if (taosRTryLockLatch(&pJobCtx->jobExecLock)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mptCtx.param.randTask) {
|
||||||
|
int32_t taskIdx = taosRand() % pJobCtx->taskNum;
|
||||||
|
mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, false);
|
||||||
|
taosRUnLockLatch(&pJobCtx->jobExecLock);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t m = 0; m < pJobCtx->taskNum; ++m) {
|
||||||
|
mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRUnLockLatch(&pJobCtx->jobExecLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mptStartThreadTest(int32_t threadIdx) {
|
void mptStartThreadTest(int32_t threadIdx) {
|
||||||
|
@ -407,21 +593,29 @@ void mptStartThreadTest(int32_t threadIdx) {
|
||||||
TdThreadAttr thattr;
|
TdThreadAttr thattr;
|
||||||
ASSERT_EQ(0, taosThreadAttrInit(&thattr));
|
ASSERT_EQ(0, taosThreadAttrInit(&thattr));
|
||||||
ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE));
|
ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE));
|
||||||
ASSERT_EQ(0, taosThreadCreate(&(t1), &thattr, mptThreadFunc, &threadIdx));
|
ASSERT_EQ(0, taosThreadCreate(&(t1), &thattr, mptThreadFunc, &mptCtx.threadCtxs[threadIdx]));
|
||||||
ASSERT_EQ(0, taosThreadAttrDestroy(&thattr));
|
ASSERT_EQ(0, taosThreadAttrDestroy(&thattr));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void mptRunCase() {
|
void mptRunCase(SMPTestParam* param) {
|
||||||
|
memcpy(&mptCtx.param, param, sizeof(SMPTestParam));
|
||||||
|
|
||||||
|
mptInitPool();
|
||||||
|
|
||||||
|
mptInitJobs();
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) {
|
||||||
|
mptCtx.threadCtxs[i].
|
||||||
|
mptStartThreadTest(i);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
#if 1
|
#if 1
|
||||||
TEST(FuncTest, SingleThreadTest) {
|
TEST(FuncTest, SingleJobTest) {
|
||||||
SJoinTestParam param;
|
|
||||||
char* caseName = "FuncTest:SingleThreadTest";
|
char* caseName = "FuncTest:SingleThreadTest";
|
||||||
void* pSession = NULL;
|
void* pSession = NULL;
|
||||||
|
|
||||||
|
@ -431,6 +625,18 @@ TEST(FuncTest, SingleThreadTest) {
|
||||||
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
#if 1
|
||||||
|
TEST(FuncTest, MultiJobsTest) {
|
||||||
|
char* caseName = "FuncTest:SingleThreadTest";
|
||||||
|
void* pSession = NULL;
|
||||||
|
|
||||||
|
mptInitPool(0, false, 5*1048576UL);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue