diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 64587abb2a..78cdf15b49 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -136,6 +136,7 @@ void taosMemPoolUnLockPool(void* poolHandle, bool readLock); void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd); void taosMemPoolGetUsedSizeEnd(void* poolHandle); int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize); +void taosMemPoolSchedTrim(void); int32_t taosMemoryPoolInit(mpReserveFailFp, mpReserveReachFp); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index b0be3a4d3b..d941493d99 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -682,7 +682,7 @@ void doDestroyRequest(void *p) { SRequestObj *pRequest = (SRequestObj *)p; uint64_t reqId = pRequest->requestId; - tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); + tscDebug("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest); int64_t nextReqRefId = pRequest->relation.nextRefId; @@ -724,7 +724,7 @@ void doDestroyRequest(void *p) { taosMemoryFreeClear(pRequest->effectiveUser); taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFree(pRequest); - tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); + tscDebug("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); destroyNextReq(nextReqRefId); } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 539d954246..aa0eb3d7b3 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -45,6 +45,8 @@ extern "C" { #define QW_DEFAULT_THREAD_TASK_NUM 3 +#define QW_RETIRE_JOB_BATCH_NUM 5 + enum { QW_CONC_TASK_LEVEL_LOW = 1, QW_CONC_TASK_LEVEL_MIDDLE, @@ -257,10 +259,17 @@ typedef struct SQWRetireCtx { BoundedQueue* pJobQueue; } SQWRetireCtx; +typedef struct SQueryExecStat { + int64_t taskInitNum; + int64_t taskExecDestroyNum; + int64_t taskSinkDestroyNum; +} SQueryExecStat; + typedef struct SQueryMgmt { - SRWLatch taskMgmtLock; - int32_t concTaskLevel; - SHashObj* pJobInfo; + SRWLatch taskMgmtLock; + int32_t concTaskLevel; + SHashObj* pJobInfo; + SQueryExecStat stat; } SQueryMgmt; #define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST) @@ -399,20 +408,20 @@ extern SQueryMgmt gQueryMgmt; #define QW_SCH_DLOG(param, ...) qDebug("QW:%p clientId:%" PRIx64 " " param, mgmt, clientId, __VA_ARGS__) #define QW_TASK_ELOG(param, ...) \ - qError("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) + qError("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) #define QW_TASK_WLOG(param, ...) \ - qWarn("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) + qWarn("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) #define QW_TASK_DLOG(param, ...) \ - qDebug("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) + qDebug("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) #define QW_TASK_DLOGL(param, ...) \ - qDebugL("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) + qDebugL("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) #define QW_TASK_ELOG_E(param) \ - qError("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId) + qError("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId) #define QW_TASK_WLOG_E(param) \ - qWarn("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId) + qWarn("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId) #define QW_TASK_DLOG_E(param) \ - qDebug("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId) + qDebug("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId) #define QW_SCH_TASK_ELOG(param, ...) \ qError("QW:%p SID:%" PRId64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \ @@ -530,7 +539,7 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwInitQueryPool(void); void qwDestroyJobInfo(void* job); -bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx); +bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool forceStop, int32_t errCode); bool qwRetireJob(SQWJobInfo* pJob); void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session); int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index a9b4be0645..4e5bd069a1 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -144,6 +144,7 @@ int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, S *task = taosHashGet(sch->tasksHash, id, sizeof(id)); if (NULL == (*task)) { QW_UNLOCK(rwType, &sch->tasksLock); + QW_TASK_ELOG_E("task status not exists"); QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST); } @@ -280,6 +281,8 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx) { qDestroyTask(otaskHandle); taosDisableMemPoolUsage(); + atomic_add_fetch_64(&gQueryMgmt.stat.taskExecDestroyNum, 1); + qDebug("task handle destroyed"); } } @@ -291,6 +294,8 @@ void qwFreeSinkHandle(SQWTaskCtx *ctx) { QW_SINK_ENABLE_MEMPOOL(ctx); dsDestroyDataSinker(osinkHandle); QW_SINK_DISABLE_MEMPOOL(); + + atomic_add_fetch_64(&gQueryMgmt.stat.taskSinkDestroyNum, 1); qDebug("sink handle destroyed"); } @@ -739,48 +744,62 @@ void qwDestroyJobInfo(void* job) { pJob->pSessions = NULL; } -bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { +bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool forceStop, int32_t errCode) { int32_t code = TSDB_CODE_SUCCESS; - bool taskFreed = false; + bool resFreed = false; QW_LOCK(QW_WRITE, &ctx->lock); - QW_TASK_DLOG_E("start to force stop task"); + QW_TASK_DLOG("start to stop task, forceStop:%d", forceStop); if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG_E("task already dropping"); QW_UNLOCK(QW_WRITE, &ctx->lock); - return taskFreed; + return resFreed; } if (QW_QUERY_RUNNING(ctx)) { - code = qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); + code = qwKillTaskHandle(ctx, errCode); if (TSDB_CODE_SUCCESS != code) { QW_TASK_ELOG("task running, async kill failed, error: %x", code); } else { QW_TASK_DLOG_E("task running, async killed"); } } else if (QW_FETCH_RUNNING(ctx)) { - QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); + QW_UPDATE_RSP_CODE(ctx, errCode); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_TASK_DLOG_E("task fetching, update drop received"); +// } else if (forceStop) { } else { + QW_UPDATE_RSP_CODE(ctx, errCode); code = qwDropTask(QW_FPARAMS()); if (TSDB_CODE_SUCCESS != code) { QW_TASK_ELOG("task drop failed, error: %x", code); } else { QW_TASK_DLOG_E("task dropped"); - taskFreed = true; + resFreed = true; } +/* + } else { + QW_UPDATE_RSP_CODE(ctx, errCode); + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); + + qwFreeTaskHandle(ctx); + qwFreeSinkHandle(ctx); + + resFreed = true; + + QW_TASK_DLOG_E("task resources freed"); +*/ } QW_UNLOCK(QW_WRITE, &ctx->lock); - return taskFreed; + return resFreed; } -bool qwRetireTask(QW_FPARAMS_DEF) { +bool qwRetireTask(QW_FPARAMS_DEF, int32_t errCode) { SQWTaskCtx *ctx = NULL; int32_t code = qwAcquireTaskCtx(QW_FPARAMS(), &ctx); @@ -788,7 +807,7 @@ bool qwRetireTask(QW_FPARAMS_DEF) { return false; } - bool retired = qwStopTask(QW_FPARAMS(), ctx); + bool retired = qwStopTask(QW_FPARAMS(), ctx, false, errCode); qwReleaseTaskCtx(mgmt, ctx); @@ -805,7 +824,7 @@ bool qwRetireJob(SQWJobInfo* pJob) { while (pIter) { SQWSessionInfo* pSession = (SQWSessionInfo*)pIter; - if (!qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId)) { + if (!qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId, pJob->errCode)) { retired = false; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 6d91eae4d3..dccfeaee8d 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -35,7 +35,7 @@ void qwStopAllTasks(SQWorker *mgmt) { sId = ctx->sId; - (void)qwStopTask(QW_FPARAMS(), ctx); + (void)qwStopTask(QW_FPARAMS(), ctx, true, TSDB_CODE_VND_STOPPED); pIter = taosHashIterate(mgmt->ctxHash, pIter); } @@ -561,6 +561,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_SET_PHASE(ctx, phase); + if (ctx->pJobInfo && (atomic_load_8(&ctx->pJobInfo->retired) || atomic_load_32(&ctx->pJobInfo->errCode))) { + QW_TASK_ELOG("job already failed, error:%s", tstrerror(ctx->pJobInfo->errCode)); + QW_ERR_JRET(ctx->pJobInfo->errCode); + } + if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) { QW_TASK_ELOG_E("query already end"); QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR); @@ -763,6 +768,8 @@ _return: qwReleaseTaskCtx(mgmt, ctx); } + QW_TASK_DLOG("task preprocess %s, code:%s", code ? "failed": "succeed", tstrerror(code)); + return code; } @@ -789,7 +796,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { taosDisableMemPoolUsage(); if (TSDB_CODE_SUCCESS != code) { - code = TSDB_CODE_INVALID_MSG; QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); } @@ -810,6 +816,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(TSDB_CODE_APP_ERROR); } + atomic_add_fetch_64(&gQueryMgmt.stat.taskInitNum, 1); + uint64_t flags = 0; dsGetSinkFlags(sinkHandle, &flags); @@ -1307,15 +1315,14 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { SQWTaskCtx ctx = {0}; code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan); + if (TSDB_CODE_SUCCESS != code) { code = TSDB_CODE_INVALID_MSG; QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); } - tsEnableRandErr = true; code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH); - tsEnableRandErr = false; if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -1644,32 +1651,43 @@ void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) { } void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) { + qDebug("need to retire jobs in batch, targetRetireSize:%" PRId64 ", remainJobNum:%d, task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64, + retireSize, taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum), + atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum)); + SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); int32_t jobNum = 0; + int32_t alreadyJobNum = 0; int64_t retiredSize = 0; - while (retiredSize < retireSize && NULL != pJob) { + while (retiredSize < retireSize && NULL != pJob && jobNum < QW_RETIRE_JOB_BATCH_NUM) { if (atomic_load_8(&pJob->retired)) { pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); + alreadyJobNum++; continue; } if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); bool retired = qwRetireJob(pJob); - if (retired) { - retiredSize += aSize; - } + + retiredSize += aSize; jobNum++; qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job mark retired in batch, retired:%d, usedSize:%" PRId64 ", retireSize:%" PRId64, pJob->memInfo->jobId, pJob->memInfo->clientId, retired, aSize, retireSize); + } else { + qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job may already failed, errCode:%s", pJob->memInfo->jobId, pJob->memInfo->clientId, tstrerror(pJob->errCode)); } pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); } - qDebug("total %d jobs mark retired, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, retireSize); + qDebug("job retire in batch done, [prev:%d, curr:%d, total:%d] jobs, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64 + ", task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64, + alreadyJobNum, jobNum, taosHashGetSize(gQueryMgmt.pJobInfo), retiredSize, retireSize, + atomic_load_64(&gQueryMgmt.stat.taskInitNum), + atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum)); } diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index a2fa765077..6f8826cedb 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -34,6 +34,7 @@ extern "C" { #define MP_DEFAULT_MEM_CHK_INTERVAL_MS 10 #define MP_MIN_MEM_CHK_INTERVAL_MS 1 +#define MP_MEMORY_TRIM_INTERVAL_TIMES 500 #define MP_RETIRE_HIGH_THRESHOLD_PERCENT (0.95) @@ -316,6 +317,7 @@ typedef struct SMemPoolMgmt { int8_t modExit; int64_t waitMs; int32_t code; + int8_t needTrim; } SMemPoolMgmt; extern SMemPoolMgmt gMPMgmt; @@ -488,6 +490,7 @@ enum { if ((cAllocSize / 1048576L) > *(_pool)->cfg.jobQuota) { \ uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %dMB", (_job)->job.jobId, cAllocSize, *(_pool)->cfg.jobQuota); \ (_pool)->cfg.cb.reachFp(pJob->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); \ + atomic_store_8(&gMPMgmt.needTrim, 1); \ terrno = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; \ return NULL; \ } \ @@ -496,6 +499,7 @@ enum { uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %" PRId64 " bytes", \ (_pool)->name, atomic_load_64(&tsCurrentAvailMemorySize), (_size), (_pool)->cfg.reserveSize); \ (_pool)->cfg.cb.reachFp((_job)->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); \ + atomic_store_8(&gMPMgmt.needTrim, 1); \ terrno = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; \ return NULL; \ } \ diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index b54cbe33ab..19585435c5 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -69,13 +69,17 @@ int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup if (NULL == pInfo->pGrpHead) { pInfo->pGrpHead = taosMemoryCalloc(1, sizeof(*pInfo->pGrpHead)); if (NULL == pInfo->pGrpHead) { - uError("malloc chunkCache failed"); - MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + uError("malloc pGrpHead failed, error:%s", tstrerror(terrno)); + MP_ERR_RET(terrno); } pGrp = pInfo->pGrpHead; } else { pGrp = (SMPCacheGroup*)taosMemoryCalloc(1, sizeof(SMPCacheGroup)); + if (NULL == pInfo->pGrpHead) { + uError("malloc SMPCacheGroup failed, error:%s", tstrerror(terrno)); + MP_ERR_RET(terrno); + } pGrp->pNext = pHead; } @@ -319,6 +323,7 @@ int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) { code = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pJob->job.jobId, cAllocSize, quota); pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code); + atomic_store_8(&gMPMgmt.needTrim, 1); (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); MP_RET(code); } @@ -328,6 +333,7 @@ int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) { uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %" PRId64 " bytes", pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, pPool->cfg.reserveSize); pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code); + atomic_store_8(&gMPMgmt.needTrim, 1); (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); MP_RET(code); } @@ -1026,9 +1032,20 @@ void mpUpdateSystemAvailableMemorySize() { uDebug("system available memory size: %" PRId64, sysAvailSize); } +void mpLaunchTrim(int64_t* loopTimes) { + static int64_t trimTimes = 0; + + taosMemTrim(0, NULL); + + atomic_store_8(&gMPMgmt.needTrim, 0); + *loopTimes = 0; + + uDebug("%" PRId64 "th memory trim launched", ++trimTimes); +} + void* mpMgmtThreadFunc(void* param) { int32_t timeout = 0; - int64_t retireSize = 0; + int64_t retireSize = 0, loopTimes = 0; SMemPool* pPool = (SMemPool*)atomic_load_ptr(&gMemPoolHandle); while (0 == atomic_load_8(&gMPMgmt.modExit)) { @@ -1037,6 +1054,12 @@ void* mpMgmtThreadFunc(void* param) { retireSize = pPool->cfg.reserveSize - tsCurrentAvailMemorySize; if (retireSize > 0) { (*pPool->cfg.cb.failFp)(retireSize, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); + + mpLaunchTrim(&loopTimes); + } + + if ((0 == (++loopTimes) % 500) || atomic_load_8(&gMPMgmt.needTrim)) { + mpLaunchTrim(&loopTimes); } taosMsleep(MP_DEFAULT_MEM_CHK_INTERVAL_MS); @@ -1229,6 +1252,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, c pSession->sessionId = taosStrdup(sessionId); if (NULL == pSession->sessionId) { + uError("strdup sessionId failed, error:%s", tstrerror(terrno)); MP_ERR_JRET(terrno); } @@ -1670,6 +1694,10 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t return code; } +void taosMemPoolSchedTrim(void) { + atomic_store_8(&gMPMgmt.needTrim, 1); +} + int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) { int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index add8d4a17e..ec33a61d40 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -56,7 +56,7 @@ namespace { #define MPT_MIN_MEM_POOL_SIZE (1048576UL) #define MPT_MAX_RETIRE_JOB_NUM 10000 #define MPT_DEFAULT_TASK_RUN_TIMES 10 -#define MPT_NON_POOL_ALLOC_UNIT (256 * 1048576UL) +#define MPT_NON_POOL_ALLOC_UNIT (1048576UL) #define MPT_NON_POOL_KEEP_ALLOC_UNIT (10485760UL * 8) #define MPT_MAX_NON_POOL_ALLOC_TIMES 30000 @@ -418,6 +418,15 @@ void mptDestroyJobInfo(void* job) { } +void mptWriteMem(void* pStart, int64_t size) { + char* pEnd = (char*)pStart + size - 1; + char* p = (char*)pStart; + while (p <= pEnd) { + *p = 'a' + taosRand() % 26; + p += 4096; + } +} + void mptInit() { osDefaultInit(); @@ -446,6 +455,9 @@ void mptInit() { mptCtx.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0; void* p = taosMemMalloc(1048576UL * 20000); + + mptWriteMem(p, (1048576UL * 20000)); + } void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) { @@ -796,14 +808,6 @@ void mptInitPool(void) { assert(0 == taosMemoryPoolInit(mptRetireJobsCb, mptRetireJobCb)); } -void mptWriteMem(void* pStart, int64_t size) { - char* pEnd = (char*)pStart + size - 1; - char* p = (char*)pStart; - while (p <= pEnd) { - *p = 'a' + taosRand() % 26; - p += 4096; - } -} void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { int32_t actId = 0; @@ -1132,6 +1136,8 @@ void mptSimulateOutTask(int64_t targetSize) { } mptWriteMem(pCtx->p, pCtx->size); + + mptCtx.npIdx++; } @@ -1360,31 +1366,25 @@ void* mptRunThreadFunc(void* param) { } void* mptNonPoolThreadFunc(void* param) { - int64_t targetSize = MPT_NON_POOL_ALLOC_UNIT * 3; + int64_t targetSize = MPT_NON_POOL_ALLOC_UNIT; int64_t allocSize = 0; - int32_t loopTimes = 0; while (!atomic_load_8(&mptCtx.testDone)) { mptSimulateOutTask(targetSize); + allocSize += targetSize; - MPT_PRINTF("%d:Non-pool malloc and write %" PRId64 " bytes, keep size:%" PRId64 "\n", loopTimes, targetSize, allocSize); + MPT_EPRINTF("%d:Non-pool malloc and write %" PRId64 " bytes, keep size:%" PRId64 "\n", mptCtx.npIdx - 1, targetSize, allocSize); + taosUsleep(1); - taosMsleep(100); - taosMemFreeClear(mptCtx.npMemList[mptCtx.npIdx].p); + if ((mptCtx.npIdx * targetSize) >= (tsMinReservedMemorySize * 1048576UL * 10)) { + for (int32_t i = 0; i < mptCtx.npIdx; ++i) { + taosMemFreeClear(mptCtx.npMemList[i].p); + } - loopTimes++; - - if (0 == (loopTimes % 100)) { - mptSimulateOutTask(MPT_NON_POOL_KEEP_ALLOC_UNIT); - allocSize += MPT_NON_POOL_KEEP_ALLOC_UNIT; - mptCtx.npIdx++; - } - - taosMsleep(100); - - if (loopTimes >= 4000) { + mptCtx.npIdx = 0; targetSize += MPT_NON_POOL_ALLOC_UNIT; - loopTimes = 0; + allocSize = 0; + taosMsleep(100); } } diff --git a/tests/script/api/asyncQuery.c b/tests/script/api/asyncQuery.c new file mode 100644 index 0000000000..23e4aced1d --- /dev/null +++ b/tests/script/api/asyncQuery.c @@ -0,0 +1,113 @@ +// sample code to verify multiple queries with the same reqid +// to compile: gcc -o sameReqdiTest sameReqidTest.c -ltaos + +#include +#include +#include +#include +#include "taos.h" + +#define CONST_REQ_ID 12345 +#define TEST_DB "db1" + +#define CHECK_CONDITION(condition, prompt, errstr) \ + do { \ + if (!(condition)) { \ + printf("\033[31m[%s:%d] failed to " prompt ", reason: %s\033[0m\n", __func__, __LINE__, errstr); \ + exit(EXIT_FAILURE); \ + } \ + } while (0) + +#define CHECK_RES(res, prompt) CHECK_CONDITION(taos_errno(res) == 0, prompt, taos_errstr(res)) +#define CHECK_CODE(code, prompt) CHECK_CONDITION(code == 0, prompt, taos_errstr(NULL)) + +int64_t errTimes = 0, finQueries = 0; + +static TAOS* getNewConnection() { + const char* host = "127.0.0.1"; + const char* user = "root"; + const char* passwd = "taosdata"; + TAOS* taos = NULL; + + taos_options(TSDB_OPTION_TIMEZONE, "GMT-8"); + taos = taos_connect(host, user, passwd, "", 0); + CHECK_CONDITION(taos != NULL, "connect to db", taos_errstr(NULL)); + return taos; +} + +static int32_t printResult(TAOS_RES* res, int32_t nlimit) { + TAOS_ROW row = NULL; + TAOS_FIELD* fields = NULL; + int32_t numFields = 0; + int32_t nRows = 0; + + numFields = taos_num_fields(res); + fields = taos_fetch_fields(res); + while ((nlimit-- > 0) && (row = taos_fetch_row(res))) { + nRows++; + } + return nRows; +} + +void retrieveCallback(void* param, TAOS_RES* res, int32_t nrows) { + if (nrows < 0) { + taos_free_result(res); + atomic_add_fetch_64(&finQueries, 1); + atomic_add_fetch_64(&errTimes, 1); + } else if (nrows == 0) { + taos_free_result(res); + atomic_add_fetch_64(&finQueries, 1); + } else { + printResult(res, nrows); + taos_fetch_rows_a(res, retrieveCallback, param); + } +} + +void selectCallback(void* param, TAOS_RES* res, int32_t code) { + if (code) { + atomic_add_fetch_64(&errTimes, 1); + taos_free_result(res); + atomic_add_fetch_64(&finQueries, 1); + return; + } + taos_fetch_rows_a(res, retrieveCallback, param); +} + +static void verifyQueryAsync(TAOS* taos) { + taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL); + taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL); + taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL); + taos_query_a(taos, "select * from stb", selectCallback, NULL); + taos_query_a(taos, "select * from stb", selectCallback, NULL); + taos_query_a(taos, "select * from stb", selectCallback, NULL); + taos_query_a(taos, "select * from stb", selectCallback, NULL); + taos_query_a(taos, "select * from stb", selectCallback, NULL); + taos_query_a(taos, "select * from stb", selectCallback, NULL); + taos_query_a(taos, "select * from stb", selectCallback, NULL); +} + +int main(int argc, char* argv[]) { + TAOS* taos = NULL; + int32_t code = 0; + + taos = getNewConnection(); + taos_select_db(taos, TEST_DB); + CHECK_CODE(code, "switch to database"); + + printf("************ prepare data *************\n"); + + for (int64_t i = 0; i < 1000000000; ++i) { + verifyQueryAsync(taos); + printf("%llu queries launched, errTimes:%lld \n", i * 10, errTimes); + while ((i * 10 - atomic_load_64(&finQueries)) > 1000) { + printf("left queries:%llu\n", (i * 10 - atomic_load_64(&finQueries))); + taosMsleep(2000); + } + printf("\n"); + } + + taos_close(taos); + taos_cleanup(); + + return 0; +} diff --git a/tests/script/api/makefile b/tests/script/api/makefile index ce5980b37a..b871c5f3ff 100644 --- a/tests/script/api/makefile +++ b/tests/script/api/makefile @@ -22,6 +22,7 @@ exe: gcc $(CFLAGS) ./insert_stb.c -o $(ROOT)insert_stb $(LFLAGS) gcc $(CFLAGS) ./tmqViewTest.c -o $(ROOT)tmqViewTest $(LFLAGS) gcc $(CFLAGS) ./stmtQuery.c -o $(ROOT)stmtQuery $(LFLAGS) + gcc $(CFLAGS) ./asyncQuery.c -o $(ROOT)asyncQuery $(LFLAGS) # gcc $(CFLAGS) ./stmt.c -o $(ROOT)stmt $(LFLAGS) # gcc $(CFLAGS) ./stmt2.c -o $(ROOT)stmt2 $(LFLAGS) # gcc $(CFLAGS) ./stmt2-example.c -o $(ROOT)stmt2-example $(LFLAGS)