From 8b061f528539df03b3c1b2a4373fd014e282c0fe Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 17 Jan 2022 13:34:29 +0800 Subject: [PATCH] feature/qnode --- source/libs/qworker/inc/qworkerInt.h | 12 +- source/libs/qworker/inc/qworkerMsg.h | 48 ++++++ source/libs/qworker/src/qworker.c | 225 ++++++++++++++------------- source/libs/qworker/src/qworkerMsg.c | 85 ++++------ 4 files changed, 197 insertions(+), 173 deletions(-) create mode 100644 source/libs/qworker/inc/qworkerMsg.h diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index c07321ea4b..426db52eb2 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -81,8 +81,10 @@ typedef struct SQWMsg { } SQWMsg; typedef struct SQWPhaseInput { - int8_t status; - int32_t code; + int8_t status; + int32_t code; + qTaskInfo_t taskHandle; + DataSinkHandle sinkHandle; } SQWPhaseInput; typedef struct SQWPhaseOutput { @@ -102,13 +104,9 @@ typedef struct SQWTaskCtx { int32_t phase; int32_t sinkId; - int8_t queryInQ; + int32_t readyCode; int8_t events[QW_EVENT_MAX]; - int8_t ready; - int8_t cancel; - int8_t drop; - int8_t needRsp; qTaskInfo_t taskHandle; DataSinkHandle sinkHandle; diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h new file mode 100644 index 0000000000..3b5f3b1605 --- /dev/null +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_QWORKER_MSG_H_ +#define _TD_QWORKER_MSG_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "qworkerInt.h" +#include "dataSinkMgt.h" + +int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg); +int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg); +int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg); +int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg); +int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg); + +int32_t qwBuildAndSendDropRsp(void *connection, int32_t code); +int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); +void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len); +int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection); +int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection); +int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code); +int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code); +void qwFreeFetchRsp(void *msg); +int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); + + + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_QWORKER_INT_H_*/ diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ba4669fa74..71efe01bb2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -4,6 +4,7 @@ #include "planner.h" #include "query.h" #include "qworkerInt.h" +#include "qworkerMsg.h" #include "tmsg.h" #include "tname.h" #include "dataSinkMgt.h" @@ -130,7 +131,7 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, u QW_UNLOCK(rwType, &mgmt->schLock); if (QW_NOT_EXIST_ADD == nOpt) { - QW_ERR_RET(qwAddSchedulerImpl(rwType, mgmt, sId, sch)); + QW_ERR_RET(qwAddSchedulerImpl(QW_FPARAMS(), rwType, sch)); nOpt = QW_NOT_EXIST_RET_ERR; @@ -149,17 +150,34 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, u } int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) { - return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD); + return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_ADD); } int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) { - return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_RET_ERR); + return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_RET_ERR); } void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } + +int32_t qwAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) { + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); + + QW_LOCK(rwType, &sch->tasksLock); + *task = taosHashGet(sch->tasksHash, id, sizeof(id)); + if (NULL == (*task)) { + QW_UNLOCK(rwType, &sch->tasksLock); + QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST); + } + + return TSDB_CODE_SUCCESS; +} + + + int32_t qwAddTaskStatusImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) { int32_t code = 0; @@ -209,21 +227,6 @@ _return: } -int32_t qwAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) { - char id[sizeof(qId) + sizeof(tId)] = {0}; - QW_SET_QTID(id, qId, tId); - - QW_LOCK(rwType, &sch->tasksLock); - *task = taosHashGet(sch->tasksHash, id, sizeof(id)); - if (NULL == (*task)) { - QW_UNLOCK(rwType, &sch->tasksLock); - QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST); - } - - return TSDB_CODE_SUCCESS; -} - - int32_t qwAddAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) { return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task); } @@ -233,14 +236,30 @@ void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { QW_UNLOCK(rwType, &sch->tasksLock); } + +int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) { + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); + + QW_LOCK(rwType, &mgmt->ctxLock); + *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); + if (NULL == (*ctx)) { + QW_UNLOCK(rwType, &mgmt->ctxLock); + QW_TASK_ELOG("ctx not in ctxHash, id:%s", id); + QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST); + } + + return TSDB_CODE_SUCCESS; +} + int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, int32_t status, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); - SQWTaskCtx ctx = {0}; + SQWTaskCtx nctx = {0}; QW_LOCK(QW_WRITE, &mgmt->ctxLock); - int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &ctx, sizeof(SQWTaskCtx)); + int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx)); if (0 != code) { QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); @@ -283,20 +302,6 @@ int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI } -int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) { - char id[sizeof(qId) + sizeof(tId)] = {0}; - QW_SET_QTID(id, qId, tId); - - QW_LOCK(rwType, &mgmt->ctxLock); - *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); - if (NULL == (*ctx)) { - QW_UNLOCK(rwType, &mgmt->ctxLock); - QW_TASK_ELOG("ctx not in ctxHash, id:%s", id); - QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST); - } - - return TSDB_CODE_SUCCESS; -} int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) { return qwAddTaskCtxImpl(QW_FPARAMS(), rwType, 0, ctx); @@ -375,7 +380,7 @@ int32_t qwDropTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - QW_TASK_DLOG("task dropped, id:%d", id); + QW_TASK_DLOG("task dropped, id:%s", id); _return: @@ -385,27 +390,13 @@ _return: QW_RET(code); } -int32_t qwUpdateTaskCtxHandles(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) { - SQWTaskCtx *ctx = NULL; - - QW_ERR_RET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); - - ctx->taskHandle = taskHandle; - ctx->sinkHandle = sinkHandle; - - qwReleaseTaskCtx(QW_READ, mgmt); - - return TSDB_CODE_SUCCESS; -} - - int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); - QW_ERR_JRET(qwAcquireTaskStatus(mgmt, QW_READ, sch, qId, tId, &task)); + QW_ERR_RET(qwAcquireScheduler(QW_FPARAMS(), QW_READ, &sch)); + QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task)); QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status)); @@ -430,7 +421,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, locked = true; if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("task already dropping", NULL); + QW_TASK_WLOG("task already dropping, phase:%d", ctx->phase); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); } @@ -488,7 +479,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void QW_ERR_RET(code); } - QW_TASK_DLOG("no data in sink and query end", NULL); + QW_TASK_DLOG("no data in sink and query end, phase:%d", ctx->phase); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); @@ -546,6 +537,8 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S SQWTaskCtx *ctx = NULL; bool locked = false; + QW_SCH_TASK_DLOG("handle event at phase %d", phase); + switch (phase) { case QW_PHASE_PRE_QUERY: { QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); @@ -587,6 +580,11 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S locked = true; + ctx->taskHandle = input->taskHandle; + ctx->sinkHandle = input->sinkHandle; + + ctx->readyCode = input->code; + assert(!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)); if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { @@ -610,14 +608,14 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) { output->needRsp = true; - + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); - output->rspCode = input.code; + output->rspCode = input->code; } if (!output->needStop) { - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input.status)); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->status)); } break; } @@ -626,36 +624,36 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S QW_LOCK(QW_WRITE, &ctx->lock); - locked = true; + locked = true; ctx->phase = phase; if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task already cancelled", NULL); + QW_TASK_WLOG("task already cancelled, phase:%d", phase); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("task is dropping", NULL); + QW_TASK_WLOG("task is dropping, phase:%d", phase); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_DROPPING; } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task is cancelling", NULL); + QW_TASK_WLOG("task is cancelling, phase:%d", phase); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_CANCELLING; } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { - QW_TASK_WLOG("last fetch not finished", NULL); + QW_TASK_WLOG("last fetch not finished, phase:%d", phase); output->needStop = true; output->rspCode = TSDB_CODE_QRY_DUPLICATTED_OPERATION; QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); } if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) { - QW_TASK_ELOG("query rsp are not ready", NULL); + QW_TASK_ELOG("query rsp are not ready, phase:%d", phase); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_MSG_ERROR; QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR); @@ -670,18 +668,18 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S locked = true; if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task already cancelled", NULL); + QW_TASK_WLOG("task already cancelled, phase:%d", phase); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("task is dropping", NULL); + QW_TASK_WLOG("task is dropping, phase:%d", phase); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_DROPPING; } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task is cancelling", NULL); + QW_TASK_WLOG("task is cancelling, phase:%d", phase); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_CANCELLING; } @@ -722,7 +720,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t code = output.rspCode; if (needStop) { - QW_TASK_DLOG("task need stop", NULL); + QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_QUERY); QW_ERR_JRET(code); } @@ -733,13 +731,13 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo); + code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x", code); QW_ERR_JRET(code); } - QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); + QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, TSDB_CODE_SUCCESS)); queryRsped = true; @@ -750,8 +748,6 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t QW_ERR_JRET(code); } - QW_ERR_JRET(qwUpdateTaskCtxHandles(QW_FPARAMS(), pTaskInfo, sinkHandle)); - _return: if (code) { @@ -770,6 +766,8 @@ _return: } input.code = rspCode; + input.taskHandle = pTaskInfo; + input.sinkHandle = sinkHandle; if (TSDB_CODE_SUCCESS != rspCode) { input.status = JOB_TASK_STATUS_FAILED; @@ -786,6 +784,33 @@ _return: QW_RET(rspCode); } +int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) { + int32_t code = 0; + SQWTaskCtx *ctx = NULL; + + QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); + + QW_LOCK(QW_WRITE, &ctx->lock); + + if (ctx->phase == QW_PHASE_PRE_QUERY) { + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY); + } else if (ctx->phase == QW_PHASE_POST_QUERY) { + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); + QW_ERR_JRET(qwBuildAndSendReadyRsp(qwMsg->connection, ctx->readyCode)); + } + +_return: + + if (ctx) { + QW_UNLOCK(QW_WRITE, &ctx->lock); + + qwReleaseTaskCtx(QW_READ, mgmt); + } + + QW_RET(code); +} + + int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) { int32_t code = 0; bool queryRsped = false; @@ -804,7 +829,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t code = output.rspCode; if (needStop) { - QW_TASK_DLOG("task need stop", NULL); + QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_CQUERY); QW_ERR_JRET(code); } @@ -819,22 +844,18 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t QW_ERR_JRET(code); } - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY); - + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY); + + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); - if (NULL == rsp) { - QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); - } - // Note: schedule data sink firstly and will schedule query after it's done if (sOutput.scheduleJobNo) { - if (sOutput.scheduleJobNo > ctx.sinkId) { + if (sOutput.scheduleJobNo > ctx->sinkId) { QW_TASK_DLOG("sink need schedule, scheduleJobNo:%d", sOutput.scheduleJobNo); - ctx.sinkId = sOutput.scheduleJobNo; + ctx->sinkId = sOutput.scheduleJobNo; QW_ERR_JRET(qwBuildAndSendSchSinkMsg(QW_FPARAMS(), qwMsg->connection)); } } else if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { @@ -859,14 +880,15 @@ _return: qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output); - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - + if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (code) { + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); qwFreeFetchRsp(rsp); rsp = NULL; qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); } else if (rsp) { + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); + qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); } } @@ -897,7 +919,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t code = output.rspCode; if (needStop) { - QW_TASK_DLOG("task need stop", NULL); + QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_FETCH); QW_ERR_JRET(code); } @@ -912,13 +934,13 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t // Note: schedule data sink firstly and will schedule query after it's done if (sOutput.scheduleJobNo) { - if (sOutput.scheduleJobNo > ctx.sinkId) { + if (sOutput.scheduleJobNo > ctx->sinkId) { QW_TASK_DLOG("sink need schedule, scheduleJobNo:%d", sOutput.scheduleJobNo); - ctx.sinkId = sOutput.scheduleJobNo; + ctx->sinkId = sOutput.scheduleJobNo; QW_ERR_JRET(qwBuildAndSendSchSinkMsg(QW_FPARAMS(), qwMsg->connection)); } - } else if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { + } else if ((!sOutput.queryEnd) && (/* DS_BUF_LOW == sOutput.bufStatus || */ DS_BUF_EMPTY == sOutput.bufStatus)) { QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus); if (!QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CQUERY)) { @@ -1037,28 +1059,11 @@ void qWorkerDestroy(void **qWorkerMgmt) { tfree(*qWorkerMgmt); } - -#if 0 -#endif - - - - - - - - - - - - - - - int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SSchedulerStatusRsp **rsp) { SQWSchStatus *sch = NULL; int32_t taskNum = 0; +/* QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); sch->lastAccessTs = taosGetTimestampSec(); @@ -1096,6 +1101,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint qwReleaseScheduler(QW_READ, mgmt); (*rsp)->num = taskNum; +*/ return TSDB_CODE_SUCCESS; } @@ -1105,12 +1111,13 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { SQWSchStatus *sch = NULL; +/* QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); sch->lastAccessTs = taosGetTimestampSec(); qwReleaseScheduler(QW_READ, mgmt); - +*/ return TSDB_CODE_SUCCESS; } @@ -1119,7 +1126,8 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; - + +/* if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) { *taskStatus = JOB_TASK_STATUS_NULL; return TSDB_CODE_SUCCESS; @@ -1136,6 +1144,7 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); +*/ QW_RET(code); } @@ -1146,6 +1155,7 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI SQWTaskStatus *task = NULL; int32_t code = 0; +/* QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task)); @@ -1193,6 +1203,7 @@ _return: if (sch) { qwReleaseScheduler(QW_READ, mgmt); } +*/ QW_RET(code); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 3053a6d15d..1da2cca0e7 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -4,6 +4,7 @@ #include "planner.h" #include "query.h" #include "qworkerInt.h" +#include "qworkerMsg.h" #include "tmsg.h" #include "tname.h" #include "dataSinkMgt.h" @@ -25,7 +26,9 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { return TSDB_CODE_SUCCESS; } -void qwBuildFetchRsp(SRetrieveTableRsp *rsp, SOutputData *input, int32_t len) { +void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len) { + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; + rsp->useconds = htobe64(input->useconds); rsp->completed = input->queryEnd; rsp->precision = input->precision; @@ -262,48 +265,6 @@ int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, } -int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) { - SQWSchStatus *sch = NULL; - SQWTaskStatus *task = NULL; - int32_t code = 0; - - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); - - QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task)); - - QW_LOCK(QW_WRITE, &task->lock); - - int8_t status = task->status; - int32_t errCode = task->code; - - if (QW_TASK_READY(status)) { - task->ready = QW_READY_RESPONSED; - - QW_UNLOCK(QW_WRITE, &task->lock); - - QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, errCode)); - - QW_SCH_TASK_DLOG("task ready responsed, status:%d", status); - } else { - task->ready = QW_READY_RECEIVED; - - QW_UNLOCK(QW_WRITE, &task->lock); - - QW_SCH_TASK_DLOG("task ready NOT responsed, status:%d", status); - } - -_return: - - if (task) { - qwReleaseTask(QW_READ, sch); - } - - qwReleaseScheduler(QW_READ, mgmt); - - QW_RET(code); -} - - int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) { SRpcMsg *pMsg = (SRpcMsg *)connection; SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); @@ -349,7 +310,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen); - QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } msg->sId = be64toh(msg->sId); @@ -373,16 +334,16 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont; bool needStop = false; SQWTaskCtx *handles = NULL; + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { QW_ELOG("invalid cquery msg, contLen:%d", pMsg->contLen); - QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); msg->taskId = be64toh(msg->taskId); - msg->contentLen = ntohl(msg->contentLen); uint64_t sId = msg->sId; uint64_t qId = msg->queryId; @@ -423,11 +384,17 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = htobe64(msg->sId); - msg->queryId = htobe64(msg->queryId); - msg->taskId = htobe64(msg->taskId); + msg->sId = be64toh(msg->sId); + msg->queryId = be64toh(msg->queryId); + msg->taskId = be64toh(msg->taskId); - QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg)); + uint64_t sId = msg->sId; + uint64_t qId = msg->queryId; + uint64_t tId = msg->taskId; + + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg}; + + QW_ERR_RET(qwProcessReady(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &qwMsg)); return TSDB_CODE_SUCCESS; } @@ -448,7 +415,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SSchedulerStatusRsp *sStatus = NULL; - QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus)); + //QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus)); _return: @@ -469,9 +436,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = htobe64(msg->sId); - msg->queryId = htobe64(msg->queryId); - msg->taskId = htobe64(msg->taskId); + msg->sId = be64toh(msg->sId); + msg->queryId = be64toh(msg->queryId); + msg->taskId = be64toh(msg->taskId); uint64_t sId = msg->sId; uint64_t qId = msg->queryId; @@ -498,7 +465,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { msg->queryId = htobe64(msg->queryId); msg->taskId = htobe64(msg->taskId); - QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); + //QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); _return: @@ -517,13 +484,13 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; if (NULL == msg || pMsg->contLen < sizeof(*msg)) { - QW_ELOG("invalid task drop msg", NULL); + QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = htobe64(msg->sId); - msg->queryId = htobe64(msg->queryId); - msg->taskId = htobe64(msg->taskId); + msg->sId = be64toh(msg->sId); + msg->queryId = be64toh(msg->queryId); + msg->taskId = be64toh(msg->taskId); uint64_t sId = msg->sId; uint64_t qId = msg->queryId;