fix: job retire issues

This commit is contained in:
dapan1121 2024-12-02 14:51:31 +08:00
parent 90b60d9d01
commit ce07aaf774
10 changed files with 255 additions and 62 deletions

View File

@ -136,6 +136,7 @@ void taosMemPoolUnLockPool(void* poolHandle, bool readLock);
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd);
void taosMemPoolGetUsedSizeEnd(void* poolHandle);
int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize);
void taosMemPoolSchedTrim(void);
int32_t taosMemoryPoolInit(mpReserveFailFp, mpReserveReachFp);

View File

@ -682,7 +682,7 @@ void doDestroyRequest(void *p) {
SRequestObj *pRequest = (SRequestObj *)p;
uint64_t reqId = pRequest->requestId;
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
tscDebug("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
int64_t nextReqRefId = pRequest->relation.nextRefId;
@ -724,7 +724,7 @@ void doDestroyRequest(void *p) {
taosMemoryFreeClear(pRequest->effectiveUser);
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
tscDebug("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
destroyNextReq(nextReqRefId);
}

View File

@ -45,6 +45,8 @@ extern "C" {
#define QW_DEFAULT_THREAD_TASK_NUM 3
#define QW_RETIRE_JOB_BATCH_NUM 5
enum {
QW_CONC_TASK_LEVEL_LOW = 1,
QW_CONC_TASK_LEVEL_MIDDLE,
@ -257,10 +259,17 @@ typedef struct SQWRetireCtx {
BoundedQueue* pJobQueue;
} SQWRetireCtx;
typedef struct SQueryExecStat {
int64_t taskInitNum;
int64_t taskExecDestroyNum;
int64_t taskSinkDestroyNum;
} SQueryExecStat;
typedef struct SQueryMgmt {
SRWLatch taskMgmtLock;
int32_t concTaskLevel;
SHashObj* pJobInfo;
SRWLatch taskMgmtLock;
int32_t concTaskLevel;
SHashObj* pJobInfo;
SQueryExecStat stat;
} SQueryMgmt;
#define QW_CTX_NOT_EXISTS_ERR_CODE(mgmt) (atomic_load_8(&(mgmt)->nodeStopped) ? TSDB_CODE_VND_STOPPED : TSDB_CODE_QRY_TASK_CTX_NOT_EXIST)
@ -399,20 +408,20 @@ extern SQueryMgmt gQueryMgmt;
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p clientId:%" PRIx64 " " param, mgmt, clientId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) \
qError("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
qError("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) \
qWarn("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
qWarn("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) \
qDebug("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
qDebug("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) \
qDebugL("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
qDebugL("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) \
qError("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
qError("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
#define QW_TASK_WLOG_E(param) \
qWarn("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
qWarn("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
#define QW_TASK_DLOG_E(param) \
qDebug("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
qDebug("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
#define QW_SCH_TASK_ELOG(param, ...) \
qError("QW:%p SID:%" PRId64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
@ -530,7 +539,7 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped);
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
int32_t qwInitQueryPool(void);
void qwDestroyJobInfo(void* job);
bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool forceStop, int32_t errCode);
bool qwRetireJob(SQWJobInfo* pJob);
void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session);
int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession);

View File

@ -144,6 +144,7 @@ int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, S
*task = taosHashGet(sch->tasksHash, id, sizeof(id));
if (NULL == (*task)) {
QW_UNLOCK(rwType, &sch->tasksLock);
QW_TASK_ELOG_E("task status not exists");
QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
}
@ -280,6 +281,8 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx) {
qDestroyTask(otaskHandle);
taosDisableMemPoolUsage();
atomic_add_fetch_64(&gQueryMgmt.stat.taskExecDestroyNum, 1);
qDebug("task handle destroyed");
}
}
@ -291,6 +294,8 @@ void qwFreeSinkHandle(SQWTaskCtx *ctx) {
QW_SINK_ENABLE_MEMPOOL(ctx);
dsDestroyDataSinker(osinkHandle);
QW_SINK_DISABLE_MEMPOOL();
atomic_add_fetch_64(&gQueryMgmt.stat.taskSinkDestroyNum, 1);
qDebug("sink handle destroyed");
}
@ -739,48 +744,62 @@ void qwDestroyJobInfo(void* job) {
pJob->pSessions = NULL;
}
bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool forceStop, int32_t errCode) {
int32_t code = TSDB_CODE_SUCCESS;
bool taskFreed = false;
bool resFreed = false;
QW_LOCK(QW_WRITE, &ctx->lock);
QW_TASK_DLOG_E("start to force stop task");
QW_TASK_DLOG("start to stop task, forceStop:%d", forceStop);
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
return taskFreed;
return resFreed;
}
if (QW_QUERY_RUNNING(ctx)) {
code = qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
code = qwKillTaskHandle(ctx, errCode);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task running, async kill failed, error: %x", code);
} else {
QW_TASK_DLOG_E("task running, async killed");
}
} else if (QW_FETCH_RUNNING(ctx)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_UPDATE_RSP_CODE(ctx, errCode);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
QW_TASK_DLOG_E("task fetching, update drop received");
// } else if (forceStop) {
} else {
QW_UPDATE_RSP_CODE(ctx, errCode);
code = qwDropTask(QW_FPARAMS());
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task drop failed, error: %x", code);
} else {
QW_TASK_DLOG_E("task dropped");
taskFreed = true;
resFreed = true;
}
/*
} else {
QW_UPDATE_RSP_CODE(ctx, errCode);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
qwFreeTaskHandle(ctx);
qwFreeSinkHandle(ctx);
resFreed = true;
QW_TASK_DLOG_E("task resources freed");
*/
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
return taskFreed;
return resFreed;
}
bool qwRetireTask(QW_FPARAMS_DEF) {
bool qwRetireTask(QW_FPARAMS_DEF, int32_t errCode) {
SQWTaskCtx *ctx = NULL;
int32_t code = qwAcquireTaskCtx(QW_FPARAMS(), &ctx);
@ -788,7 +807,7 @@ bool qwRetireTask(QW_FPARAMS_DEF) {
return false;
}
bool retired = qwStopTask(QW_FPARAMS(), ctx);
bool retired = qwStopTask(QW_FPARAMS(), ctx, false, errCode);
qwReleaseTaskCtx(mgmt, ctx);
@ -805,7 +824,7 @@ bool qwRetireJob(SQWJobInfo* pJob) {
while (pIter) {
SQWSessionInfo* pSession = (SQWSessionInfo*)pIter;
if (!qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId)) {
if (!qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId, pJob->errCode)) {
retired = false;
}

View File

@ -35,7 +35,7 @@ void qwStopAllTasks(SQWorker *mgmt) {
sId = ctx->sId;
(void)qwStopTask(QW_FPARAMS(), ctx);
(void)qwStopTask(QW_FPARAMS(), ctx, true, TSDB_CODE_VND_STOPPED);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
@ -561,6 +561,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_SET_PHASE(ctx, phase);
if (ctx->pJobInfo && (atomic_load_8(&ctx->pJobInfo->retired) || atomic_load_32(&ctx->pJobInfo->errCode))) {
QW_TASK_ELOG("job already failed, error:%s", tstrerror(ctx->pJobInfo->errCode));
QW_ERR_JRET(ctx->pJobInfo->errCode);
}
if (atomic_load_8((int8_t *)&ctx->queryEnd) && !ctx->dynamicTask) {
QW_TASK_ELOG_E("query already end");
QW_ERR_JRET(TSDB_CODE_QW_MSG_ERROR);
@ -763,6 +768,8 @@ _return:
qwReleaseTaskCtx(mgmt, ctx);
}
QW_TASK_DLOG("task preprocess %s, code:%s", code ? "failed": "succeed", tstrerror(code));
return code;
}
@ -789,7 +796,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
taosDisableMemPoolUsage();
if (TSDB_CODE_SUCCESS != code) {
code = TSDB_CODE_INVALID_MSG;
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
}
@ -810,6 +816,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(TSDB_CODE_APP_ERROR);
}
atomic_add_fetch_64(&gQueryMgmt.stat.taskInitNum, 1);
uint64_t flags = 0;
dsGetSinkFlags(sinkHandle, &flags);
@ -1307,15 +1315,14 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
SQWTaskCtx ctx = {0};
code = qMsgToSubplan(qwMsg->msg, qwMsg->msgLen, &plan);
if (TSDB_CODE_SUCCESS != code) {
code = TSDB_CODE_INVALID_MSG;
QW_TASK_ELOG("task physical plan to subplan failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
}
tsEnableRandErr = true;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, 0, NULL, OPTR_EXEC_MODEL_BATCH);
tsEnableRandErr = false;
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
@ -1644,32 +1651,43 @@ void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) {
}
void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
qDebug("need to retire jobs in batch, targetRetireSize:%" PRId64 ", remainJobNum:%d, task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64,
retireSize, taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum),
atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum));
SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
int32_t jobNum = 0;
int32_t alreadyJobNum = 0;
int64_t retiredSize = 0;
while (retiredSize < retireSize && NULL != pJob) {
while (retiredSize < retireSize && NULL != pJob && jobNum < QW_RETIRE_JOB_BATCH_NUM) {
if (atomic_load_8(&pJob->retired)) {
pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
alreadyJobNum++;
continue;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
bool retired = qwRetireJob(pJob);
if (retired) {
retiredSize += aSize;
}
retiredSize += aSize;
jobNum++;
qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job mark retired in batch, retired:%d, usedSize:%" PRId64 ", retireSize:%" PRId64,
pJob->memInfo->jobId, pJob->memInfo->clientId, retired, aSize, retireSize);
} else {
qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job may already failed, errCode:%s", pJob->memInfo->jobId, pJob->memInfo->clientId, tstrerror(pJob->errCode));
}
pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
}
qDebug("total %d jobs mark retired, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, retireSize);
qDebug("job retire in batch done, [prev:%d, curr:%d, total:%d] jobs, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64
", task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64,
alreadyJobNum, jobNum, taosHashGetSize(gQueryMgmt.pJobInfo), retiredSize, retireSize,
atomic_load_64(&gQueryMgmt.stat.taskInitNum),
atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum));
}

View File

@ -34,6 +34,7 @@ extern "C" {
#define MP_DEFAULT_MEM_CHK_INTERVAL_MS 10
#define MP_MIN_MEM_CHK_INTERVAL_MS 1
#define MP_MEMORY_TRIM_INTERVAL_TIMES 500
#define MP_RETIRE_HIGH_THRESHOLD_PERCENT (0.95)
@ -316,6 +317,7 @@ typedef struct SMemPoolMgmt {
int8_t modExit;
int64_t waitMs;
int32_t code;
int8_t needTrim;
} SMemPoolMgmt;
extern SMemPoolMgmt gMPMgmt;
@ -488,6 +490,7 @@ enum {
if ((cAllocSize / 1048576L) > *(_pool)->cfg.jobQuota) { \
uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %dMB", (_job)->job.jobId, cAllocSize, *(_pool)->cfg.jobQuota); \
(_pool)->cfg.cb.reachFp(pJob->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_REACH_QMEM_THRESHOLD); \
atomic_store_8(&gMPMgmt.needTrim, 1); \
terrno = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; \
return NULL; \
} \
@ -496,6 +499,7 @@ enum {
uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %" PRId64 " bytes", \
(_pool)->name, atomic_load_64(&tsCurrentAvailMemorySize), (_size), (_pool)->cfg.reserveSize); \
(_pool)->cfg.cb.reachFp((_job)->job.jobId, (_job)->job.clientId, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED); \
atomic_store_8(&gMPMgmt.needTrim, 1); \
terrno = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; \
return NULL; \
} \

View File

@ -69,13 +69,17 @@ int32_t mpAddCacheGroup(SMemPool* pPool, SMPCacheGroupInfo* pInfo, SMPCacheGroup
if (NULL == pInfo->pGrpHead) {
pInfo->pGrpHead = taosMemoryCalloc(1, sizeof(*pInfo->pGrpHead));
if (NULL == pInfo->pGrpHead) {
uError("malloc chunkCache failed");
MP_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
uError("malloc pGrpHead failed, error:%s", tstrerror(terrno));
MP_ERR_RET(terrno);
}
pGrp = pInfo->pGrpHead;
} else {
pGrp = (SMPCacheGroup*)taosMemoryCalloc(1, sizeof(SMPCacheGroup));
if (NULL == pInfo->pGrpHead) {
uError("malloc SMPCacheGroup failed, error:%s", tstrerror(terrno));
MP_ERR_RET(terrno);
}
pGrp->pNext = pHead;
}
@ -319,6 +323,7 @@ int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) {
code = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD;
uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pJob->job.jobId, cAllocSize, quota);
pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code);
atomic_store_8(&gMPMgmt.needTrim, 1);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
MP_RET(code);
}
@ -328,6 +333,7 @@ int32_t mpChkFullQuota(SMemPool* pPool, SMPSession* pSession, int64_t size) {
uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %" PRId64 " bytes",
pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, pPool->cfg.reserveSize);
pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code);
atomic_store_8(&gMPMgmt.needTrim, 1);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
MP_RET(code);
}
@ -1026,9 +1032,20 @@ void mpUpdateSystemAvailableMemorySize() {
uDebug("system available memory size: %" PRId64, sysAvailSize);
}
void mpLaunchTrim(int64_t* loopTimes) {
static int64_t trimTimes = 0;
taosMemTrim(0, NULL);
atomic_store_8(&gMPMgmt.needTrim, 0);
*loopTimes = 0;
uDebug("%" PRId64 "th memory trim launched", ++trimTimes);
}
void* mpMgmtThreadFunc(void* param) {
int32_t timeout = 0;
int64_t retireSize = 0;
int64_t retireSize = 0, loopTimes = 0;
SMemPool* pPool = (SMemPool*)atomic_load_ptr(&gMemPoolHandle);
while (0 == atomic_load_8(&gMPMgmt.modExit)) {
@ -1037,6 +1054,12 @@ void* mpMgmtThreadFunc(void* param) {
retireSize = pPool->cfg.reserveSize - tsCurrentAvailMemorySize;
if (retireSize > 0) {
(*pPool->cfg.cb.failFp)(retireSize, TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED);
mpLaunchTrim(&loopTimes);
}
if ((0 == (++loopTimes) % 500) || atomic_load_8(&gMPMgmt.needTrim)) {
mpLaunchTrim(&loopTimes);
}
taosMsleep(MP_DEFAULT_MEM_CHK_INTERVAL_MS);
@ -1229,6 +1252,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, c
pSession->sessionId = taosStrdup(sessionId);
if (NULL == pSession->sessionId) {
uError("strdup sessionId failed, error:%s", tstrerror(terrno));
MP_ERR_JRET(terrno);
}
@ -1670,6 +1694,10 @@ int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t
return code;
}
void taosMemPoolSchedTrim(void) {
atomic_store_8(&gMPMgmt.needTrim, 1);
}
int32_t taosMemoryPoolInit(mpReserveFailFp failFp, mpReserveReachFp reachFp) {
int32_t code = TSDB_CODE_SUCCESS;

View File

@ -56,7 +56,7 @@ namespace {
#define MPT_MIN_MEM_POOL_SIZE (1048576UL)
#define MPT_MAX_RETIRE_JOB_NUM 10000
#define MPT_DEFAULT_TASK_RUN_TIMES 10
#define MPT_NON_POOL_ALLOC_UNIT (256 * 1048576UL)
#define MPT_NON_POOL_ALLOC_UNIT (1048576UL)
#define MPT_NON_POOL_KEEP_ALLOC_UNIT (10485760UL * 8)
#define MPT_MAX_NON_POOL_ALLOC_TIMES 30000
@ -418,6 +418,15 @@ void mptDestroyJobInfo(void* job) {
}
void mptWriteMem(void* pStart, int64_t size) {
char* pEnd = (char*)pStart + size - 1;
char* p = (char*)pStart;
while (p <= pEnd) {
*p = 'a' + taosRand() % 26;
p += 4096;
}
}
void mptInit() {
osDefaultInit();
@ -446,6 +455,9 @@ void mptInit() {
mptCtx.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0;
void* p = taosMemMalloc(1048576UL * 20000);
mptWriteMem(p, (1048576UL * 20000));
}
void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) {
@ -796,14 +808,6 @@ void mptInitPool(void) {
assert(0 == taosMemoryPoolInit(mptRetireJobsCb, mptRetireJobCb));
}
void mptWriteMem(void* pStart, int64_t size) {
char* pEnd = (char*)pStart + size - 1;
char* p = (char*)pStart;
while (p <= pEnd) {
*p = 'a' + taosRand() % 26;
p += 4096;
}
}
void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
int32_t actId = 0;
@ -1132,6 +1136,8 @@ void mptSimulateOutTask(int64_t targetSize) {
}
mptWriteMem(pCtx->p, pCtx->size);
mptCtx.npIdx++;
}
@ -1360,31 +1366,25 @@ void* mptRunThreadFunc(void* param) {
}
void* mptNonPoolThreadFunc(void* param) {
int64_t targetSize = MPT_NON_POOL_ALLOC_UNIT * 3;
int64_t targetSize = MPT_NON_POOL_ALLOC_UNIT;
int64_t allocSize = 0;
int32_t loopTimes = 0;
while (!atomic_load_8(&mptCtx.testDone)) {
mptSimulateOutTask(targetSize);
allocSize += targetSize;
MPT_PRINTF("%d:Non-pool malloc and write %" PRId64 " bytes, keep size:%" PRId64 "\n", loopTimes, targetSize, allocSize);
MPT_EPRINTF("%d:Non-pool malloc and write %" PRId64 " bytes, keep size:%" PRId64 "\n", mptCtx.npIdx - 1, targetSize, allocSize);
taosUsleep(1);
taosMsleep(100);
taosMemFreeClear(mptCtx.npMemList[mptCtx.npIdx].p);
if ((mptCtx.npIdx * targetSize) >= (tsMinReservedMemorySize * 1048576UL * 10)) {
for (int32_t i = 0; i < mptCtx.npIdx; ++i) {
taosMemFreeClear(mptCtx.npMemList[i].p);
}
loopTimes++;
if (0 == (loopTimes % 100)) {
mptSimulateOutTask(MPT_NON_POOL_KEEP_ALLOC_UNIT);
allocSize += MPT_NON_POOL_KEEP_ALLOC_UNIT;
mptCtx.npIdx++;
}
taosMsleep(100);
if (loopTimes >= 4000) {
mptCtx.npIdx = 0;
targetSize += MPT_NON_POOL_ALLOC_UNIT;
loopTimes = 0;
allocSize = 0;
taosMsleep(100);
}
}

View File

@ -0,0 +1,113 @@
// sample code to verify multiple queries with the same reqid
// to compile: gcc -o sameReqdiTest sameReqidTest.c -ltaos
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "taos.h"
#define CONST_REQ_ID 12345
#define TEST_DB "db1"
#define CHECK_CONDITION(condition, prompt, errstr) \
do { \
if (!(condition)) { \
printf("\033[31m[%s:%d] failed to " prompt ", reason: %s\033[0m\n", __func__, __LINE__, errstr); \
exit(EXIT_FAILURE); \
} \
} while (0)
#define CHECK_RES(res, prompt) CHECK_CONDITION(taos_errno(res) == 0, prompt, taos_errstr(res))
#define CHECK_CODE(code, prompt) CHECK_CONDITION(code == 0, prompt, taos_errstr(NULL))
int64_t errTimes = 0, finQueries = 0;
static TAOS* getNewConnection() {
const char* host = "127.0.0.1";
const char* user = "root";
const char* passwd = "taosdata";
TAOS* taos = NULL;
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
taos = taos_connect(host, user, passwd, "", 0);
CHECK_CONDITION(taos != NULL, "connect to db", taos_errstr(NULL));
return taos;
}
static int32_t printResult(TAOS_RES* res, int32_t nlimit) {
TAOS_ROW row = NULL;
TAOS_FIELD* fields = NULL;
int32_t numFields = 0;
int32_t nRows = 0;
numFields = taos_num_fields(res);
fields = taos_fetch_fields(res);
while ((nlimit-- > 0) && (row = taos_fetch_row(res))) {
nRows++;
}
return nRows;
}
void retrieveCallback(void* param, TAOS_RES* res, int32_t nrows) {
if (nrows < 0) {
taos_free_result(res);
atomic_add_fetch_64(&finQueries, 1);
atomic_add_fetch_64(&errTimes, 1);
} else if (nrows == 0) {
taos_free_result(res);
atomic_add_fetch_64(&finQueries, 1);
} else {
printResult(res, nrows);
taos_fetch_rows_a(res, retrieveCallback, param);
}
}
void selectCallback(void* param, TAOS_RES* res, int32_t code) {
if (code) {
atomic_add_fetch_64(&errTimes, 1);
taos_free_result(res);
atomic_add_fetch_64(&finQueries, 1);
return;
}
taos_fetch_rows_a(res, retrieveCallback, param);
}
static void verifyQueryAsync(TAOS* taos) {
taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL);
taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL);
taos_query_a(taos, "select twa(c1) from stb interval(10s);", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
taos_query_a(taos, "select * from stb", selectCallback, NULL);
}
int main(int argc, char* argv[]) {
TAOS* taos = NULL;
int32_t code = 0;
taos = getNewConnection();
taos_select_db(taos, TEST_DB);
CHECK_CODE(code, "switch to database");
printf("************ prepare data *************\n");
for (int64_t i = 0; i < 1000000000; ++i) {
verifyQueryAsync(taos);
printf("%llu queries launched, errTimes:%lld \n", i * 10, errTimes);
while ((i * 10 - atomic_load_64(&finQueries)) > 1000) {
printf("left queries:%llu\n", (i * 10 - atomic_load_64(&finQueries)));
taosMsleep(2000);
}
printf("\n");
}
taos_close(taos);
taos_cleanup();
return 0;
}

View File

@ -22,6 +22,7 @@ exe:
gcc $(CFLAGS) ./insert_stb.c -o $(ROOT)insert_stb $(LFLAGS)
gcc $(CFLAGS) ./tmqViewTest.c -o $(ROOT)tmqViewTest $(LFLAGS)
gcc $(CFLAGS) ./stmtQuery.c -o $(ROOT)stmtQuery $(LFLAGS)
gcc $(CFLAGS) ./asyncQuery.c -o $(ROOT)asyncQuery $(LFLAGS)
# gcc $(CFLAGS) ./stmt.c -o $(ROOT)stmt $(LFLAGS)
# gcc $(CFLAGS) ./stmt2.c -o $(ROOT)stmt2 $(LFLAGS)
# gcc $(CFLAGS) ./stmt2-example.c -o $(ROOT)stmt2-example $(LFLAGS)