fix: retire job issue

This commit is contained in:
dapan1121 2024-11-26 16:46:33 +08:00
parent 80b1ab8f45
commit 3d3956efda
7 changed files with 82 additions and 51 deletions

View File

@ -530,8 +530,8 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped);
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
int32_t qwInitQueryPool(void);
void qwDestroyJobInfo(void* job);
void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwRetireJob(SQWJobInfo* pJob);
bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
bool qwRetireJob(SQWJobInfo* pJob);
void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session);
int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession);
void qwFreeTaskHandle(SQWTaskCtx *ctx);

View File

@ -739,8 +739,9 @@ void qwDestroyJobInfo(void* job) {
pJob->pSessions = NULL;
}
void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t code = TSDB_CODE_SUCCESS;
bool taskFreed = false;
QW_LOCK(QW_WRITE, &ctx->lock);
@ -750,7 +751,7 @@ void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
return;
return taskFreed;
}
if (QW_QUERY_RUNNING(ctx)) {
@ -770,37 +771,47 @@ void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
QW_TASK_ELOG("task drop failed, error: %x", code);
} else {
QW_TASK_DLOG_E("task dropped");
taskFreed = true;
}
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
return taskFreed;
}
void qwRetireTask(QW_FPARAMS_DEF) {
bool qwRetireTask(QW_FPARAMS_DEF) {
SQWTaskCtx *ctx = NULL;
int32_t code = qwAcquireTaskCtx(QW_FPARAMS(), &ctx);
if (TSDB_CODE_SUCCESS != code) {
return;
return false;
}
qwStopTask(QW_FPARAMS(), ctx);
bool retired = qwStopTask(QW_FPARAMS(), ctx);
qwReleaseTaskCtx(mgmt, ctx);
return retired;
}
void qwRetireJob(SQWJobInfo* pJob) {
bool qwRetireJob(SQWJobInfo* pJob) {
if (NULL == pJob) {
return;
return false;
}
bool retired = true;
void* pIter = taosHashIterate(pJob->pSessions, NULL);
while (pIter) {
SQWSessionInfo* pSession = (SQWSessionInfo*)pIter;
qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId);
if (!qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId)) {
retired = false;
}
pIter = taosHashIterate(pJob->pSessions, pIter);
}
return retired;
}

View File

@ -35,7 +35,7 @@ void qwStopAllTasks(SQWorker *mgmt) {
sId = ctx->sId;
qwStopTask(QW_FPARAMS(), ctx);
(void)qwStopTask(QW_FPARAMS(), ctx);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
@ -1633,7 +1633,7 @@ void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) {
qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64,
jobId, clientId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
qwRetireJob(pJob);
(void)qwRetireJob(pJob);
} else {
qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
}
@ -1641,6 +1641,7 @@ void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) {
void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
int32_t jobNum = 0;
int64_t retiredSize = 0;
while (retiredSize < retireSize && NULL != pJob) {
if (atomic_load_8(&pJob->retired)) {
@ -1650,17 +1651,21 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
bool retired = qwRetireJob(pJob);
if (retired) {
retiredSize += aSize;
}
jobNum++;
qwRetireJob(pJob);
retiredSize += aSize;
qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job retired in batch, usedSize:%" PRId64 ", retireSize:%" PRId64,
pJob->memInfo->jobId, pJob->memInfo->clientId, aSize, retireSize);
qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job mark retired in batch, retired:%d, usedSize:%" PRId64 ", retireSize:%" PRId64,
pJob->memInfo->jobId, pJob->memInfo->clientId, retired, aSize, retireSize);
}
pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob);
}
qDebug("total %d jobs mark retired, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, retireSize);
}

View File

@ -229,7 +229,7 @@ typedef struct SMPSessionChunk {
} SMPSessionChunk;
typedef struct SMPSession {
SMPListNode list;
//SMPListNode list;
char* sessionId;
SMPJob* pJob;
@ -237,7 +237,7 @@ typedef struct SMPSession {
int64_t allocMemSize;
int64_t maxAllocMemSize;
SMPSessionChunk chunk;
//SMPSessionChunk chunk;
SMPStatInfo stat;
} SMPSession;

View File

@ -19,7 +19,7 @@
#include "tlog.h"
#include "tutil.h"
#if 0
int32_t mpChunkNew(SMemPool* pPool, SMPChunk** ppChunk) {
SMPChunk* pChunk = NULL;
MP_ERR_RET(mpPopIdleNode(pPool, &pPool->chunk.chunkCache, (void**)&pChunk));
@ -315,4 +315,5 @@ int32_t mpChunkUpdateCfg(SMemPool* pPool) {
return TSDB_CODE_SUCCESS;
}
#endif

View File

@ -30,7 +30,7 @@ SMemPoolMgmt gMPMgmt = {0};
SMPStrategyFp gMPFps[] = {
{NULL},
{NULL, mpDirectFullAlloc, mpDirectFullFree, mpDirectGetMemSize, mpDirectFullRealloc, NULL, NULL, mpDirectTrim},
{mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg, NULL}
//{mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg, NULL}
};
@ -242,7 +242,9 @@ int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) {
MP_ERR_RET((*gMPFps[gMPMgmt.strategy].initFp)(pPool, poolName, cfg));
}
MP_ERR_RET(mpInitPosStat(&pPool->stat.posStat, false));
if (tsMemPoolFullFunc) {
MP_ERR_RET(mpInitPosStat(&pPool->stat.posStat, false));
}
return TSDB_CODE_SUCCESS;
}
@ -943,6 +945,10 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt
}
void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) {
if (0 == tsMemPoolFullFunc) {
return;
}
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
SMPCtrlInfo* pCtrl = NULL;
@ -1115,7 +1121,7 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
SMPSession* pSession = (SMPSession*)session;
char detailName[128];
if (NULL != pSession) {
if (NULL != pSession && MP_GET_FLAG(pSession->ctrl.funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session");
detailName[sizeof(detailName) - 1] = 0;
mpPrintStatDetail(&pSession->ctrl, &pSession->stat.statDetail, detailName, pSession->maxAllocMemSize);
@ -1125,15 +1131,15 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
mpPrintPosStat(&pSession->ctrl, &pSession->stat.posStat, detailName);
}
if (NULL != pPool) {
if (NULL != pPool && MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_PRINT_STAT)) {
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name);
detailName[sizeof(detailName) - 1] = 0;
mpPrintSessionStat(&gMPMgmt.ctrl, &pPool->stat.statSession, detailName);
mpPrintStatDetail(&gMPMgmt.ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize);
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
detailName[sizeof(detailName) - 1] = 0;
mpPrintNodeStat(&gMPMgmt.ctrl, pPool->stat.nodeStat, detailName);
//snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode");
//detailName[sizeof(detailName) - 1] = 0;
//mpPrintNodeStat(&gMPMgmt.ctrl, pPool->stat.nodeStat, detailName);
snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos");
detailName[sizeof(detailName) - 1] = 0;
@ -1200,14 +1206,13 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) {
uWarn("null pointer of poolHandle %p or session %p", poolHandle, session);
return;
}
if (tsMemPoolFullFunc) {
(void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1);
mpCheckStatDetail(pPool, pSession, "DestroySession");
mpDestroyPosStat(&pSession->stat.posStat);
}
//TODO;
(void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1);
mpCheckStatDetail(pPool, pSession, "DestroySession");
mpDestroyPosStat(&pSession->stat.posStat);
taosMemFreeClear(pSession->sessionId);
TAOS_MEMSET(pSession, 0, sizeof(*pSession));
@ -1233,7 +1238,9 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, c
MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession));
}
MP_ERR_JRET(mpInitPosStat(&pSession->stat.posStat, true));
if (tsMemPoolFullFunc) {
MP_ERR_JRET(mpInitPosStat(&pSession->stat.posStat, true));
}
pSession->pJob = (SMPJob*)pJob;
@ -1526,10 +1533,11 @@ void taosMemPoolClose(void* poolHandle) {
SMemPool* pPool = (SMemPool*)poolHandle;
mpCheckStatDetail(pPool, NULL, "PoolClose");
mpDestroyPosStat(&pPool->stat.posStat);
if (tsMemPoolFullFunc) {
mpCheckStatDetail(pPool, NULL, "PoolClose");
mpDestroyPosStat(&pPool->stat.posStat);
}
taosMemoryFree(pPool->name);
mpDestroyCacheGroup(&pPool->sessionCache);

View File

@ -57,7 +57,7 @@ namespace {
#define MPT_MAX_RETIRE_JOB_NUM 10000
#define MPT_DEFAULT_TASK_RUN_TIMES 10
#define MPT_NON_POOL_ALLOC_UNIT (256 * 1048576UL)
#define MPT_NON_POOL_KEEP_ALLOC_UNIT 10485760UL
#define MPT_NON_POOL_KEEP_ALLOC_UNIT (10485760UL * 8)
#define MPT_MAX_NON_POOL_ALLOC_TIMES 30000
enum {
@ -685,21 +685,25 @@ int32_t mptResetJob(SMPTestJobCtx* pJobCtx) {
return 0;
}
void mptRetireJob(SMPTJobInfo* pJob) {
bool mptRetireJob(SMPTJobInfo* pJob) {
SMPTestJobCtx* pCtx = (SMPTestJobCtx*)pJob->pCtx;
if (MPT_TRY_LOCK(MPT_WRITE, &pCtx->jobExecLock)) {
return;
return false;
}
bool retired = false;
int32_t taskRunning = atomic_load_32(&pCtx->taskRunningNum);
if (0 == taskRunning) {
mptDestroyJob(pCtx, false);
retired = true;
} else {
uDebug("JOB:0x%x retired but will not destroy cause of task running, num:%d", pCtx->jobId, taskRunning);
}
MPT_UNLOCK(MPT_WRITE, &pCtx->jobExecLock);
return retired;
}
int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) {
@ -750,13 +754,15 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) {
atomic_add_fetch_64(&mptCtx.runStat.retireNum, 1);
mptRetireJob(pJob);
retiredSize += aSize;
bool retired = mptRetireJob(pJob);
if (retired) {
retiredSize += aSize;
}
jobNum++;
uDebug("QID:0x%" PRIx64 " job retired cause of limit reached, usedSize:%" PRId64 ", retireSize:%" PRId64 ", retiredSize:%" PRId64,
jobId, aSize, retireSize, retiredSize);
uDebug("QID:0x%" PRIx64 " job mark retired cause of limit reached, retired:%d, usedSize:%" PRId64 ", retireSize:%" PRId64 ", retiredSize:%" PRId64,
jobId, retired, aSize, retireSize, retiredSize);
}
pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, pJob);
@ -764,7 +770,7 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) {
taosHashCancelIterate(mptCtx.pJobs, pJob);
uDebug("total %d jobs retired, retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, retireSize);
uDebug("total %d jobs mark retired, retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, retireSize);
}
@ -1575,7 +1581,7 @@ TEST(PerfTest, GetSysAvail) {
for (int32_t i = 0; i < loopTimes; ++i) {
code = taosGetSysAvailMemory(&freeSize);
assert(0 == code);
taosMsleep(5);
//taosMsleep(10);
}
totalUs = taosGetTimestampUs() - st;
@ -1915,7 +1921,7 @@ TEST(poolFuncTest, SingleThreadTest) {
}
#endif
#if 1
#if 0
TEST(poolFuncTest, MultiThreadTest) {
char* caseName = "poolFuncTest:MultiThreadTest";
SMPTestParam param = {0};