diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 2765d7d5d7..46e46d323e 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -86,8 +86,6 @@ typedef struct SQWPhaseInput { int8_t taskStatus; int8_t taskType; int32_t code; - qTaskInfo_t taskHandle; - DataSinkHandle sinkHandle; } SQWPhaseInput; typedef struct SQWPhaseOutput { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f1fd8aa6fb..63ae1e71f0 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -456,13 +456,15 @@ _return: QW_RET(code); } -int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType, bool shortRun) { +int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t code = 0; bool qcontinue = true; SSDataBlock* pRes = NULL; uint64_t useconds = 0; int32_t i = 0; - int32_t leftRun = QW_DEFAULT_SHORT_RUN_TIMES; + int32_t execNum = 0; + qTaskInfo_t *taskHandle = &ctx->taskHandle; + DataSinkHandle sinkHandle = ctx->sinkHandle; while (true) { QW_TASK_DLOG("start to execTask in executor, loopIdx:%d", i++); @@ -473,12 +475,14 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH QW_ERR_JRET(code); } + ++execNum; + if (NULL == pRes) { QW_TASK_DLOG("task query done, useconds:%"PRIu64, useconds); dsEndPut(sinkHandle, useconds); - if (TASK_TYPE_TEMP == taskType) { + if (TASK_TYPE_TEMP == ctx->taskType) { qwFreeTaskHandle(QW_FPARAMS(), taskHandle); } @@ -498,7 +502,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH break; } - if (shortRun && ((--leftRun) <= 0)) { + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) { + break; + } + + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { break; } } @@ -617,8 +625,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu case QW_PHASE_PRE_QUERY: { atomic_store_8(&ctx->phase, phase); - atomic_store_8(&ctx->taskType, input->taskType); - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_ELOG("task already cancelled/dropped at wrong phase, phase:%d", phase); @@ -842,9 +848,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp } if (QW_PHASE_POST_QUERY == phase) { - ctx->taskHandle = input->taskHandle; - ctx->sinkHandle = input->sinkHandle; - if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) { ctx->emptyRes = true; } @@ -949,8 +952,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { SQWPhaseOutput output = {0}; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; - - input.taskType = taskType; + SQWTaskCtx *ctx = NULL; QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output)); @@ -961,6 +963,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_QUERY); QW_ERR_JRET(code); } + + QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); + + atomic_store_8(&ctx->taskType, taskType); code = qStringToSubplan(qwMsg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { @@ -986,8 +992,11 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { queryRsped = true; + atomic_store_ptr(&ctx->taskHandle, pTaskInfo); + atomic_store_ptr(&ctx->sinkHandle, sinkHandle); + if (pTaskInfo && sinkHandle) { - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &pTaskInfo, sinkHandle, taskType, true)); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx)); } _return: @@ -1002,8 +1011,6 @@ _return: } input.code = rspCode; - input.taskHandle = pTaskInfo; - input.sinkHandle = sinkHandle; input.taskStatus = rspCode ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_PARTIAL_SUCCEED; QW_ERR_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output)); @@ -1095,7 +1102,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { DataSinkHandle sinkHandle = ctx->sinkHandle; - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx->taskHandle, sinkHandle, ctx->taskType, QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH))); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx)); if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 5812719c51..23a742c6ec 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -722,7 +722,7 @@ void *queryQueueThread(void *param) { while (true) { tsem_wait(&qwtTestQuerySem); - if (qwtTestStop && qwtTestQueryQueueNum <= 0) { + if (qwtTestStop && qwtTestQueryQueueNum <= 0 && qwtTestCaseFinished) { break; } @@ -761,7 +761,7 @@ void *queryQueueThread(void *param) { free(queryRpc); - if (qwtTestStop && qwtTestQueryQueueNum <= 0) { + if (qwtTestStop && qwtTestQueryQueueNum <= 0 && qwtTestCaseFinished) { break; } } @@ -779,6 +779,10 @@ void *fetchQueueThread(void *param) { while (true) { tsem_wait(&qwtTestFetchSem); + if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) { + break; + } + taosWLockLatch(&qwtTestFetchQueueLock); if (qwtTestFetchQueueNum <= 0 || qwtTestFetchQueueRIdx == qwtTestFetchQueueWIdx) { printf("Fetch queue is empty\n"); @@ -826,7 +830,7 @@ void *fetchQueueThread(void *param) { free(fetchRpc); - if (qwtTestStop && qwtTestFetchQueueNum <= 0) { + if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) { break; } } @@ -1104,10 +1108,17 @@ TEST(rcTest, shortExecshortDelay) { break; } - sleep(3); + sleep(1); + + if (qwtTestCaseFinished) { + if (qwtTestQuitThreadNum < 3) { + tsem_post(&qwtTestQuerySem); + tsem_post(&qwtTestFetchSem); + + usleep(10); + } + } - tsem_post(&qwtTestQuerySem); - usleep(10); } qwtTestQueryQueueNum = 0; @@ -1179,10 +1190,17 @@ TEST(rcTest, longExecshortDelay) { break; } - sleep(3); + sleep(1); + + if (qwtTestCaseFinished) { + if (qwtTestQuitThreadNum < 3) { + tsem_post(&qwtTestQuerySem); + tsem_post(&qwtTestFetchSem); + + usleep(10); + } + } - tsem_post(&qwtTestQuerySem); - usleep(10); } qwtTestQueryQueueNum = 0; @@ -1255,10 +1273,17 @@ TEST(rcTest, shortExeclongDelay) { break; } - sleep(3); + sleep(1); + + if (qwtTestCaseFinished) { + if (qwtTestQuitThreadNum < 3) { + tsem_post(&qwtTestQuerySem); + tsem_post(&qwtTestFetchSem); + + usleep(10); + } + } - tsem_post(&qwtTestQuerySem); - usleep(10); } qwtTestQueryQueueNum = 0;