diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index a0f33c3763..bca11fa13f 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -47,7 +47,7 @@ extern "C" { #define QW_RETIRE_JOB_BATCH_NUM 5 -#define QW_DEFAULT_TIMEOUT_INTERVAL_SECS 3600 +#define QW_DEFAULT_TIMEOUT_INTERVAL_SECS 600 enum { QW_CONC_TASK_LEVEL_LOW = 1, @@ -266,8 +266,10 @@ typedef struct SQWRetireCtx { typedef struct SQueryExecStat { int64_t taskInitNum; + int64_t taskRunNum; int64_t taskExecDestroyNum; int64_t taskSinkDestroyNum; + int64_t taskDestroyNum; } SQueryExecStat; typedef struct SQueryMgmt { diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index d0035dd54a..68b4d41073 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -173,9 +173,10 @@ void qwDbgDumpJobsInfo(void) { return; } - qDebug("total remain job num %d, task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64, - taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum), - atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum)); + qDebug("total remain job num %d, task initNum:%" PRId64 " - %" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64 " - %" PRId64, + taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum), atomic_load_64(&gQueryMgmt.stat.taskRunNum), + atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum), + atomic_load_64(&gQueryMgmt.stat.taskDestroyNum)); size_t keyLen = 0; char* id = NULL; diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 8bcf36c767..36a7452e7a 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -258,6 +258,8 @@ int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) { } } + atomic_add_fetch_64(&gQueryMgmt.stat.taskInitNum, 1); + if (acquire && ctx) { QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx)); } else if (ctx) { @@ -425,6 +427,8 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { ctx->tbInfo = NULL; QW_TASK_DLOG_E("task ctx dropped"); + + atomic_add_fetch_64(&gQueryMgmt.stat.taskDestroyNum, 1); return code; } @@ -756,7 +760,7 @@ bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool forceStop, int32_t errC QW_TASK_DLOG("start to stop task, forceStop:%d, error:%s", forceStop, tstrerror(errCode)); - if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + if ((!forceStop) && (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP))) { QW_TASK_WLOG_E("task already dropping"); QW_UNLOCK(QW_WRITE, &ctx->lock); @@ -867,7 +871,7 @@ void qwChkDropTimeoutQuery(SQWorker *mgmt, int32_t currTs) { void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; - if ((ctx->lastAckTs <= 0) || (currTs - ctx->lastAckTs) < tsQueryNoFetchTimeoutSec) { + if (((ctx->lastAckTs <= 0) || (currTs - ctx->lastAckTs) < tsQueryNoFetchTimeoutSec) && (!QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP))) { pIter = taosHashIterate(mgmt->ctxHash, pIter); continue; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a11cfb250a..152e1482ce 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -541,8 +541,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_LOCK(QW_WRITE, &ctx->lock); - QW_SET_PHASE(ctx, phase); - if (ctx->pJobInfo && (atomic_load_8(&ctx->pJobInfo->retired) || atomic_load_32(&ctx->pJobInfo->errCode))) { QW_TASK_ELOG("job already failed, error:%s", tstrerror(ctx->pJobInfo->errCode)); QW_ERR_JRET(ctx->pJobInfo->errCode); @@ -626,6 +624,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(ctx->rspCode); } + QW_SET_PHASE(ctx, phase); + _return: if (ctx) { @@ -635,7 +635,7 @@ _return: qwReleaseTaskCtx(mgmt, ctx); } - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_QRY_TASK_CTX_NOT_EXIST) { QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code)); } else { QW_TASK_DLOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code)); @@ -800,7 +800,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(TSDB_CODE_APP_ERROR); } - atomic_add_fetch_64(&gQueryMgmt.stat.taskInitNum, 1); + atomic_add_fetch_64(&gQueryMgmt.stat.taskRunNum, 1); uint64_t flags = 0; dsGetSinkFlags(sinkHandle, &flags); @@ -1648,9 +1648,11 @@ void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) { } void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) { - qDebug("need to retire jobs in batch, targetRetireSize:%" PRId64 ", remainJobNum:%d, task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64, - retireSize, taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum), - atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum)); + qDebug("need to retire jobs in batch, targetRetireSize:%" PRId64 ", remainJobNum:%d, task initNum:%" PRId64 " - %" PRId64 + ", task destroyNum:%" PRId64 " - %" PRId64 " - %" PRId64, + retireSize, taosHashGetSize(gQueryMgmt.pJobInfo), atomic_load_64(&gQueryMgmt.stat.taskInitNum), atomic_load_64(&gQueryMgmt.stat.taskRunNum), + atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum), + atomic_load_64(&gQueryMgmt.stat.taskDestroyNum)); SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); int32_t jobNum = 0; @@ -1683,8 +1685,9 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) { taosHashCancelIterate(gQueryMgmt.pJobInfo, pJob); qDebug("job retire in batch done, [prev:%d, curr:%d, total:%d] jobs, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64 - ", task initNum:%" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64, + ", task initNum:%" PRId64 " - %" PRId64 ", task destroyNum:%" PRId64 " - %" PRId64 " - %" PRId64, alreadyJobNum, jobNum, taosHashGetSize(gQueryMgmt.pJobInfo), retiredSize, retireSize, - atomic_load_64(&gQueryMgmt.stat.taskInitNum), - atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum)); + atomic_load_64(&gQueryMgmt.stat.taskInitNum), atomic_load_64(&gQueryMgmt.stat.taskRunNum), + atomic_load_64(&gQueryMgmt.stat.taskExecDestroyNum), atomic_load_64(&gQueryMgmt.stat.taskSinkDestroyNum), + atomic_load_64(&gQueryMgmt.stat.taskDestroyNum)); } diff --git a/source/os/src/osString.c b/source/os/src/osString.c index 0155de0695..555e297d23 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -118,7 +118,7 @@ char *taosStrndupi(const char *s, int64_t size) { char *tstrndup(const char *str, int64_t size) { #ifdef WINDOWS - return strndup(str, size); + return taosStrndupi(str, size); #else char* p = strndup(str, size); if (str != NULL && NULL == p) {