From 50bbb8742230ba4ea77d00936aeca9c25022a482 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 21 Jan 2022 09:17:58 +0800 Subject: [PATCH] feature/qnode --- source/libs/qworker/src/qworker.c | 106 ++++++++++++---------- source/libs/qworker/test/qworkerTests.cpp | 16 +++- 2 files changed, 68 insertions(+), 54 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 1cb719c667..566356e255 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -439,52 +439,6 @@ _return: QW_RET(code); } - -int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool *needRsp) { - int32_t code = 0; - SQWTaskCtx *ctx = NULL; - bool locked = false; - - QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_WRITE, &ctx)); - - QW_LOCK(QW_WRITE, &ctx->lock); - - locked = true; - - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("task already dropping, phase:%d", ctx->phase); - QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); - } - - if (QW_IN_EXECUTOR(ctx)) { - QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); - - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING)); - } else if (ctx->phase > 0) { - QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); - QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS())); - - locked = false; - *needRsp = true; - } - - if (locked) { - QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); - } - -_return: - - if (code) { - QW_SET_RSP_CODE(ctx, code); - } - - if (locked) { - QW_UNLOCK(QW_WRITE, &ctx->lock); - } - - QW_RET(code); -} - int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType) { int32_t code = 0; bool qcontinue = true; @@ -619,11 +573,12 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ int8_t status = 0; SQWTaskCtx *ctx = NULL; bool locked = false; + bool ctxAcquired = false; void *readyConnection = NULL; void *dropConnection = NULL; void *cancelConnection = NULL; - QW_SCH_TASK_DLOG("handle event at phase %d", phase); + QW_SCH_TASK_DLOG("start to handle event at phase %d", phase); switch (phase) { case QW_PHASE_PRE_QUERY: { @@ -716,6 +671,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ } case QW_PHASE_PRE_FETCH: { QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); + ctxAcquired = true; QW_LOCK(QW_WRITE, &ctx->lock); @@ -816,6 +772,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ } case QW_PHASE_PRE_CQUERY: { QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); + ctxAcquired = true; QW_LOCK(QW_WRITE, &ctx->lock); @@ -914,6 +871,10 @@ _return: QW_UNLOCK(QW_WRITE, &ctx->lock); } + if (ctxAcquired && ctx) { + qwReleaseTaskCtx(QW_READ, mgmt); + } + if (readyConnection) { qwBuildAndSendReadyRsp(readyConnection, output->rspCode); QW_TASK_DLOG("ready msg rsped, code:%x", output->rspCode); @@ -929,6 +890,7 @@ _return: QW_TASK_DLOG("cancel msg rsped, code:%x", output->rspCode); } + QW_SCH_TASK_DLOG("end to handle event at phase %d", phase); QW_RET(code); } @@ -1036,6 +998,7 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t if (phase == QW_PHASE_PRE_QUERY) { QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY); + ctx->readyConnection = qwMsg->connection; QW_TASK_DLOG("ready msg not rsped, phase:%d", phase); } else if (phase == QW_PHASE_POST_QUERY) { QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); @@ -1225,16 +1188,59 @@ _return: int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) { int32_t code = 0; bool needRsp = false; - - QW_ERR_JRET(qwDropTask(QW_FPARAMS(), &needRsp)); + SQWTaskCtx *ctx = NULL; + bool locked = false; + QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_WRITE, &ctx)); + + QW_LOCK(QW_WRITE, &ctx->lock); + + locked = true; + + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG("task already dropping, phase:%d", ctx->phase); + QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); + } + + if (QW_IN_EXECUTOR(ctx)) { + QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); + + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING)); + + ctx->dropConnection = qwMsg->connection; + } else if (ctx->phase > 0) { + QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); + QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS())); + + locked = false; + needRsp = true; + } + + if (!needRsp) { + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); + } + _return: + if (code) { + QW_SET_RSP_CODE(ctx, code); + } + + if (locked) { + QW_UNLOCK(QW_WRITE, &ctx->lock); + } + + if (ctx) { + qwReleaseTaskCtx(QW_WRITE, mgmt); + } + if (TSDB_CODE_SUCCESS != code || needRsp) { QW_ERR_RET(qwBuildAndSendDropRsp(qwMsg->connection, code)); + + QW_TASK_DLOG("drop msg rsped, code:%x", code); } - return TSDB_CODE_SUCCESS; + QW_RET(code); } int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp) { diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 46d80bbd26..d1cc9f03d1 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -129,17 +129,17 @@ int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTas int32_t idx = qwtTestCaseIdx % qwtTestCaseNum; if (0 == idx) { - *pTaskInfo = qwtTestCaseIdx; - *handle = qwtTestCaseIdx+1; + *pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx; + *handle = (DataSinkHandle)qwtTestCaseIdx+1; } else if (1 == idx) { *pTaskInfo = NULL; *handle = NULL; } else if (2 == idx) { - *pTaskInfo = qwtTestCaseIdx; + *pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx; *handle = NULL; } else if (3 == idx) { *pTaskInfo = NULL; - *handle = qwtTestCaseIdx; + *handle = (DataSinkHandle)qwtTestCaseIdx; } ++qwtTestCaseIdx; @@ -460,6 +460,14 @@ void *controlThread(void *param) { return NULL; } +void *queryQueueThread(void *param) { + +} + +void *fetchQueueThread(void *param) { + +} + }