diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 35b5c1fd2b..083e0afeef 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -59,7 +59,9 @@ typedef struct SMPMemoryStat { SMPStatItem memCalloc; SMPStatItemExt memRealloc; SMPStatItem strdup; + SMPStatItem strndup; SMPStatItem memFree; + SMPStatItem memTrim; SMPStatItem chunkMalloc; SMPStatItem chunkRecycle; @@ -105,9 +107,10 @@ void *taosMemPoolMalloc(void* poolHandle, void* session, int64_t size, char* f void *taosMemPoolCalloc(void* poolHandle, void* session, int64_t num, int64_t size, char* fileName, int32_t lineNo); void *taosMemPoolRealloc(void* poolHandle, void* session, void *ptr, int64_t size, char* fileName, int32_t lineNo); char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo); +char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo); void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo); int64_t taosMemPoolGetMemorySize(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo); -void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo); +int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed); void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignment, int64_t size, char* fileName, int32_t lineNo); void taosMemPoolClose(void* poolHandle); void taosMemPoolModDestroy(void); @@ -141,9 +144,10 @@ extern threadlocal void* threadPoolSession; #define taosMemoryCalloc(_num, _size) ((NULL != threadPoolHandle) ? (taosMemPoolCalloc(threadPoolHandle, threadPoolSession, _num, _size, (char*)__FILE__, __LINE__)) : (taosMemCalloc(_num, _size))) #define taosMemoryRealloc(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolRealloc(threadPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosMemRealloc(_ptr, _size))) #define taosStrdup(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolStrdup(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosStrdupi(_ptr))) +#define taosStrndup(_ptr, _size) ((NULL != threadPoolHandle) ? (taosMemPoolStrndup(threadPoolHandle, threadPoolSession, _ptr, _size, (char*)__FILE__, __LINE__)) : (taosStrndupi(_ptr, _size))) #define taosMemoryFree(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolFree(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemFree(_ptr))) #define taosMemorySize(_ptr) ((NULL != threadPoolHandle) ? (taosMemPoolGetMemorySize(threadPoolHandle, threadPoolSession, _ptr, (char*)__FILE__, __LINE__)) : (taosMemSize(_ptr))) -#define taosMemoryTrim(_size) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__)) : (taosMemTrim(_size))) +#define taosMemoryTrim(_size, _trimed) ((NULL != threadPoolHandle) ? (taosMemPoolTrim(threadPoolHandle, threadPoolSession, _size, (char*)__FILE__, __LINE__, _trimed)) : (taosMemTrim(_size))) #define taosMemoryMallocAlign(_alignment, _size) ((NULL != threadPoolHandle) ? (taosMemPoolMallocAlign(threadPoolHandle, threadPoolSession, _alignment, _size, (char*)__FILE__, __LINE__)) : (taosMemMallocAlign(_alignment, _size))) #else #define taosEnableMemoryPoolUsage(_pool, _session) @@ -157,7 +161,7 @@ extern threadlocal void* threadPoolSession; #define taosStrdup(_ptr) taosStrdupi(_ptr) #define taosMemoryFree(_ptr) taosMemFree(_ptr) #define taosMemorySize(_ptr) taosMemSize(_ptr) -#define taosMemoryTrim(_size) taosMemTrim(_size) +#define taosMemoryTrim(_size, _trimed) taosMemTrim(_size) #define taosMemoryMallocAlign(_alignment, _size) taosMemMallocAlign(_alignment, _size) #endif diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 281088060f..b76ca14265 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -42,6 +42,7 @@ void *taosMemMalloc(int64_t size); void *taosMemCalloc(int64_t num, int64_t size); void *taosMemRealloc(void *ptr, int64_t size); char *taosStrdupi(const char *ptr); +char *taosStrndupi(const char *ptr, int64_t size); void taosMemFree(void *ptr); int64_t taosMemSize(void *ptr); void taosPrintBackTrace(); diff --git a/include/os/osString.h b/include/os/osString.h index 80755de031..7ce8495701 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -64,6 +64,7 @@ typedef enum { M2C = 0, C2M } ConvType; #define TAOS_STRCAT(_dst, _src) ((void)strcat(_dst, _src)) char *tstrdup(const char *src); +char *tstrndup(const char *str, int64_t size); int32_t taosUcs4len(TdUcs4 *ucs4); int64_t taosStr2int64(const char *str); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index a3e1a64012..5c1c8d3b12 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -172,7 +172,7 @@ static void *dmMonitorThreadFp(void *param) { trimCount = (trimCount + 1) % TRIM_FREQ; if (trimCount == 0) { - taosMemoryTrim(0); + taosMemoryTrim(0, NULL); } } } diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index 4a7befb9c0..f1220c4aa3 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -390,6 +390,38 @@ char *taosStrdupi(const char *ptr) { #endif } +char *taosStrndupi(const char *ptr, int64_t size) { +#ifdef USE_TD_MEMORY + if (ptr == NULL) return NULL; + + TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); + ASSERT(pTdMemoryInfo->symbol == TD_MEMORY_SYMBOL); + if (pTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) { + return NULL; + } + void *tmp = tstrdup(pTdMemoryInfo); + if (tmp == NULL) return NULL; + + memcpy(tmp, pTdMemoryInfo, sizeof(TdMemoryInfo)); + taosBackTrace(((TdMemoryInfoPtr)tmp)->stackTrace, TD_MEMORY_STACK_TRACE_DEPTH); + + return (char *)tmp + sizeof(TdMemoryInfo); +#else +#ifdef BUILD_WITH_RAND_ERR + if (tsEnableRandErr) { + uint32_t r = taosRand() % 10001; + if ((r + 1) <= tsRandErrChance) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } +#endif + + return tstrndup(ptr, size); +#endif +} + + void taosMemFree(void *ptr) { if (NULL == ptr) return; #ifdef USE_TD_MEMORY diff --git a/source/os/src/osString.c b/source/os/src/osString.c index b0a3615ee5..c17fa250f5 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -84,6 +84,20 @@ char *stpncpy(char *dest, const char *src, int n) { } #endif +char *tstrndup(const char *str, int64_t size) { +#ifdef WINDOWS + return strndup(str, size); +#else + char* p = strndup(str, size); + if (str != NULL && NULL == p) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + } + return p; + +#endif +} + + int64_t taosStr2int64(const char *str) { char *endptr = NULL; return strtoll(str, &endptr, 10); diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 166181d9be..aa0b25802b 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -80,6 +80,9 @@ typedef enum EMPStatLogItem { E_MP_STAT_LOG_MEM_REALLOC, E_MP_STAT_LOG_MEM_FREE, E_MP_STAT_LOG_MEM_STRDUP, + E_MP_STAT_LOG_MEM_STRNDUP, + E_MP_STAT_LOG_MEM_TRIM, + E_MP_STAT_LOG_CHUNK_MALLOC, E_MP_STAT_LOG_CHUNK_RECYCLE, E_MP_STAT_LOG_CHUNK_REUSE, @@ -284,6 +287,7 @@ typedef int32_t (*mpReallocFunc)(SMemPool*, SMPSession*, void **, int64_t, int64 typedef int32_t (*mpInitSessionFunc)(SMemPool*, SMPSession*); typedef int32_t (*mpInitFunc)(SMemPool*, char*, SMemPoolCfg*); typedef int32_t (*mpUpdateCfgFunc)(SMemPool*); +typedef int32_t (*mpTrimFunc)(SMemPool*, SMPSession*, int32_t, bool*); typedef struct SMPStrategyFp { mpInitFunc initFp; @@ -293,6 +297,7 @@ typedef struct SMPStrategyFp { mpReallocFunc reallocFp; mpInitSessionFunc initSessionFp; mpUpdateCfgFunc updateCfgFp; + mpTrimFunc trimFp; } SMPStrategyFp; #define MP_GET_FLAG(st, f) ((st) & (f)) diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 760eae3494..0fd8e69ce6 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -25,8 +25,8 @@ threadlocal void* threadPoolSession = NULL; SMemPoolMgmt gMPMgmt = {0}; SMPStrategyFp gMPFps[] = { {NULL}, - {NULL, mpDirectAlloc, mpDirectFree, mpDirectGetMemSize, mpDirectRealloc, NULL, NULL}, - {mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg} + {NULL, mpDirectAlloc, mpDirectFree, mpDirectGetMemSize, mpDirectRealloc, NULL, NULL, NULL}, + {mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg, NULL} }; @@ -362,6 +362,17 @@ int32_t mpRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t si MP_RET((*gMPFps[gMPMgmt.strategy].reallocFp)(pPool, pSession, pPtr, size, origSize)); } +int32_t mpTrim(SMemPool* pPool, SMPSession* pSession, int32_t size, bool* trimed) { + int32_t code = TSDB_CODE_SUCCESS; + + if (gMPFps[gMPMgmt.strategy].trimFp) { + MP_RET((*gMPFps[gMPMgmt.strategy].trimFp)(pPool, pSession, size, trimed)); + } + + return code; +} + + void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailName, int64_t maxAllocSize) { if (!MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_PRINT_STAT)) { return; @@ -378,7 +389,9 @@ void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailN uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Calloc", pDetail->times.memCalloc)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Realloc", pDetail->times.memRealloc)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->times.strdup)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->times.strndup)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Free", pDetail->times.memFree)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Trim", pDetail->times.memTrim)); break; case E_MP_STRATEGY_CHUNK: uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkMalloc", pDetail->times.chunkMalloc)); @@ -397,7 +410,9 @@ void mpPrintStatDetail(SMPCtrlInfo* pCtrl, SMPStatDetail* pDetail, char* detailN uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Calloc", pDetail->bytes.memCalloc)); uInfo(MP_STAT_ORIG_FORMAT, MP_STAT_ORIG_VALUE("Realloc", pDetail->bytes.memRealloc)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strdup", pDetail->bytes.strdup)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Strndup", pDetail->bytes.strndup)); uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Free", pDetail->bytes.memFree)); + uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("Trim", pDetail->bytes.memTrim)); break; case E_MP_STRATEGY_CHUNK: uInfo(MP_STAT_FORMAT, MP_STAT_VALUE("chunkMalloc", pDetail->bytes.chunkMalloc)); @@ -511,6 +526,35 @@ void mpLogStatDetail(SMPStatDetail* pDetail, EMPStatLogItem item, SMPStatInput* } break; } + case E_MP_STAT_LOG_MEM_STRNDUP: { + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { + atomic_add_fetch_64(&pDetail->times.strndup.exec, 1); + atomic_add_fetch_64(&pDetail->bytes.strndup.exec, pInput->size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { + atomic_add_fetch_64(&pDetail->times.strndup.succ, 1); + atomic_add_fetch_64(&pDetail->bytes.strndup.succ, pInput->size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { + atomic_add_fetch_64(&pDetail->times.strndup.fail, 1); + atomic_add_fetch_64(&pDetail->bytes.strndup.fail, pInput->size); + } + break; + } + case E_MP_STAT_LOG_MEM_TRIM: { + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_EXEC)) { + atomic_add_fetch_64(&pDetail->times.memTrim.exec, 1); + atomic_add_fetch_64(&pDetail->bytes.memTrim.exec, pInput->origSize); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_SUCC)) { + atomic_add_fetch_64(&pDetail->times.memTrim.succ, 1); + atomic_add_fetch_64(&pDetail->bytes.memTrim.succ, pInput->size); + } + if (MP_GET_FLAG(pInput->procFlags, MP_STAT_PROC_FLAG_RES_FAIL)) { + atomic_add_fetch_64(&pDetail->times.memTrim.fail, 1); + } + break; + } case E_MP_STAT_LOG_CHUNK_MALLOC: case E_MP_STAT_LOG_CHUNK_RECYCLE: case E_MP_STAT_LOG_CHUNK_REUSE: @@ -529,7 +573,9 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt case E_MP_STAT_LOG_MEM_CALLOC: case E_MP_STAT_LOG_MEM_REALLOC: case E_MP_STAT_LOG_MEM_FREE: - case E_MP_STAT_LOG_MEM_STRDUP: { + case E_MP_STAT_LOG_MEM_STRDUP: + case E_MP_STAT_LOG_MEM_STRNDUP: + case E_MP_STAT_LOG_MEM_TRIM: { if (MP_GET_FLAG(pSession->ctrlInfo.statFlags, MP_STAT_FLAG_LOG_ALL_MEM_STAT)) { mpLogStatDetail(&pSession->stat.statDetail, item, pInput); } @@ -829,7 +875,7 @@ _return: return ptr; } -char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) { +char *taosMemPoolStrdup(void* poolHandle, void* session, const char *ptr, char* fileName, int32_t lineNo) { int32_t code = TSDB_CODE_SUCCESS; void *res = NULL; @@ -858,6 +904,37 @@ _return: return res; } +char *taosMemPoolStrndup(void* poolHandle, void* session, const char *ptr, int64_t size, char* fileName, int32_t lineNo) { + int32_t code = TSDB_CODE_SUCCESS; + void *res = NULL; + + if (NULL == poolHandle || NULL == session || NULL == fileName || NULL == ptr || size < 0) { + uError("%s invalid input param, handle:%p, session:%p, fileName:%p, ptr:%p, size:%" PRId64, + __FUNCTION__, poolHandle, session, fileName, ptr, size); + MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + } + + SMemPool* pPool = (SMemPool*)poolHandle; + SMPSession* pSession = (SMPSession*)session; + int64_t origSize = strlen(ptr); + size = TMIN(size, origSize) + 1; + SMPStatInput input = {.size = size, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; + + terrno = mpMalloc(pPool, pSession, size, 0, &res); + if (NULL != res) { + TAOS_MEMCPY(res, ptr, size - 1); + *((char*)res + size - 1) = 0; + } + + MP_SET_FLAG(input.procFlags, (res ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); + mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_STRNDUP, &input); + +_return: + + return res; +} + + void taosMemPoolFree(void* poolHandle, void* session, void *ptr, char* fileName, int32_t lineNo) { int32_t code = TSDB_CODE_SUCCESS; if (NULL == poolHandle || NULL == session || NULL == fileName) { @@ -937,8 +1014,29 @@ void taosMemPoolModDestroy(void) { } -void taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo) { +int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fileName, int32_t lineNo, bool* trimed) { + int32_t code = TSDB_CODE_SUCCESS; + + if (NULL == poolHandle || NULL == session || NULL == fileName || size < 0) { + uError("%s invalid input param, handle:%p, session:%p, fileName:%p, size:%d", + __FUNCTION__, poolHandle, session, fileName, size); + MP_ERR_JRET(TSDB_CODE_INVALID_MEM_POOL_PARAM); + } + SMemPool* pPool = (SMemPool*)poolHandle; + SMPSession* pSession = (SMPSession*)session; + SMPStatInput input = {.origSize = 1, .size = 0, .file = fileName, .line = lineNo, .procFlags = MP_STAT_PROC_FLAG_EXEC}; + + code = mpTrim(pPool, pSession, size, trimed); + + input.size = (trimed) ? 1 : 0; + + MP_SET_FLAG(input.procFlags, ((0 == code) ? MP_STAT_PROC_FLAG_RES_SUCC : MP_STAT_PROC_FLAG_RES_FAIL)); + mpLogStat(pPool, pSession, E_MP_STAT_LOG_MEM_TRIM, &input); + +_return: + + return code; } int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) { diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 12d499802f..845c01728a 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -46,6 +46,7 @@ namespace { #define MPT_MAX_SESSION_NUM 256 #define MPT_MAX_JOB_NUM 200 #define MPT_MAX_THREAD_NUM 100 +#define MPT_MAX_JOB_LOOP_TIMES 1000 #define MPT_DEFAULT_RESERVE_MEM_PERCENT 20 #define MPT_MIN_RESERVE_MEM_SIZE (512 * 1048576UL) @@ -100,8 +101,12 @@ typedef struct SMPTJobInfo { typedef struct { - bool printTestInfo; - bool printInputRow; + int32_t taskMaxActTimes; + int32_t caseLoopTimes; + int64_t maxSingleAllocSize; + char* pSrcString; + bool printTestInfo; + bool printInputRow; } SMPTestCtrl; typedef struct { @@ -110,8 +115,8 @@ typedef struct { } SMPTestMemInfo; typedef struct { - int8_t taskExecLock; - bool taskFinished; + SRWLatch taskExecLock; + bool taskFinished; int64_t poolMaxUsedSize; int64_t poolTotalUsedSize; @@ -119,12 +124,12 @@ typedef struct { SMPStatDetail stat; int32_t memIdx; - SMPTestMemInfo* pMemList[MPT_MAX_MEM_ACT_TIMES]; + SMPTestMemInfo pMemList[MPT_MAX_MEM_ACT_TIMES]; int64_t npSize; int32_t npMemIdx; - SMPTestMemInfo* npMemList[MPT_MAX_MEM_ACT_TIMES]; + SMPTestMemInfo npMemList[MPT_MAX_MEM_ACT_TIMES]; bool taskFreed; } SMPTestTaskCtx; @@ -133,12 +138,12 @@ typedef struct { SRWLatch jobExecLock; int32_t jobIdx; - uint64_t jobId; + int64_t jobId; void* pSessions[MPT_MAX_SESSION_NUM]; int32_t taskNum; SMPTestTaskCtx taskCtxs[MPT_MAX_SESSION_NUM]; - int32_t taskExecIdx; + int32_t taskRunningNum; SMPTJobInfo* pJob; int32_t jobStatus; } SMPTestJobCtx; @@ -147,18 +152,18 @@ typedef struct { int64_t jobQuota; bool autoPoolSize; int32_t poolSize; - int32_t maxExecJobNum; int32_t threadNum; int32_t randTask; } SMPTestParam; typedef struct { - bool allJobs; - bool autoJob; + TdThread threadFp; + bool allJobs; + bool autoJob; } SMPTestThread; typedef struct SMPTestCtx { - uint64_t qId; + int64_t qId; SHashObj* pJobs; BoundedQueue* pJobQueue; void* memPoolHandle; @@ -168,6 +173,7 @@ typedef struct SMPTestCtx { } SMPTestCtx; SMPTestCtx mptCtx = {0}; +SMPTestCtrl mptCtrl = {0}; #if 0 void joinTestReplaceRetrieveFp() { @@ -227,6 +233,13 @@ void mptInit() { mptCtx.pJobQueue = createBoundedQueue(10000, mptJobMemSizeCompFn, NULL, NULL); ASSERT_TRUE(NULL != mptCtx.pJobQueue); + mptCtrl.caseLoopTimes = 100; + mptCtrl.taskMaxActTimes = 1000; + mptCtrl.maxSingleAllocSize = 104857600; + mptCtrl.pSrcString = (char*)taosMemoryMalloc(mptCtrl.maxSingleAllocSize); + ASSERT_TRUE(NULL != mptCtrl.pSrcString); + memset(mptCtrl.pSrcString, 'P', mptCtrl.maxSingleAllocSize - 1); + mptCtrl.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0; } void mptDestroyTaskCtx(SMPTestTaskCtx* pTask) { @@ -238,14 +251,106 @@ void mptDestroyTaskCtx(SMPTestTaskCtx* pTask) { } } + +int32_t mptInitJobInfo(uint64_t qId, SMPTJobInfo* pJob) { + pJob->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == pJob->pSessions) { + uError("fail to init session hash, code: 0x%x", terrno); + return terrno; + } + + int32_t code = taosMemPoolCallocJob(qId, (void**)&pJob->memInfo); + if (TSDB_CODE_SUCCESS != code) { + taosHashCleanup(pJob->pSessions); + pJob->pSessions = NULL; + return code; + } + + return code; +} + + + void mptDestroyJobInfo(SMPTJobInfo* pJob) { taosMemFree(pJob->memInfo); taosHashCleanup(pJob->pSessions); } -void mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { - taosWLockLatch(&pJobCtx->jobExecLock); +int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJobCtx, void** ppSession) { + int32_t code = TSDB_CODE_SUCCESS; + SMPTJobInfo* pJob = NULL; + + while (true) { + pJob = (SMPTJobInfo*)taosHashAcquire(mptCtx.pJobs, &qId, sizeof(qId)); + if (NULL == pJob) { + SMPTJobInfo jobInfo = {0}; + code = mptInitJobInfo(qId, &jobInfo); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + code = taosHashPut(mptCtx.pJobs, &qId, sizeof(qId), &jobInfo, sizeof(jobInfo)); + if (TSDB_CODE_SUCCESS != code) { + mptDestroyJobInfo(&jobInfo); + if (TSDB_CODE_DUP_KEY == code) { + code = TSDB_CODE_SUCCESS; + continue; + } + + return code; + } + + pJob = (SMPTJobInfo*)taosHashAcquire(mptCtx.pJobs, &qId, sizeof(qId)); + if (NULL == pJob) { + uError("QID:0x%" PRIx64 " not in joj hash, may be dropped", qId); + return TSDB_CODE_QRY_JOB_NOT_EXIST; + } + } + + break; + } + + pJobCtx->pJob = pJob; + pJob->pCtx = pJobCtx; + + assert(0 == taosMemPoolInitSession(mptCtx.memPoolHandle, ppSession, pJob->memInfo)); + + char id[sizeof(tId) + sizeof(eId)] = {0}; + MPT_SET_TEID(id, tId, eId); + + assert(0 == taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES)); + +_return: + + if (NULL != pJob) { + taosHashRelease(mptCtx.pJobs, pJob); + } + + return code; +} + + + +void mptInitTask(int32_t idx, uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJob) { + ASSERT_TRUE(0 == mptInitSession(qId, tId, eId, pJob, &pJob->pSessions[idx])); +} + +void mptInitJob(int32_t idx) { + SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[idx]; + + pJobCtx->jobIdx = idx; + pJobCtx->jobId = atomic_add_fetch_64(&mptCtx.qId, 1); + pJobCtx->taskNum = (taosRand() % MPT_MAX_SESSION_NUM) + 1; + for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { + mptInitTask(i, pJobCtx->jobId, i, 0, pJobCtx); + } +} + +int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { + if (taosWTryLockLatch(&pJobCtx->jobExecLock)) { + return -1; + } mptDestroyJobInfo(pJobCtx->pJob); (void)taosHashRemove(mptCtx.pJobs, &pJobCtx->jobId, sizeof(pJobCtx->jobId)); @@ -259,22 +364,30 @@ void mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { } taosWUnLockLatch(&pJobCtx->jobExecLock); + + return 0; } void mptCheckCompareJobInfo(SMPTestJobCtx* pJobCtx) { } -void mptResetJob(SMPTestJobCtx* pJobCtx) { - mptDestroyJob(pJobCtx, true); +int32_t mptResetJob(SMPTestJobCtx* pJobCtx) { + if (atomic_load_8(&pJobCtx->pJob->retired)) { + if (0 == atomic_load_32(&pJobCtx->taskRunningNum)) { + return mptDestroyJob(pJobCtx, true); + } else { + return -1; + } + } + + return 0; } void mptRetireJob(SMPTJobInfo* pJob) { SMPTestJobCtx* pCtx = (SMPTestJobCtx*)pJob->pCtx; mptCheckCompareJobInfo(pCtx); - - mptResetJob(pCtx); } int32_t mptGetMemPoolMaxMemSize(int64_t totalSize, int64_t* maxSize) { @@ -318,7 +431,7 @@ int32_t mptGetQueryMemPoolMaxSize(int64_t* pMaxSize, bool* autoMaxSize) { void mptCheckUpateCfgCb(void* pHandle, void* cfg) { SMemPoolCfg* pCfg = (SMemPoolCfg*)cfg; - int64_t newJobQuota = mptCtx.singleQueryMaxSize * 1048576UL; + int64_t newJobQuota = mptCtx.param.jobQuota * 1048576UL; if (pCfg->jobQuota != newJobQuota) { atomic_store_64(&pCfg->jobQuota, newJobQuota); } @@ -422,82 +535,10 @@ void mptRetireJobCb(SMemPoolJob* mpJob, int32_t errCode) { } } -int32_t mptInitJobInfo(uint64_t qId, SMPTJobInfo* pJob) { - pJob->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (NULL == pJob->pSessions) { - uError("fail to init session hash, code: 0x%x", terrno); - return terrno; - } - - int32_t code = taosMemPoolCallocJob(qId, (void**)&pJob->memInfo); - if (TSDB_CODE_SUCCESS != code) { - taosHashCleanup(pJob->pSessions); - pJob->pSessions = NULL; - return code; - } - - return code; -} - - -int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJobCtx, void** ppSession) { - int32_t code = TSDB_CODE_SUCCESS; - SMPTJobInfo* pJob = NULL; - - while (true) { - pJob = (SMPTJobInfo*)taosHashAcquire(mptCtx.pJobs, &qId, sizeof(qId)); - if (NULL == pJob) { - SMPTJobInfo jobInfo = {0}; - code = mptInitJobInfo(qId, &jobInfo); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - - code = taosHashPut(mptCtx.pJobs, &qId, sizeof(qId), &jobInfo, sizeof(jobInfo)); - if (TSDB_CODE_SUCCESS != code) { - mptDestroyJobInfo(&jobInfo); - if (TSDB_CODE_DUP_KEY == code) { - code = TSDB_CODE_SUCCESS; - continue; - } - - return code; - } - - pJob = (SMPTJobInfo*)taosHashAcquire(mptCtx.pJobs, &qId, sizeof(qId)); - if (NULL == pJob) { - uError("QID:0x%" PRIx64 " not in joj hash, may be dropped", qId); - return TSDB_CODE_QRY_JOB_NOT_EXIST; - } - } - - break; - } - - pJobCtx->pJob = pJob; - pJob->pCtx = pJobCtx; - - assert(0 == taosMemPoolInitSession(mptCtx.memPoolHandle, ppSession, pJob->memInfo)); - - char id[sizeof(tId) + sizeof(eId)] = {0}; - MPT_SET_TEID(id, tId, eId); - - assert(0 == taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES)); - -_return: - - if (NULL != pJob) { - taosHashRelease(mptCtx.pJobs, pJob); - } - - return code; -} - - void mptInitPool(void) { SMemPoolCfg cfg = {0}; - cfg.autoMaxSize = mptCtx.param.jobQuota; + cfg.autoMaxSize = mptCtx.param.autoPoolSize; if (!mptCtx.param.autoPoolSize) { cfg.maxSize = mptCtx.param.poolSize; } else { @@ -515,46 +556,196 @@ void mptInitPool(void) { ASSERT_TRUE(0 == taosMemPoolOpen("SingleThreadTest", &cfg, &mptCtx.memPoolHandle)); } -void mptSimulateTask(SMPTestJobCtx* pJob, SMPTestTaskCtx* pCtx, bool finishTask) { - int32_t actTimes = 0; - if (!finishTask) { - actTimes = taosRand() % 100 * () +void mptSimulateAction(SMPTestTaskCtx* pTask) { + int32_t actId = 0; + bool actDone = false; + int32_t size = taosRand() % mptCtrl.maxSingleAllocSize; + + while (!actDone) { + actId = taosRand() % 9; + switch (actId) { + case 0: { // malloc + if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) { + break; + } + + pTask->pMemList[pTask->memIdx].p = taosMemoryMalloc(size); + if (NULL == pTask->pMemList[pTask->memIdx].p) { + return; + } + + pTask->pMemList[pTask->memIdx].size = size; + pTask->memIdx++; + actDone = true; + break; + } + case 1: { // calloc + if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) { + break; + } + + pTask->pMemList[pTask->memIdx].p = taosMemoryCalloc(1, size); + if (NULL == pTask->pMemList[pTask->memIdx].p) { + return; + } + + pTask->pMemList[pTask->memIdx].size = size; + pTask->memIdx++; + actDone = true; + break; + } + case 2:{ // new realloc + if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) { + break; + } + + pTask->pMemList[pTask->memIdx].p = taosMemoryRealloc(NULL, size); + if (NULL == pTask->pMemList[pTask->memIdx].p) { + return; + } + + pTask->pMemList[pTask->memIdx].size = size; + pTask->memIdx++; + actDone = true; + break; + } + case 3:{ // real realloc + if (pTask->memIdx <= 0) { + break; + } + + assert(pTask->pMemList[pTask->memIdx - 1].p); + pTask->pMemList[pTask->memIdx - 1].p = taosMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, size); + if (NULL == pTask->pMemList[pTask->memIdx - 1].p) { + return; + } + + pTask->pMemList[pTask->memIdx - 1].size = size; + actDone = true; + break; + } + case 4:{ // realloc free + if (pTask->memIdx <= 0) { + break; + } + + assert(pTask->pMemList[pTask->memIdx - 1].p); + pTask->pMemList[pTask->memIdx - 1].p = taosMemoryRealloc(pTask->pMemList[pTask->memIdx - 1].p, 0); + if (NULL != pTask->pMemList[pTask->memIdx - 1].p) { + taosMemoryFreeClear(pTask->pMemList[pTask->memIdx - 1].p); + } + + pTask->memIdx--; + actDone = true; + break; + } + case 5:{ // strdup + if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) { + break; + } + + mptCtrl.pSrcString[size] = 0; + pTask->pMemList[pTask->memIdx].p = taosStrdup(mptCtrl.pSrcString); + mptCtrl.pSrcString[size] = 'W'; + if (NULL == pTask->pMemList[pTask->memIdx].p) { + return; + } + + pTask->pMemList[pTask->memIdx].size = size + 1; + pTask->memIdx++; + actDone = true; + break; + } + case 6:{ // strndup + if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) { + break; + } + + pTask->pMemList[pTask->memIdx].p = taosStrndup(mptCtrl.pSrcString, size); + if (NULL == pTask->pMemList[pTask->memIdx].p) { + return; + } + + pTask->pMemList[pTask->memIdx].size = size + 1; + pTask->memIdx++; + actDone = true; + break; + } + case 7:{ // free + if (pTask->memIdx <= 0) { + break; + } + + assert(pTask->pMemList[pTask->memIdx - 1].p); + taosMemoryFreeClear(pTask->pMemList[pTask->memIdx - 1].p); + + pTask->memIdx--; + actDone = true; + break; + } + case 8:{ // trim + taosMemoryTrim(0, NULL); + actDone = true; + break; + } + default: + assert(0); + break; + } + } +} + +void mptSimulateTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { + int32_t actTimes = taosRand() % mptCtrl.taskMaxActTimes; + for (int32_t i = 0; i < actTimes; ++i) { + if (atomic_load_8(&pJobCtx->pJob->retired)) { + return; + } + + mptSimulateAction(pTask); + } +} + +void mptSimulateOutTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { + if (atomic_load_8(&pJobCtx->pJob->retired)) { + return; } + if (taosRand() % 10 > 0) { + return; + } + + if (pTask->npMemIdx >= MPT_MAX_MEM_ACT_TIMES) { + return; + } -} - -void mptSimulateOutTask(SMPTestTaskCtx* pCtx, bool finishTask) { - + pTask->npMemList[pTask->npMemIdx].p = taosMemoryMalloc(taosRand() % mptCtrl.maxSingleAllocSize); + pTask->npMemIdx++; } -void mptTaskRun(SMPTestJobCtx* pJob, SMPTestTaskCtx* pCtx, int32_t idx, bool finishTask) { - taosRLockLatch(&pCtx->taskExecLock); +void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx) { + if (atomic_load_8(&pJobCtx->pJob->retired)) { + return; + } - mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pJob->pSessions[idx]); - mptSimulateTask(pJob, pCtx, finishTask); + if (taosWTryLockLatch(&pCtx->taskExecLock)) { + return; + } + + atomic_add_fetch_32(&pJobCtx->taskRunningNum, 1); + + mptEnableMemoryPoolUsage(mptCtx.memPoolHandle, pJobCtx->pSessions[idx]); + mptSimulateTask(pJobCtx, pCtx); mptDisableMemoryPoolUsage(); - mptSimulateOutTask(pCtx, finishTask); + mptSimulateOutTask(pJobCtx, pCtx); - taosRUnLockLatch(&pCtx->taskExecLock); + taosWUnLockLatch(&pCtx->taskExecLock); + + atomic_sub_fetch_32(&pJobCtx->taskRunningNum, 1); } -void mptInitTask(int32_t idx, uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJob) { - ASSERT_TRUE(0 == mptInitSession(qId, tId, eId, pJob, &pJob->pSessions[idx])); -} - -void mptInitJob(int32_t idx) { - SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[idx]; - - pJobCtx->jobIdx = idx; - pJobCtx->jobId = atomic_add_fetch_64(&mptCtx.qId, 1); - pJobCtx->taskNum = (taosRand() % MPT_MAX_SESSION_NUM) + 1; - for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { - mptInitTask(i, pJobCtx->jobId, i, 0, pJobCtx); - } -} void mptInitJobs() { for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) { @@ -567,19 +758,31 @@ void* mptThreadFunc(void* param) { for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) { SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; - if (taosRTryLockLatch(&pJobCtx->jobExecLock)) { - continue; - } - - if (mptCtx.param.randTask) { - int32_t taskIdx = taosRand() % pJobCtx->taskNum; - mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, false); - taosRUnLockLatch(&pJobCtx->jobExecLock); + if (mptResetJob(pJobCtx)) { continue; } - for (int32_t m = 0; m < pJobCtx->taskNum; ++m) { - mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m, false); + if (taosRTryLockLatch(&pJobCtx->jobExecLock)) { + continue; + } + + int32_t jobExecTimes = taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1; + for (int32_t n = 0; n < jobExecTimes; ++n) { + if (atomic_load_8(&pJobCtx->pJob->retired)) { + break; + } + if (mptCtx.param.randTask) { + int32_t taskIdx = taosRand() % pJobCtx->taskNum; + mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx); + continue; + } + + for (int32_t m = 0; m < pJobCtx->taskNum; ++m) { + if (atomic_load_8(&pJobCtx->pJob->retired)) { + break; + } + mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m); + } } taosRUnLockLatch(&pJobCtx->jobExecLock); @@ -589,15 +792,20 @@ void* mptThreadFunc(void* param) { } void mptStartThreadTest(int32_t threadIdx) { - TdThread t1; TdThreadAttr thattr; ASSERT_EQ(0, taosThreadAttrInit(&thattr)); ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE)); - ASSERT_EQ(0, taosThreadCreate(&(t1), &thattr, mptThreadFunc, &mptCtx.threadCtxs[threadIdx])); + ASSERT_EQ(0, taosThreadCreate(&mptCtx.threadCtxs[threadIdx].threadFp, &thattr, mptThreadFunc, &mptCtx.threadCtxs[threadIdx])); ASSERT_EQ(0, taosThreadAttrDestroy(&thattr)); } +void mptDestroyJobs() { + for (int32_t i = 0; i < MPT_MAX_JOB_NUM; ++i) { + mptDestroyJob(&mptCtx.jobCtxs[i], false); + } +} + void mptRunCase(SMPTestParam* param) { memcpy(&mptCtx.param, param, sizeof(SMPTestParam)); @@ -606,9 +814,14 @@ void mptRunCase(SMPTestParam* param) { mptInitJobs(); for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) { - mptCtx.threadCtxs[i]. mptStartThreadTest(i); } + + for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) { + (void)taosThreadJoin(mptCtx.threadCtxs[i].threadFp, NULL); + } + + mptDestroyJobs(); } } // namespace @@ -617,23 +830,24 @@ void mptRunCase(SMPTestParam* param) { #if 1 TEST(FuncTest, SingleJobTest) { char* caseName = "FuncTest:SingleThreadTest"; - void* pSession = NULL; - - mptInitPool(0, false, 5*1048576UL); + SMPTestParam param = {0}; + param.autoPoolSize = true; + param.threadNum = 10; + for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) { + mptRunCase(¶m); + } - } #endif -#if 1 +#if 0 TEST(FuncTest, MultiJobsTest) { char* caseName = "FuncTest:SingleThreadTest"; - void* pSession = NULL; - - mptInitPool(0, false, 5*1048576UL); + SMPTestParam param = {0}; - - + for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) { + mptRunCase(¶m); + } } #endif