fix: rpc dead loop issue
This commit is contained in: parent a5bd91492d · commit e2accc307c
@@ -2,7 +2,7 @@
 # libuv
 ExternalProject_Add(libuv
   GIT_REPOSITORY https://github.com/libuv/libuv.git
-  GIT_TAG v1.48.0
+  GIT_TAG v1.49.0
   SOURCE_DIR "${TD_CONTRIB_DIR}/libuv"
   BINARY_DIR "${TD_CONTRIB_DIR}/libuv"
   CONFIGURE_COMMAND ""
@@ -34,6 +34,9 @@ typedef enum MemPoolEvictPolicy {
 typedef struct SMemPoolJob {
   uint64_t jobId;
   uint64_t clientId;
+
+  int32_t  remainSession;
+
   int64_t  allocMemSize;
   int64_t  maxAllocMemSize;
 } SMemPoolJob;
@@ -124,7 +127,7 @@ void taosMemPoolClose(void* poolHandle);
 void taosMemPoolModDestroy(void);
 void taosAutoMemoryFree(void *ptr);
 int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, char *sessionId);
-void taosMemPoolDestroySession(void* poolHandle, void* session, int32_t* remainSessions);
+void taosMemPoolDestroySession(void* poolHandle, void* session);
 int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob);
 void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg);
 void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName);
@@ -529,7 +529,7 @@ void qwDbgSimulateSleep(void);
 void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped);
 int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
 int32_t qwInitQueryPool(void);
-void qwDestroyJobInfo(SQWJobInfo* pJob);
+void qwDestroyJobInfo(void* job);
 void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
 void qwRetireJob(SQWJobInfo* pJob);
 void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session);
@@ -65,7 +65,10 @@ int32_t qwInitJobHash(void) {
 
     if (NULL != atomic_val_compare_exchange_ptr(&gQueryMgmt.pJobInfo, NULL, pHash)) {
       taosHashCleanup(pHash);
      return code;
    }
+
+    taosHashSetFreeFp(gQueryMgmt.pJobInfo, qwDestroyJobInfo);
  }
+
  return code;
@@ -75,19 +78,18 @@ int32_t qwInitJobHash(void) {
 void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) {
   char id[sizeof(tId) + sizeof(eId) + 1] = {0};
   QW_SET_TEID(id, tId, eId);
-  int32_t remainSessions = 0;
+  int32_t remainSessions = atomic_sub_fetch_32(&pJobInfo->memInfo->remainSession, 1);
 
   (void)taosHashRemove(pJobInfo->pSessions, id, sizeof(id));
 
-  taosMemPoolDestroySession(gMemPoolHandle, session, &remainSessions);
+  taosMemPoolDestroySession(gMemPoolHandle, session);
 
   if (0 == remainSessions) {
     QW_LOCK(QW_WRITE, &pJobInfo->lock);
-    if (0 == taosHashGetSize(pJobInfo->pSessions)) {
+    if (0 == taosHashGetSize(pJobInfo->pSessions) && 0 == atomic_load_32(&pJobInfo->memInfo->remainSession)) {
       atomic_store_8(&pJobInfo->destroyed, 1);
-      qwDestroyJobInfo(pJobInfo);
       QW_UNLOCK(QW_WRITE, &pJobInfo->lock);
 
       char id2[sizeof(qId) + sizeof(cId) + 1] = {0};
       QW_SET_QCID(id2, qId, cId);
       (void)taosHashRemove(gQueryMgmt.pJobInfo, id2, sizeof(id2));
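Taken together with the header changes above, the session count now lives on SMemPoolJob and is maintained by the callers: qwDestroySession() decrements it before releasing the pool session, and the job is torn down only once the session hash is empty and the counter reads zero. A minimal standalone sketch of that ownership pattern (C11 atomics; the names are illustrative, not the TDengine API):

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct Job {
  atomic_int  remain_session;  // sessions still attached to this job
  atomic_bool destroyed;       // guarded by a writer lock in the real code
} Job;

// The caller takes the count *before* releasing pool resources, so two
// racing destroyers cannot both observe "I was the last session".
static void destroy_session(Job *job, void *session) {
  int remain = atomic_fetch_sub(&job->remain_session, 1) - 1;

  free(session);  // stands in for taosMemPoolDestroySession()

  if (0 == remain && !atomic_exchange(&job->destroyed, true)) {
    free(job);    // stands in for removing the job from the job hash
  }
}
```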
@@ -137,10 +139,15 @@ int32_t qwRetrieveJobInfo(QW_FPARAMS_DEF, SQWJobInfo** ppJob) {
       }
     }
 
     QW_LOCK(QW_READ, &pJob->lock);
+    if (atomic_load_8(&pJob->destroyed)) {
+      QW_UNLOCK(QW_READ, &pJob->lock);
+      continue;
+    }
 
+    atomic_add_fetch_32(&pJob->memInfo->remainSession, 1);
     QW_UNLOCK(QW_READ, &pJob->lock);
 
     break;
   }
 
@@ -172,19 +179,11 @@ int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) {
     QW_ERR_JRET(taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo, id));
     session.sessionMp = *ppSession;
 
-    QW_LOCK(QW_READ, &pJob->lock);
-    if (atomic_load_8(&pJob->destroyed)) {
-      QW_UNLOCK(QW_READ, &pJob->lock);
-      continue;
-    }
-
     code = taosHashPut(pJob->pSessions, id, sizeof(id), &session, sizeof(session));
     if (TSDB_CODE_SUCCESS != code) {
-      QW_UNLOCK(QW_READ, &pJob->lock);
       QW_TASK_ELOG("fail to put session into query session hash, code: 0x%x", code);
       QW_ERR_JRET(code);
     }
-    QW_UNLOCK(QW_READ, &pJob->lock);
 
     break;
   } while (true);
@@ -192,6 +191,10 @@ int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) {
 _return:
 
   if (NULL != pJob) {
+    if (TSDB_CODE_SUCCESS != code) {
+      qwDestroySession(QW_FPARAMS(), pJob, *ppSession);
+    }
+
     taosHashRelease(gQueryMgmt.pJobInfo, pJob);
   }
 
@@ -273,10 +273,6 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTask
 void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); }
 
 void qwFreeTaskHandle(SQWTaskCtx *ctx) {
-  if (ctx->dynamicTask) {
-    return;
-  }
-
   // Note: free/kill may in RC
   qTaskInfo_t otaskHandle = atomic_load_ptr(&ctx->taskHandle);
   if (otaskHandle && otaskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, otaskHandle, NULL)) {
@@ -289,10 +285,6 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx) {
 }
 
 void qwFreeSinkHandle(SQWTaskCtx *ctx) {
-  if (ctx->dynamicTask) {
-    return;
-  }
-
   // Note: free/kill may in RC
   void* osinkHandle = atomic_load_ptr(&ctx->sinkHandle);
   if (osinkHandle && osinkHandle == atomic_val_compare_exchange_ptr(&ctx->sinkHandle, osinkHandle, NULL)) {
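With the dynamic-task early return gone, the skip moves to the call sites (see the `if (!ctx->dynamicTask)` guards in later hunks), while both helpers keep their compare-and-swap guard: only the thread that swings the handle pointer to NULL gets to free it, so a racing free/kill cannot double-free. A self-contained sketch of that claim-then-free idiom (C11; hypothetical names):

```c
#include <stdatomic.h>

typedef struct Ctx {
  _Atomic(void *) sink_handle;
} Ctx;

// Returns the claimed handle, or NULL if another thread already took it.
// Whoever receives a non-NULL pointer is the sole owner and may free it.
static void *claim_handle(Ctx *ctx) {
  void *h = atomic_load(&ctx->sink_handle);
  while (h != NULL &&
         !atomic_compare_exchange_weak(&ctx->sink_handle, &h, NULL)) {
    // on failure h is refreshed; loop until it is NULL or our CAS wins
  }
  return h;
}
```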
@@ -590,6 +582,7 @@ void qwCloseRef(void) {
     gQwMgmt.qwRef = -1;
 
+    taosHashCleanup(gQueryMgmt.pJobInfo);
     gQueryMgmt.pJobInfo = NULL;
   }
   taosWUnLockLatch(&gQwMgmt.lock);
 }
@@ -734,11 +727,13 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) {
     }
   }
 
-void qwDestroyJobInfo(SQWJobInfo* pJob) {
-  if (NULL == pJob) {
+void qwDestroyJobInfo(void* job) {
+  if (NULL == job) {
     return;
   }
 
+  SQWJobInfo* pJob = (SQWJobInfo*)job;
+
   taosMemoryFreeClear(pJob->memInfo);
   taosHashCleanup(pJob->pSessions);
   pJob->pSessions = NULL;
 
@@ -89,7 +89,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
 
 _return:
 
-  if (!ctx->explain || ctx->explainRsped) {
+  if ((!ctx->dynamicTask) && (!ctx->explain || ctx->explainRsped)) {
     qwFreeTaskHandle(ctx);
   }
 
@@ -490,7 +490,9 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32
       qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
       if (qComplete) {
         atomic_store_8((int8_t *)&ctx->queryEnd, true);
-        qwFreeSinkHandle(ctx);
+        if (!ctx->dynamicTask) {
+          qwFreeSinkHandle(ctx);
+        }
       }
     }
 
@@ -876,7 +878,9 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
       if (qComplete) {
         atomic_store_8((int8_t *)&ctx->queryEnd, true);
         atomic_store_8((int8_t *)&ctx->queryContinue, 0);
-        qwFreeSinkHandle(ctx);
+        if (!ctx->dynamicTask) {
+          qwFreeSinkHandle(ctx);
+        }
       }
 
       qwMsg->connInfo = ctx->dataConnInfo;
@@ -966,7 +970,9 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
       qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete);
       if (qComplete) {
         atomic_store_8((int8_t *)&ctx->queryEnd, true);
-        qwFreeSinkHandle(ctx);
+        if (!ctx->dynamicTask) {
+          qwFreeSinkHandle(ctx);
+        }
       }
     }
 
@@ -1593,7 +1599,9 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
       qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete);
       if (qComplete) {
         atomic_store_8((int8_t *)&ctx->queryEnd, true);
-        qwFreeSinkHandle(ctx);
+        if (!ctx->dynamicTask) {
+          qwFreeSinkHandle(ctx);
+        }
       }
 
       break;
@@ -1278,6 +1278,7 @@ static void cliHandleException(SCliConn* conn) {
   if (conn->registered) {
     int8_t ref = transGetRefCount(conn);
    if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) {
+      tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds);
       uv_close((uv_handle_t*)conn->stream, cliDestroy);
     }
   }
@@ -1586,6 +1587,8 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) {
     TAOS_CHECK_GOTO(terrno, &lino, _exception1);
   }
 
+  tTrace("%s conn %p fd %d openend", pInst->label, conn, fd);
+
   int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
   if (ret != 0) {
     tError("%s conn %p failed to set stream since %s", transLabel(pInst), conn, uv_err_name(ret));
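On the transport side, the stream is closed only when the connection refcount has dropped to zero and the handle is not already closing; calling uv_close() twice on the same handle is an error in libuv, and repeatedly re-arming work against a half-closed handle is one way an event loop can spin. A hedged sketch of the guard (real libuv calls, simplified connection type):

```c
#include <uv.h>

typedef struct Conn {
  uv_stream_t *stream;
  int          ref;  // stands in for transGetRefCount()
} Conn;

static void on_close(uv_handle_t *handle) {
  // final teardown runs exactly once, on the loop thread
}

static void conn_try_close(Conn *conn) {
  // uv_is_closing() covers both closing and already-closed handles
  if (conn->ref == 0 && !uv_is_closing((uv_handle_t *)conn->stream)) {
    uv_close((uv_handle_t *)conn->stream, on_close);
  }
}
```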
@@ -75,8 +75,10 @@ extern "C" {
 #define MP_STAT_PROC_FLAG_RES_FAIL (1 << 3)
 
 // CTRL FUNC FLAGS
-#define MP_CTRL_FLAG_PRINT_STAT  (1 << 0)
-#define MP_CTRL_FLAG_CHECK_STAT  (1 << 1)
+#define MP_CTRL_FLAG_PRINT_STAT  (1 << 0)
+#define MP_CTRL_FLAG_CHECK_STAT  (1 << 1)
+#define MP_CTRL_FLAG_LOCK_DBG    (1 << 2)
+#define MP_CTRL_FLAG_LOG_MAXSIZE (1 << 3)
 
 
 typedef enum EMPStatLogItem {
@@ -199,7 +201,6 @@ typedef struct SMPStatInfo {
 
 typedef struct SMPJob {
   SMemPoolJob job;   // KEEP IT FIRST
-  int32_t     remainSession;
   SMPStatInfo stat;
 } SMPJob;
 
@@ -281,7 +282,6 @@ typedef struct SMemPool {
   SMemPoolCfg cfg;
   //int64_t     retireThreshold[3];
   int64_t     retireUnit;
-  SMPCtrlInfo ctrl;
 
   int64_t     maxAllocMemSize;
   int64_t     allocMemSize;
@@ -306,6 +306,7 @@ typedef struct SMPMsgQueue {
 
 typedef struct SMemPoolMgmt {
   EMPMemStrategy strategy;
+  SMPCtrlInfo    ctrl;
   SArray*        poolList;
   SRWLatch       poolLock;
   TdThread       poolMgmtThread;
@@ -316,6 +317,9 @@ typedef struct SMemPoolMgmt {
   int32_t        code;
 } SMemPoolMgmt;
 
+extern SMemPoolMgmt gMPMgmt;
+
+
 typedef int32_t (*mpAllocFunc)(SMemPool*, SMPSession*, int64_t*, uint32_t, void**);
 typedef void    (*mpFreeFunc)(SMemPool*, SMPSession*, void *, int64_t*);
 typedef int64_t (*mpGetSizeFunc)(SMemPool*, SMPSession*, void*);
@@ -376,17 +380,25 @@ enum {
 #define MP_TRY_LOCK(type, _lock) \
   do { \
     if (MP_READ == (type)) { \
-      ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try read lock"); \
-      uDebug("MP TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try read lock"); \
+        uDebug("MP TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      } \
       (_res) = taosRTryLockLatch(_lock); \
-      uDebug("MP TRY RLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \
-      ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) > 0, "invalid lock value after try read lock"); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        uDebug("MP TRY RLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \
+        ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) > 0, "invalid lock value after try read lock"); \
+      } \
     } else { \
-      ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try write lock"); \
-      uDebug("MP TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try write lock"); \
+        uDebug("MP TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      } \
       (_res) = taosWTryLockLatch(_lock); \
-      uDebug("MP TRY WLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \
-      ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after try write lock"); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        uDebug("MP TRY WLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \
+        ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after try write lock"); \
+      } \
     } \
   } while (0)
 
@@ -394,34 +406,50 @@ enum {
 #define MP_LOCK(type, _lock) \
   do { \
     if (MP_READ == (type)) { \
-      ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \
-      uDebug("MP RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \
+        uDebug("MP RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      } \
       taosRLockLatch(_lock); \
-      uDebug("MP RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        uDebug("MP RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+        ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \
+      } \
     } else { \
-      ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \
-      uDebug("MP WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \
+        uDebug("MP WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      } \
       taosWLockLatch(_lock); \
-      uDebug("MP WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        uDebug("MP WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+        ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
+      } \
     } \
   } while (0)
 
 #define MP_UNLOCK(type, _lock) \
   do { \
     if (MP_READ == (type)) { \
-      ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
-      uDebug("MP RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
+        uDebug("MP RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      } \
       taosRUnLockLatch(_lock); \
-      uDebug("MP RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        uDebug("MP RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+        ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
+      } \
     } else { \
-      ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
-      uDebug("MP WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
+        uDebug("MP WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      } \
       taosWUnLockLatch(_lock); \
-      uDebug("MP WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
+      if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \
+        uDebug("MP WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+        ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
+      } \
     } \
   } while (0)
 
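The lock macros now evaluate their ASSERTS checks and uDebug traces only when MP_CTRL_FLAG_LOCK_DBG is set, so the common path pays a single flag test instead of extra atomic loads and log formatting on every lock operation. The shape of the gate, reduced to a standalone sketch (hypothetical flag variable and logger):

```c
#include <stdio.h>

enum { LOCK_DBG_FLAG = 1 << 2 };
static unsigned g_func_flags = 0;  // runtime-configurable control flags

#define DBG_LOCK(_lock, _lock_fn)                             \
  do {                                                        \
    if (g_func_flags & LOCK_DBG_FLAG) {                       \
      fprintf(stderr, "LOCK %p B %s:%d\n", (void *)(_lock),   \
              __FILE__, __LINE__);                            \
    }                                                         \
    _lock_fn(_lock);                                          \
    if (g_func_flags & LOCK_DBG_FLAG) {                       \
      fprintf(stderr, "LOCK %p E %s:%d\n", (void *)(_lock),   \
              __FILE__, __LINE__);                            \
    }                                                         \
  } while (0)
```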
@@ -233,9 +233,6 @@ int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
 
   MP_ERR_RET(mpUpdateCfg(pPool));
 
-  pPool->ctrl.statFlags = MP_STAT_FLAG_LOG_ALL & (~MP_LOG_FLAG_ALL_POS);
-  pPool->ctrl.funcFlags = MP_CTRL_FLAG_PRINT_STAT | MP_CTRL_FLAG_CHECK_STAT;
-
   pPool->sessionCache.groupNum = MP_SESSION_CACHE_ALLOC_BATCH_SIZE;
   pPool->sessionCache.nodeSize = sizeof(SMPSession);
 
@@ -282,7 +279,9 @@ void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int6
   }
 
   int64_t allocMemSize = atomic_load_64(&pPool->allocMemSize);
-  mpUpdateMaxAllocSize(&pPool->maxAllocMemSize, allocMemSize);
+  if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOG_MAXSIZE)) {
+    mpUpdateMaxAllocSize(&pPool->maxAllocMemSize, allocMemSize);
+  }
 }
 
 int32_t mpPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) {
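mpUpdateMaxAllocSize() maintains a high-water mark and is now invoked only under MP_CTRL_FLAG_LOG_MAXSIZE. The usual lock-free way to keep such a maximum is a compare-exchange loop, sketched here under that assumption (C11; the helper name is illustrative):

```c
#include <stdatomic.h>
#include <stdint.h>

// Raise *max to at least v; concurrent callers can only increase it.
static void update_max_alloc_size(_Atomic int64_t *max, int64_t v) {
  int64_t cur = atomic_load(max);
  while (cur < v && !atomic_compare_exchange_weak(max, &cur, v)) {
    // cur now holds the latest observed value; retry until it is
    // already >= v or our store wins
  }
}
```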
@@ -914,7 +913,7 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt
   if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) {
     mpLogDetailStat(&pSession->stat.statDetail, item, pInput);
   }
-  if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) {
+  if (MP_GET_FLAG(gMPMgmt.ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) {
     mpLogDetailStat(&pPool->stat.statDetail, item, pInput);
   }
   if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
@@ -922,7 +921,7 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt
     mpLogPosStat(&pSession->stat.posStat, item, pInput, true);
     taosEnableMemPoolUsage();
   }
-  if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
+  if (MP_GET_FLAG(gMPMgmt.ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) {
     taosDisableMemPoolUsage();
     mpLogPosStat(&pPool->stat.posStat, item, pInput, false);
     taosEnableMemPoolUsage();
@@ -967,7 +966,7 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) {
   }
 
   if (NULL != poolHandle) {
-    pCtrl = &pPool->ctrl;
+    pCtrl = &gMPMgmt.ctrl;
     pDetail = &pPool->stat.statDetail;
     int64_t sessInit = atomic_load_64(&pPool->stat.statSession.initFail) + atomic_load_64(&pPool->stat.statSession.initSucc);
     if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == atomic_load_64(&pPool->stat.statSession.destroyNum)) {
@@ -1084,6 +1083,9 @@ void mpModInit(void) {
 
   gMPMgmt.strategy = E_MP_STRATEGY_DIRECT;
 
+  gMPMgmt.ctrl.statFlags = MP_STAT_FLAG_LOG_ALL & (~MP_LOG_FLAG_ALL_POS);
+  gMPMgmt.ctrl.funcFlags = MP_CTRL_FLAG_PRINT_STAT | MP_CTRL_FLAG_CHECK_STAT | MP_CTRL_FLAG_LOG_MAXSIZE;
+
   //gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0);
   //if (TSDB_CODE_SUCCESS != gMPMgmt.code) {
   //  uError("failed to init sem2, error: 0x%x", gMPMgmt.code);
@@ -1122,16 +1124,16 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
   if (NULL != pPool) {
     snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name);
     detailName[sizeof(detailName) - 1] = 0;
-    mpPrintSessionStat(&pPool->ctrl, &pPool->stat.statSession, detailName);
-    mpPrintStatDetail(&pPool->ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize);
+    mpPrintSessionStat(&gMPMgmt.ctrl, &pPool->stat.statSession, detailName);
+    mpPrintStatDetail(&gMPMgmt.ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize);
 
     snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
     detailName[sizeof(detailName) - 1] = 0;
-    mpPrintNodeStat(&pPool->ctrl, pPool->stat.nodeStat, detailName);
+    mpPrintNodeStat(&gMPMgmt.ctrl, pPool->stat.nodeStat, detailName);
 
     snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos");
     detailName[sizeof(detailName) - 1] = 0;
-    mpPrintPosStat(&pPool->ctrl, &pPool->stat.posStat, detailName);
+    mpPrintPosStat(&gMPMgmt.ctrl, &pPool->stat.posStat, detailName);
   }
 }
 
@@ -1187,17 +1189,13 @@ void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) {
   (void)mpUpdateCfg(pPool);
 }
 
-void taosMemPoolDestroySession(void* poolHandle, void* session, int32_t* remainSessions) {
+void taosMemPoolDestroySession(void* poolHandle, void* session) {
   SMemPool* pPool = (SMemPool*)poolHandle;
   SMPSession* pSession = (SMPSession*)session;
   if (NULL == poolHandle || NULL == pSession) {
     uWarn("null pointer of poolHandle %p or session %p", poolHandle, session);
     return;
   }
 
-  if (remainSessions) {
-    *remainSessions = atomic_sub_fetch_32(&pSession->pJob->remainSession, 1);
-  }
-
   //TODO;
 
@@ -1225,7 +1223,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, c
     MP_ERR_JRET(terrno);
   }
 
-  TAOS_MEMCPY(&pSession->ctrl, &pPool->ctrl, sizeof(pSession->ctrl));
+  TAOS_MEMCPY(&pSession->ctrl, &gMPMgmt.ctrl, sizeof(pSession->ctrl));
 
   if (gMPFps[gMPMgmt.strategy].initSessionFp) {
     MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession));
@@ -1234,12 +1232,11 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, c
   MP_ERR_JRET(mpInitPosStat(&pSession->stat.posStat, true));
 
   pSession->pJob = (SMPJob*)pJob;
-  (void)atomic_add_fetch_32(&pSession->pJob->remainSession, 1);
 
 _return:
 
   if (TSDB_CODE_SUCCESS != code) {
-    taosMemPoolDestroySession(poolHandle, pSession, NULL);
+    taosMemPoolDestroySession(poolHandle, pSession);
     pSession = NULL;
     (void)atomic_add_fetch_64(&pPool->stat.statSession.initFail, 1);
   } else {
@@ -42,7 +42,8 @@
 
 namespace {
 
-#define MPT_PRINTF(param, ...) (void)printf("[%" PRId64 ",%" PRId64 "] " param, mptCtx.caseLoop, mptCtx.jobLoop, __VA_ARGS__)
+#define MPT_PRINTF(param, ...) (void)printf("[%" PRId64 ",%" PRId64 "] " param, mptCaseLoop, mptExecLoop, __VA_ARGS__)
+#define MPT_EPRINTF(param, ...) (void)printf(param, __VA_ARGS__)
 
 #define MPT_MAX_MEM_ACT_TIMES 300
 #define MPT_MAX_SESSION_NUM   100
@@ -54,6 +55,7 @@ namespace {
 #define MPT_MIN_RESERVE_MEM_SIZE (512 * 1048576UL)
 #define MPT_MIN_MEM_POOL_SIZE    (1048576UL)
 #define MPT_MAX_RETIRE_JOB_NUM   10000
+#define MPT_DEFAULT_TASK_RUN_TIMES 10
 
 enum {
   MPT_READ = 1,
@@ -63,100 +65,16 @@ enum {
 #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
 
 
-static int32_t MPT_TRY_LOCK(int32_t type, SRWLatch *_lock) {
-  int32_t code = -1;
-
-  if (MPT_READ == (type)) {
-    if (atomic_load_32((_lock)) < 0) {
-      uError("invalid lock value before try read lock");
-      return -1;
-    }
-    uDebug("MPT TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
-    code = taosRTryLockLatch(_lock);
-    uDebug("MPT TRY RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
-    if (atomic_load_32((_lock)) <= 0) {
-      uError("invalid lock value after try read lock");
-      return -1;
-    }
-  } else {
-    if (atomic_load_32((_lock)) < 0) {
-      uError("invalid lock value before try write lock");
-      return -1;
-    }
-    uDebug("MPT TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
-    code = taosWTryLockLatch(_lock);
-    uDebug("MPT TRY WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
-    if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) {
-      uError("invalid lock value after try write lock");
-      return -1;
-    }
-  }
-
-  return code;
-}
-
-#define MPT_LOCK(type, _lock) \
-  do { \
-    if (MPT_READ == (type)) { \
-      if (atomic_load_32((_lock)) < 0) { \
-        uError("invalid lock value before read lock"); \
-        break; \
-      } \
-      uDebug("MPT RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      taosRLockLatch(_lock); \
-      uDebug("MPT RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      if (atomic_load_32((_lock)) <= 0) { \
-        uError("invalid lock value after read lock"); \
-        break; \
-      } \
-    } else { \
-      if (atomic_load_32((_lock)) < 0) { \
-        uError("invalid lock value before write lock"); \
-        break; \
-      } \
-      uDebug("MPT WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      taosWLockLatch(_lock); \
-      uDebug("MPT WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
-        uError("invalid lock value after write lock"); \
-        break; \
-      } \
-    } \
-  } while (0)
-
-#define MPT_UNLOCK(type, _lock) \
-  do { \
-    if (MPT_READ == (type)) { \
-      if (atomic_load_32((_lock)) <= 0) { \
-        uError("invalid lock value before read unlock"); \
-        break; \
-      } \
-      uDebug("MPT RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      taosRUnLockLatch(_lock); \
-      uDebug("MPT RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      if (atomic_load_32((_lock)) < 0) { \
-        uError("invalid lock value after read unlock"); \
-        break; \
-      } \
-    } else { \
-      if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
-        uError("invalid lock value before write unlock"); \
-        break; \
-      } \
-      uDebug("MPT WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      taosWUnLockLatch(_lock); \
-      uDebug("MPT WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
-      if (atomic_load_32((_lock)) < 0) { \
-        uError("invalid lock value after write unlock"); \
-        break; \
-      } \
-    } \
-  } while (0)
-
 
 threadlocal void* mptThreadPoolHandle = NULL;
 threadlocal void* mptThreadPoolSession = NULL;
+threadlocal int32_t mptJobNum = 0;
+threadlocal int32_t mptExecNum = 0;
+threadlocal int32_t mptExecLoop = 0;
+threadlocal int64_t mptCaseLoop = 0;
 
 
 #define MPT_SET_TEID(id, tId, eId) \
@@ -209,6 +127,7 @@ typedef struct SMPTJobInfo {
   SRWLatch  lock;
   int8_t    destroyed;
   SHashObj* pSessions;
+  int8_t    initDone;
 } SMPTJobInfo;
 
 
@@ -219,9 +138,10 @@ typedef struct {
   int32_t jobNum;
   int32_t jobTaskNum;
   int64_t maxSingleAllocSize;
-  char*   pSrcString;
   bool    printExecDetail;
   bool    printInputRow;
 
+  bool    lockDbg;
 } SMPTestCtrl;
 
 typedef struct {
@@ -256,6 +176,7 @@ typedef struct {
 
   int32_t jobIdx;
   int64_t jobId;
+  int32_t initTimes;
   void*   pSessions[MPT_MAX_SESSION_NUM];
   int32_t taskNum;
   SMPTestTaskCtx taskCtxs[MPT_MAX_SESSION_NUM];
@@ -274,6 +195,12 @@ typedef struct {
   int32_t randTask;
 } SMPTestParam;
 
+typedef struct {
+  int64_t initNum;
+  int64_t retireNum;
+  int64_t destoryNum;
+} SMPTestJobStat;
+
 typedef struct {
   int32_t  idx;
   TdThread threadFp;
@@ -289,17 +216,139 @@ typedef struct SMPTestCtx {
   SMPTestThread  threadCtxs[MPT_MAX_THREAD_NUM];
   TdThread       dropThreadFp;
-  int32_t        jobNum;
+  int64_t        totalTaskNum;
   SMPTestJobCtx* jobCtxs;
   SMPTestParam   param;
 
+  SMPTestJobStat runStat;
+
+  SRWLatch       stringLock;
+  char*          pSrcString;
+
+  bool           initDone;
   int8_t         testDone;
-  int64_t        caseLoop;
-  int64_t        jobLoop;
 } SMPTestCtx;
 
 SMPTestCtx mptCtx = {0};
 SMPTestCtrl mptCtrl = {0};
 
+static int32_t MPT_TRY_LOCK(int32_t type, SRWLatch *_lock) {
+  int32_t code = -1;
+
+  if (MPT_READ == (type)) {
+    if (mptCtrl.lockDbg) {
+      if (atomic_load_32((_lock)) < 0) {
+        uError("invalid lock value before try read lock");
+        return -1;
+      }
+      uDebug("MPT TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
+    }
+    code = taosRTryLockLatch(_lock);
+    if (mptCtrl.lockDbg) {
+      uDebug("MPT TRY RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
+      if (atomic_load_32((_lock)) <= 0) {
+        uError("invalid lock value after try read lock");
+        return -1;
+      }
+    }
+  } else {
+    if (mptCtrl.lockDbg) {
+      if (atomic_load_32((_lock)) < 0) {
+        uError("invalid lock value before try write lock");
+        return -1;
+      }
+      uDebug("MPT TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
+    }
+    code = taosWTryLockLatch(_lock);
+    if (mptCtrl.lockDbg) {
+      uDebug("MPT TRY WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
+      if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) {
+        uError("invalid lock value after try write lock");
+        return -1;
+      }
+    }
+  }
+
+  return code;
+}
+
+#define MPT_LOCK(type, _lock) \
+  do { \
+    if (MPT_READ == (type)) { \
+      if (mptCtrl.lockDbg) { \
+        if (atomic_load_32((_lock)) < 0) { \
+          uError("invalid lock value before read lock"); \
+          break; \
+        } \
+        uDebug("MPT RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      } \
+      taosRLockLatch(_lock); \
+      if (mptCtrl.lockDbg) { \
+        uDebug("MPT RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+        if (atomic_load_32((_lock)) <= 0) { \
+          uError("invalid lock value after read lock"); \
+          break; \
+        } \
+      } \
+    } else { \
+      if (mptCtrl.lockDbg) { \
+        if (atomic_load_32((_lock)) < 0) { \
+          uError("invalid lock value before write lock"); \
+          break; \
+        } \
+        uDebug("MPT WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      } \
+      taosWLockLatch(_lock); \
+      if (mptCtrl.lockDbg) { \
+        uDebug("MPT WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+        if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
+          uError("invalid lock value after write lock"); \
+          break; \
+        } \
+      } \
+    } \
+  } while (0)
 
+#define MPT_UNLOCK(type, _lock) \
+  do { \
+    if (MPT_READ == (type)) { \
+      if (mptCtrl.lockDbg) { \
+        if (atomic_load_32((_lock)) <= 0) { \
+          uError("invalid lock value before read unlock"); \
+          break; \
+        } \
+        uDebug("MPT RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      } \
+      taosRUnLockLatch(_lock); \
+      if (mptCtrl.lockDbg) { \
+        uDebug("MPT RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+        if (atomic_load_32((_lock)) < 0) { \
+          uError("invalid lock value after read unlock"); \
+          break; \
+        } \
+      } \
+    } else { \
+      if (mptCtrl.lockDbg) { \
+        if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
+          uError("invalid lock value before write unlock"); \
+          break; \
+        } \
+        uDebug("MPT WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+      } \
+      taosWUnLockLatch(_lock); \
+      if (mptCtrl.lockDbg) { \
+        uDebug("MPT WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
+        if (atomic_load_32((_lock)) < 0) { \
+          uError("invalid lock value after write unlock"); \
+          break; \
+        } \
+      } \
+    } \
+  } while (0)
 
 
 #if 0
 void joinTestReplaceRetrieveFp() {
   static Stub stub;
 
@@ -356,13 +405,23 @@ void mptDeleteJobQueueData(void* pData) {
   taosHashRelease(mptCtx.pJobs, pJob);
 }
 
+
+void mptDestroyJobInfo(void* job) {
+  SMPTJobInfo* pJob = (SMPTJobInfo*)job;
+
+  taosMemFree(pJob->memInfo);
+  taosHashCleanup(pJob->pSessions);
+}
+
+
 void mptInit() {
   osDefaultInit();
   mptInitLogFile();
 
   mptCtrl.caseLoopTimes = 100000;
   mptCtrl.taskActTimes = 0;
-  mptCtrl.maxSingleAllocSize = 104857600;
+  mptCtrl.maxSingleAllocSize = 104857600 * 5;
   mptCtrl.jobNum = 100;
   mptCtrl.jobExecTimes = 10;
   mptCtrl.jobTaskNum = 0;
@@ -370,51 +429,41 @@ void mptInit() {
   mptCtx.pJobs = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
   ASSERT_TRUE(NULL != mptCtx.pJobs);
 
+  taosHashSetFreeFp(mptCtx.pJobs, mptDestroyJobInfo);
+
   mptCtx.pJobQueue = createBoundedQueue(10000, mptJobMemSizeCompFn, mptDeleteJobQueueData, NULL);
   ASSERT_TRUE(NULL != mptCtx.pJobQueue);
   mptCtx.jobCtxs = (SMPTestJobCtx*)taosMemoryCalloc(MPT_MAX_JOB_NUM, sizeof(*mptCtx.jobCtxs));
   ASSERT_TRUE(NULL != mptCtx.jobCtxs);
 
-  mptCtrl.pSrcString = (char*)taosMemoryMalloc(mptCtrl.maxSingleAllocSize);
-  ASSERT_TRUE(NULL != mptCtrl.pSrcString);
-  memset(mptCtrl.pSrcString, 'P', mptCtrl.maxSingleAllocSize - 1);
-  mptCtrl.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0;
+  mptCtx.pSrcString = (char*)taosMemoryMalloc(mptCtrl.maxSingleAllocSize);
+  ASSERT_TRUE(NULL != mptCtx.pSrcString);
+  memset(mptCtx.pSrcString, 'P', mptCtrl.maxSingleAllocSize - 1);
+  mptCtx.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0;
 }
 
-
-void mptDestroyJobInfo(SMPTJobInfo* pJob) {
-  taosMemFree(pJob->memInfo);
-  taosHashCleanup(pJob->pSessions);
-}
-
 
 void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) {
   SMPTJobInfo *pJobInfo = pJobCtx->pJob;
   char id[sizeof(tId) + sizeof(eId) + 1] = {0};
   MPT_SET_TEID(id, tId, eId);
-  int32_t remainSessions = 0;
+  int32_t remainSessions = atomic_sub_fetch_32(&pJobInfo->memInfo->remainSession, 1);
 
   (void)taosHashRemove(pJobInfo->pSessions, id, sizeof(id));
 
-  taosMemPoolDestroySession(gMemPoolHandle, session, &remainSessions);
+  taosMemPoolDestroySession(gMemPoolHandle, session);
 
   if (0 == remainSessions) {
     MPT_LOCK(MPT_WRITE, &pJobInfo->lock);
     if (0 == taosHashGetSize(pJobInfo->pSessions)) {
       atomic_store_8(&pJobInfo->destroyed, 1);
 
       uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobInfo->errCode);
 
-      mptDestroyJobInfo(pJobInfo);
       MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock);
 
-      pJobCtx->pJob = NULL;
+      atomic_add_fetch_64(&mptCtx.runStat.destoryNum, 1);
 
       (void)taosHashRemove(mptCtx.pJobs, &qId, sizeof(qId));
 
+      pJobCtx->pJob = NULL;
       uInfo("the whole query job removed");
     } else {
       MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock);
     }
   }
 }
@@ -500,7 +549,7 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* p
     break;
   }
 
-  pJobCtx->pJob = pJob;
+  atomic_store_ptr(&pJobCtx->pJob, pJob);
   pJob->pCtx = pJobCtx;
 
   char id[sizeof(tId) + sizeof(eId) + 1] = {0};
@@ -508,8 +557,12 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* p
 
   assert(0 == taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo, id));
 
+  atomic_add_fetch_32(&pJob->memInfo->remainSession, 1);
+
   assert(0 == taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES));
 
+  atomic_store_8(&pJob->initDone, 1);
+
 _return:
 
   if (NULL != pJob) {
@@ -543,12 +596,20 @@ void mptInitJob(int32_t idx) {
   pJobCtx->jobIdx = idx;
   pJobCtx->jobId = atomic_add_fetch_64(&mptCtx.qId, 1);
   pJobCtx->taskNum = (mptCtrl.jobTaskNum) ? mptCtrl.jobTaskNum : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_SESSION_NUM/10)) : (taosRand() % MPT_MAX_SESSION_NUM)) + 1;
+  pJobCtx->initTimes++;
+
+  if (!mptCtx.initDone) {
+    atomic_add_fetch_64(&mptCtx.totalTaskNum, pJobCtx->taskNum);
+  }
+
   for (int32_t i = 0; i < pJobCtx->taskNum; ++i) {
     mptInitTask(i, 0, pJobCtx);
     assert(pJobCtx->pJob);
   }
 
-  uDebug("JOB:0x%x idx:%d initialized, taskNum:%d", pJobCtx->jobId, idx, pJobCtx->taskNum);
+  atomic_add_fetch_64(&mptCtx.runStat.initNum, 1);
+
+  uDebug("JOB:0x%x idx:%d initialized, total times:%d, taskNum:%d", pJobCtx->jobId, idx, pJobCtx->initTimes, pJobCtx->taskNum);
 }
 
 
@@ -565,18 +626,13 @@ void mptDestroyTask(SMPTestJobCtx* pJobCtx, int32_t taskIdx) {
 }
 
 int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
-  if (MPT_TRY_LOCK(MPT_WRITE, &pJobCtx->jobExecLock)) {
-    return -1;
-  }
-
   uint64_t jobId = pJobCtx->jobId;
   for (int32_t i = 0; i < pJobCtx->taskNum; ++i) {
     if (!pJobCtx->taskCtxs[i].destoryed) {
       mptDestroyTask(pJobCtx, i);
     }
   }
 
-
   //mptDestroyJobInfo(pJobCtx->pJob);
   //(void)taosHashRemove(mptCtx.pJobs, &pJobCtx->jobId, sizeof(pJobCtx->jobId));
 
@@ -586,8 +642,6 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
     mptInitJob(jobIdx);
   }
 
-  MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock);
-
   MPT_PRINTF("  JOB:0x%x retired\n", jobId);
 
   return 0;
@@ -598,32 +652,50 @@ void mptCheckCompareJobInfo(SMPTestJobCtx* pJobCtx) {
 }
 
 int32_t mptResetJob(SMPTestJobCtx* pJobCtx) {
-  if (NULL == pJobCtx->pJob) {
+  if (MPT_TRY_LOCK(MPT_WRITE, &pJobCtx->jobExecLock)) {
+    return -1;
+  }
+
+  if (NULL == atomic_load_ptr(&pJobCtx->pJob)) {
     int32_t jobIdx = pJobCtx->jobIdx;
     memset((char*)pJobCtx + sizeof(pJobCtx->jobExecLock), 0, sizeof(SMPTestJobCtx) - sizeof(pJobCtx->jobExecLock));
     mptInitJob(jobIdx);
+
+    MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock);
     return 0;
   }
 
+  int32_t code = 0;
   if (atomic_load_8(&pJobCtx->pJob->retired)) {
     int32_t taskRunning = atomic_load_32(&pJobCtx->taskRunningNum);
     if (0 == taskRunning) {
-      return mptDestroyJob(pJobCtx, true);
+      code = mptDestroyJob(pJobCtx, true);
     } else {
       uDebug("JOB:0x%x retired but will not destroy cause of task running, num:%d", pJobCtx->jobId, taskRunning);
-      return -1;
+      code = -1;
    }
  }
 
+  MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock);
+
   return 0;
 }
 
 void mptRetireJob(SMPTJobInfo* pJob) {
   SMPTestJobCtx* pCtx = (SMPTestJobCtx*)pJob->pCtx;
 
+  if (MPT_TRY_LOCK(MPT_WRITE, &pCtx->jobExecLock)) {
+    return;
+  }
+
   int32_t taskRunning = atomic_load_32(&pCtx->taskRunningNum);
   if (0 == taskRunning) {
     mptDestroyJob(pCtx, false);
   } else {
     uDebug("JOB:0x%x retired but will not destroy cause of task running, num:%d", pCtx->jobId, taskRunning);
   }
 
+  MPT_UNLOCK(MPT_WRITE, &pCtx->jobExecLock);
 }
 
 int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) {
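Both mptResetJob() and mptRetireJob() now take the job's exec lock with a try-lock and back off when it is contended, instead of blocking while another thread (for example the drop thread) holds it; an unconditional wait there is exactly the kind of mutual blocking that can degenerate into a dead loop under load. A standalone sketch of the back-off pattern with POSIX rwlocks (hypothetical types; the real code uses taosWTryLockLatch):

```c
#include <pthread.h>

typedef struct JobCtx {
  pthread_rwlock_t exec_lock;
  int              retired;
} JobCtx;

// Returns nonzero when the job is busy; the caller simply retries
// later rather than waiting on the lock.
static int try_reset_job(JobCtx *job) {
  if (pthread_rwlock_trywrlock(&job->exec_lock) != 0) {
    return -1;  // contended: skip this round, don't block
  }
  if (job->retired) {
    job->retired = 0;  // stands in for the real reset work
  }
  pthread_rwlock_unlock(&job->exec_lock);
  return 0;
}
```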
@@ -662,7 +734,7 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) {
   uint64_t jobId = 0;
   int64_t retiredSize = 0;
   while (retiredSize < retireSize && NULL != pJob) {
-    if (atomic_load_8(&pJob->retired)) {
+    if (atomic_load_8(&pJob->retired) || 0 == atomic_load_8(&pJob->initDone)) {
       pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
       continue;
     }
@@ -671,33 +743,35 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) {
     int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
     jobId = pJob->memInfo->jobId;
 
+    atomic_add_fetch_64(&mptCtx.runStat.retireNum, 1);
+
     mptRetireJob(pJob);
 
     retiredSize += aSize;
 
-    uDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64 ", retiredSize:%" PRId64,
+    uDebug("QID:0x%" PRIx64 " job retired cause of limit reached, usedSize:%" PRId64 ", retireSize:%" PRId64 ", retiredSize:%" PRId64,
       jobId, aSize, retireSize, retiredSize);
     }
 
     pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL);
   }
 
   taosHashCancelIterate(mptCtx.pJobs, pJob);
 }
 
 
 void mptRetireJobCb(uint64_t jobId, uint64_t clientId, int32_t errCode) {
-  char id[sizeof(jobId) + sizeof(clientId) + 1] = {0};
-  MPT_SET_QCID(id, jobId, clientId);
-
-  SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, id, sizeof(id));
+  SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &jobId, sizeof(jobId));
   if (NULL == pJob) {
-    uError("QID:0x%" PRIx64 " CID:0x%" PRIx64 " fail to get job from job hash", jobId, clientId);
+    uError("QID:0x%" PRIx64 " fail to get job from job hash", jobId);
     return;
   }
 
   if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
-    uInfo("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, clientId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
+    uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
+    atomic_add_fetch_64(&mptCtx.runStat.retireNum, 1);
   } else {
-    uDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, clientId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
+    uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
   }
 }
 
@@ -722,7 +796,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
 
   while (!actDone) {
     actId = taosRand() % 10;
-    size = (taosRand() % 10) ? (taosRand() % (mptCtrl.maxSingleAllocSize / 100)) : (taosRand() % mptCtrl.maxSingleAllocSize);
+    size = (taosRand() % 8) ? (taosRand() % (mptCtrl.maxSingleAllocSize / 100)) : (taosRand() % mptCtrl.maxSingleAllocSize);
 
     switch (actId) {
       case 0: { // malloc
@@ -736,7 +810,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
           pTask->stat.bytes.memMalloc.exec+=size;
           pTask->stat.times.memMalloc.fail++;
           pTask->stat.bytes.memMalloc.fail+=size;
-          uError("JOB:0x%x TASK:0x%x mpMalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
+          uError("JOB:0x%x TASK:0x%x mpMalloc %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno));
           return;
         }
 
@@ -764,7 +838,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
           pTask->stat.bytes.memCalloc.exec+=size;
           pTask->stat.times.memCalloc.fail++;
           pTask->stat.bytes.memCalloc.fail+=size;
-          uError("JOB:0x%x TASK:0x%x mpCalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
+          uError("JOB:0x%x TASK:0x%x mpCalloc %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno));
           return;
         }
 
@@ -794,7 +868,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
           pTask->stat.bytes.memRealloc.exec+=size;
           pTask->stat.times.memRealloc.fail++;
           pTask->stat.bytes.memRealloc.fail+=size;
-          uError("JOB:0x%x TASK:0x%x new mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
+          uError("JOB:0x%x TASK:0x%x new mpRealloc %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno));
           return;
         }
 
@@ -833,7 +907,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
           pTask->stat.bytes.memFree.exec+=osize;
           pTask->stat.times.memFree.succ++;
           pTask->stat.bytes.memFree.succ+=osize;
-          uError("JOB:0x%x TASK:0x%x real mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
+          uError("JOB:0x%x TASK:0x%x real mpRealloc %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno));
           pTask->memIdx--;
           return;
         }
@@ -879,16 +953,18 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
         }
 
         size /= 10;
-        mptCtrl.pSrcString[size] = 0;
-        pTask->pMemList[pTask->memIdx].p = mptStrdup(mptCtrl.pSrcString);
-        mptCtrl.pSrcString[size] = 'W';
+        MPT_LOCK(MPT_WRITE, &mptCtx.stringLock);
+        mptCtx.pSrcString[size] = 0;
+        pTask->pMemList[pTask->memIdx].p = mptStrdup(mptCtx.pSrcString);
+        mptCtx.pSrcString[size] = 'W';
+        MPT_UNLOCK(MPT_WRITE, &mptCtx.stringLock);
 
         if (NULL == pTask->pMemList[pTask->memIdx].p) {
           pTask->stat.times.memStrdup.exec++;
           pTask->stat.bytes.memStrdup.exec+=size + 1;
           pTask->stat.times.memStrdup.fail++;
           pTask->stat.bytes.memStrdup.fail+=size + 1;
-          uError("JOB:0x%x TASK:0x%x mpStrdup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
+          uError("JOB:0x%x TASK:0x%x mpStrdup %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno));
           return;
         }
 
|
@ -912,15 +988,18 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
|
|||
}
|
||||
|
||||
size /= 10;
|
||||
assert(strlen(mptCtrl.pSrcString) > size);
|
||||
pTask->pMemList[pTask->memIdx].p = mptStrndup(mptCtrl.pSrcString, size);
|
||||
|
||||
MPT_LOCK(MPT_WRITE, &mptCtx.stringLock);
|
||||
assert(strlen(mptCtx.pSrcString) > size);
|
||||
pTask->pMemList[pTask->memIdx].p = mptStrndup(mptCtx.pSrcString, size);
|
||||
MPT_UNLOCK(MPT_WRITE, &mptCtx.stringLock);
|
||||
|
||||
if (NULL == pTask->pMemList[pTask->memIdx].p) {
|
||||
pTask->stat.times.memStrndup.exec++;
|
||||
pTask->stat.bytes.memStrndup.exec+=size + 1;
|
||||
pTask->stat.times.memStrndup.fail++;
|
||||
pTask->stat.bytes.memStrndup.fail+=size + 1;
|
||||
uError("JOB:0x%x TASK:0x%x mpStrndup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
|
||||
uError("JOB:0x%x TASK:0x%x mpStrndup %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@@ -985,7 +1064,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
           pTask->stat.bytes.memMalloc.exec+=size;
           pTask->stat.times.memMalloc.fail++;
           pTask->stat.bytes.memMalloc.fail+=size;
-          uError("JOB:0x%x TASK:0x%x mpMallocAlign %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno);
+          uError("JOB:0x%x TASK:0x%x mpMallocAlign %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno));
           return;
         }
 
@@ -1030,7 +1109,7 @@ void mptSimulateOutTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
     return;
   }
 
-  if (taosRand() % 10 > 0) {
+  if (taosRand() % 2) {
     return;
   }
 
@@ -1071,9 +1150,9 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx, int32
   mptSimulateTask(pJobCtx, pCtx, actTimes);
   mptDisableMemoryPoolUsage();
 
-  if (!atomic_load_8(&pJobCtx->pJob->retired)) {
+  // if (!atomic_load_8(&pJobCtx->pJob->retired)) {
     mptSimulateOutTask(pJobCtx, pCtx);
-  }
+  // }
 
   taosWUnLockLatch(&pCtx->taskExecLock);
 
@@ -1087,6 +1166,7 @@ void mptInitJobs() {
   int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM;
 
   memset(mptCtx.jobCtxs, 0, sizeof(*mptCtx.jobCtxs) * jobNum);
+  mptCtx.totalTaskNum = 0;
 
   for (int32_t i = 0; i < jobNum; ++i) {
     mptInitJob(i);
@@ -1163,61 +1243,109 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
   }
 }
 
-void* mptRunThreadFunc(void* param) {
-  SMPTestThread* pThread = (SMPTestThread*)param;
-  int32_t jobExecTimes = (mptCtrl.jobExecTimes) ? mptCtrl.jobExecTimes : taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1;
+void mptLaunchSingleTask(SMPTestThread* pThread, SMPTestJobCtx* pJobCtx, int32_t taskIdx, int32_t actTimes) {
+  if (atomic_load_8(&pJobCtx->pJob->retired) || pJobCtx->taskCtxs[taskIdx].destoryed) {
+    return;
+  }
+
+  MPT_PRINTF("Thread %d start to run %d:%d task\n", pThread->idx, taskIdx, pJobCtx->taskNum);
+  mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, actTimes);
+  MPT_PRINTF("Thread %d end %d:%d task\n", pThread->idx, taskIdx, pJobCtx->taskNum);
+}
 
-  for (int32_t n = 0; n < jobExecTimes; ++n) {
-    mptCtx.jobNum = (mptCtrl.jobNum) ? mptCtrl.jobNum : (taosRand() % MPT_MAX_JOB_NUM + 1);
-    mptCtx.jobLoop = n;
+void mptRunRandTasks(SMPTestThread* pThread) {
+  int64_t runTaskTimes = mptCtx.totalTaskNum * MPT_DEFAULT_TASK_RUN_TIMES, taskExecIdx = 0;
+  int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM;
+  int32_t jobIdx = 0, taskIdx = 0, code = 0;
+  SMPTestJobCtx* pJobCtx = NULL;
 
-    MPT_PRINTF("Thread %d start the %d:%d job loops - jobNum:%d\n", pThread->idx, n, jobExecTimes, mptCtx.jobNum);
+  MPT_PRINTF("Thread %d start the %d:%d exection - runTaskTimes:%" PRId64 "\n", pThread->idx, mptExecLoop, mptExecNum, runTaskTimes);
 
-    for (int32_t i = 0; i < mptCtx.jobNum; ++i) {
-      SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
-      MPT_PRINTF("  Thread %d start to run %d:%d job[%d:0x%" PRIx64 "]\n", pThread->idx, i, mptCtx.jobNum, pJobCtx->jobIdx, pJobCtx->jobId);
+  while (runTaskTimes > 0) {
+    int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES));
+    jobIdx = taosRand() % jobNum;
 
-      if (mptResetJob(pJobCtx)) {
-        continue;
-      }
+    pJobCtx = &mptCtx.jobCtxs[jobIdx];
 
-      if (MPT_TRY_LOCK(MPT_READ, &pJobCtx->jobExecLock)) {
-        continue;
-      }
+    if (mptResetJob(pJobCtx)) {
+      continue;
+    }
 
-      if (mptCtx.param.randTask) {
-        int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES));
-        int32_t taskIdx = taosRand() % pJobCtx->taskNum;
-        mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, actTimes);
-        MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock);
-        continue;
-      }
+    if (MPT_TRY_LOCK(MPT_READ, &pJobCtx->jobExecLock)) {
+      continue;
+    }
 
-      for (int32_t m = 0; m < pJobCtx->taskNum; ++m) {
-        if (atomic_load_8(&pJobCtx->pJob->retired)) {
-          break;
-        }
-
-        if (pJobCtx->taskCtxs[m].destoryed) {
-          continue;
-        }
-
-        int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES));
-
-        MPT_PRINTF("Thread %d start to run %d:%d task\n", pThread->idx, m, pJobCtx->taskNum);
-        mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m, actTimes);
-      }
-
-      MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock);
-
-      MPT_PRINTF("  Thread %d end %dth JOB 0x%x exec, retired:%d\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId, pJobCtx->pJob->retired);
-
-      mptResetJob(pJobCtx);
+    taskIdx = taosRand() % pJobCtx->taskNum;
+
+    if (atomic_load_8(&pJobCtx->pJob->retired) || pJobCtx->taskCtxs[taskIdx].destoryed) {
+      MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock);
+      continue;
     }
 
-    MPT_PRINTF("Thread %d finish the %dth job loops\n", pThread->idx, n);
+    MPT_PRINTF("Thread %d start to run %d:%d task\n", pThread->idx, taskExecIdx, runTaskTimes);
+    mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, actTimes);
+    MPT_PRINTF("Thread %d end %d:%d task\n", pThread->idx, taskExecIdx, runTaskTimes);
 
-    mptCheckPoolUsedSize(mptCtx.jobNum);
+    MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock);
+
+    runTaskTimes--;
+    taskExecIdx++;
   }
+}
+
+void mptRunLoopJobs(SMPTestThread* pThread) {
+  mptJobNum = (mptCtrl.jobNum) ? mptCtrl.jobNum : (taosRand() % MPT_MAX_JOB_NUM + 1);
+
+  MPT_PRINTF("Thread %d start the %d:%d exection - jobNum:%d\n", pThread->idx, mptExecLoop, mptExecNum, mptJobNum);
+
+  for (int32_t i = 0; i < mptJobNum; ++i) {
+    SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
+
+    if (mptResetJob(pJobCtx)) {
+      continue;
+    }
+
+    if (MPT_TRY_LOCK(MPT_READ, &pJobCtx->jobExecLock)) {
+      continue;
+    }
+
+    MPT_PRINTF("  Thread %d start to run %d:%d job[%d:0x%" PRIx64 "]\n", pThread->idx, i, mptJobNum, pJobCtx->jobIdx, pJobCtx->jobId);
+
+    for (int32_t m = 0; m < pJobCtx->taskNum; ++m) {
+      if (atomic_load_8(&pJobCtx->pJob->retired)) {
+        break;
+      }
+
+      int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES));
+      mptLaunchSingleTask(pThread, pJobCtx, m, actTimes);
+    }
+
+    MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock);
+
+    MPT_PRINTF("  Thread %d end %dth JOB 0x%x exec, retired:%d\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId, pJobCtx->pJob->retired);
+  }
+}
+
+void* mptRunThreadFunc(void* param) {
+  SMPTestThread* pThread = (SMPTestThread*)param;
+  mptExecNum = (mptCtrl.jobExecTimes) ? mptCtrl.jobExecTimes : taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1;
+
+  for (int32_t n = 0; n < mptExecNum; ++n) {
+    mptExecLoop = n;
+
+    if (mptCtx.param.randTask) {
+      mptRunRandTasks(pThread);
+    } else {
+      mptRunLoopJobs(pThread);
+    }
+
+    MPT_PRINTF("Thread %d finish the %dth exection\n", pThread->idx, n);
+
+    if (mptCtx.param.threadNum <= 1) {
+      mptCheckPoolUsedSize(mptJobNum);
+    }
+  }
 
   return NULL;
@@ -1226,15 +1354,19 @@ void* mptRunThreadFunc(void* param) {
 void* mptDropThreadFunc(void* param) {
   int32_t jobIdx = 0, taskIdx = 0, code = 0;
   uint64_t taskId = 0;
+  int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM;
 
   while (!atomic_load_8(&mptCtx.testDone)) {
-    taosMsleep(200);
+    taosMsleep(400);
+
+    MPT_EPRINTF("%" PRId64 " - initJobs:%" PRId64 " retireJobs:%" PRId64 " destoryJobs:%" PRId64 " remainJobs:%" PRId64 "\n", taosGetTimestampMs(),
+      mptCtx.runStat.initNum, mptCtx.runStat.retireNum, mptCtx.runStat.destoryNum, mptCtx.runStat.initNum - mptCtx.runStat.destoryNum);
 
     if (taosMemPoolTryLockPool(gMemPoolHandle, true)) {
       continue;
     }
 
-    jobIdx = taosRand() % mptCtx.jobNum;
+    jobIdx = taosRand() % jobNum;
     SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[jobIdx];
     MPT_LOCK(MPT_WRITE, &pJobCtx->jobExecLock);
     if (NULL == pJobCtx->pJob || pJobCtx->pJob->destroyed) {
@@ -1253,15 +1385,15 @@ void* mptDropThreadFunc(void* param) {
 
       taskId = pJobCtx->taskCtxs[taskIdx].taskId;
       mptDestroyTask(pJobCtx, taskIdx);
-      MPT_PRINTF("Drop Thread destroy task %d:0x%" PRIx64 " in job %d:%" PRIx64 "\n", taskIdx, taskId, jobIdx, pJobCtx->jobId);
+      MPT_EPRINTF("Drop Thread destroy task %d:0x%" PRIx64 " in job %d:%" PRIx64 "\n", taskIdx, taskId, jobIdx, pJobCtx->jobId);
 
       MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock);
     } else {
-      MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock);
       code = mptDestroyJob(pJobCtx, false);
       if (0 == code) {
-        MPT_PRINTF("Drop Thread destroy job %d:%" PRIx64 "\n", jobIdx, pJobCtx->jobId);
+        MPT_EPRINTF("Drop Thread destroy job %d:%" PRIx64 "\n", jobIdx, pJobCtx->jobId);
       }
+      MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock);
     }
 
     taosMemPoolUnLockPool(gMemPoolHandle, true);
@@ -1298,21 +1430,26 @@ void mptDestroyJobs() {
   for (int32_t i = 0; i < jobNum; ++i) {
     mptDestroyJob(&mptCtx.jobCtxs[i], false);
   }
 
 }
 
 void mptRunCase(SMPTestParam* param, int32_t times) {
   MPT_PRINTF("\t case start the %dth running\n", times);
 
-  mptCtx.caseLoop = times;
+  mptCaseLoop = times;
   memcpy(&mptCtx.param, param, sizeof(SMPTestParam));
   tsSingleQueryMaxMemorySize = param->jobQuota;
 
   atomic_store_8(&mptCtx.testDone, 0);
+  mptCtx.initDone = false;
 
   mptInitPool();
 
   mptInitJobs();
 
+  mptCtx.initDone = true;
+
   for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) {
     mptStartRunThread(i);
   }
@@ -1381,7 +1518,7 @@ TEST(FuncTest, SysMemoryPerfTest) {
 }
 #endif
 
-#if 1
+#if 0
 TEST(FuncTest, SingleThreadTest) {
   char* caseName = "FuncTest:SingleThreadTest";
   SMPTestParam param = {0};
@@ -1397,6 +1534,24 @@ TEST(FuncTest, SingleThreadTest) {
 
 }
 #endif
+#if 1
+TEST(FuncTest, MultiThreadTest) {
+  char* caseName = "FuncTest:MultiThreadTest";
+  SMPTestParam param = {0};
+  param.reserveMode = true;
+  param.threadNum = 6;
+  param.jobQuota = 1024;
+  param.randTask = true;
+
+  mptPrintTestBeginInfo(caseName, &param);
+
+  for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) {
+    mptRunCase(&param, i);
+  }
+
+}
+#endif
 
 #if 0
 TEST(FuncTest, MultiThreadsTest) {
   char* caseName = "FuncTest:MultiThreadsTest";