diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index af361323a7..9d8fad6fea 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -31,7 +31,7 @@ extern "C" { #define QW_DEFAULT_SCHEDULER_NUMBER 100 #define QW_DEFAULT_TASK_NUMBER 10000 -#define QW_DEFAULT_SCH_TASK_NUMBER 10000 +#define QW_DEFAULT_SCH_TASK_NUMBER 3000 #define QW_DEFAULT_SHORT_RUN_TIMES 2 #define QW_DEFAULT_HEARTBEAT_MSEC 5000 #define QW_SCH_TIMEOUT_MSEC 180000 @@ -247,7 +247,7 @@ typedef struct SQWorkerMgmt { #define QW_ERR_RET(c) \ do { \ - int32_t _code = (c); \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ return _code; \ @@ -255,7 +255,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_RET(c) \ do { \ - int32_t _code = (c); \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ } \ @@ -263,7 +263,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_ERR_JRET(c) \ do { \ - code = (c); \ + code = (c); \ if (code != TSDB_CODE_SUCCESS) { \ terrno = code; \ goto _return; \ diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index ce3b493638..f3d073634d 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -281,7 +281,7 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { int32_t code = 0; - + // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { @@ -463,6 +463,8 @@ void qwDestroyImpl(void *pMgmt) { int8_t nodeType = mgmt->nodeType; int32_t nodeId = mgmt->nodeId; + int32_t taskCount = 0; + int32_t schStatusCount = 0; qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); taosTmrStop(mgmt->hbTimer); @@ -472,6 +474,7 @@ void qwDestroyImpl(void *pMgmt) { uint64_t qId, tId; int32_t eId; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; void *key = taosHashGetKey(pIter, NULL); @@ -480,6 +483,7 @@ void qwDestroyImpl(void *pMgmt) { qwFreeTaskCtx(ctx); QW_TASK_DLOG_E("task ctx freed"); pIter = taosHashIterate(mgmt->ctxHash, pIter); + taskCount++; } taosHashCleanup(mgmt->ctxHash); @@ -487,7 +491,9 @@ void qwDestroyImpl(void *pMgmt) { while (pIter) { SQWSchStatus *sch = (SQWSchStatus *)pIter; qwDestroySchStatus(sch); + pIter = taosHashIterate(mgmt->schHash, pIter); + schStatusCount++; } taosHashCleanup(mgmt->schHash); @@ -499,7 +505,8 @@ void qwDestroyImpl(void *pMgmt) { qwCloseRef(); - qDebug("qworker destroyed, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); + qDebug("qworker destroyed, type:%d, id:%d, handle:%p, taskCount:%d, schStatusCount: %d", nodeType, nodeId, mgmt, + taskCount, schStatusCount); } int32_t qwOpenRef(void) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c5db4105d7..81f73b1226 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -8,9 +8,9 @@ #include "qwMsg.h" #include "tcommon.h" #include "tdatablock.h" +#include "tglobal.h" #include "tmsg.h" #include "tname.h" -#include "tglobal.h" SQWorkerMgmt gQwMgmt = { .lock = 0, @@ -117,7 +117,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { if (queryStop) { *queryStop = true; } - + return TSDB_CODE_SUCCESS; } @@ -275,7 +275,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, QW_ERR_RET(code); } - QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %"PRId64, pOutput->numOfBlocks, + QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks, pOutput->numOfRows); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC); @@ -327,12 +327,14 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, } if (0 == ctx->level) { - QW_TASK_DLOG("task fetched blocks %d rows %"PRId64", level %d", pOutput->numOfBlocks, pOutput->numOfRows, ctx->level); + QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 ", level %d", pOutput->numOfBlocks, pOutput->numOfRows, + ctx->level); break; } if (pOutput->numOfRows >= QW_MIN_RES_ROWS) { - QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, pOutput->numOfRows); + QW_TASK_DLOG("task fetched blocks %d rows %" PRId64 " reaches the min rows", pOutput->numOfBlocks, + pOutput->numOfRows); break; } } @@ -538,7 +540,7 @@ _return: SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); - if (!rsped) { + if (!rsped) { qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false); } } @@ -650,8 +652,8 @@ _return: code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { - void *rsp = NULL; - int32_t dataLen = 0; + void *rsp = NULL; + int32_t dataLen = 0; SOutputData sOutput = {0}; if (qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)) { return TSDB_CODE_SUCCESS; @@ -671,8 +673,8 @@ _return: qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code); rsp = NULL; - QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, - tstrerror(code), dataLen); + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), + dataLen); } } @@ -689,7 +691,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { do { ctx = NULL; - + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); @@ -748,7 +750,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } QW_LOCK(QW_WRITE, &ctx->lock); - if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { + if ((queryStop && (0 == atomic_load_8((int8_t *)&ctx->queryContinue))) || code || + 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { // Note: query is not running anymore QW_SET_PHASE(ctx, 0); QW_UNLOCK(QW_WRITE, &ctx->lock); @@ -1176,7 +1179,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash)); uint64_t qId, tId; - int32_t eId; + int32_t eId; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; @@ -1186,7 +1189,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { QW_LOCK(QW_WRITE, &ctx->lock); QW_TASK_DLOG_E("start to force stop task"); - + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG_E("task already dropping"); QW_UNLOCK(QW_WRITE, &ctx->lock); @@ -1194,7 +1197,7 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { pIter = taosHashIterate(mgmt->ctxHash, pIter); continue; } - + if (QW_QUERY_RUNNING(ctx)) { qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); } else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {