diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index d3c91b9004..1f8c1e2931 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -42,6 +42,8 @@ enum { QW_EVENT_READY, QW_EVENT_FETCH, QW_EVENT_DROP, + QW_EVENT_SCH_SINK, + QW_EVENT_SCH_QUERY, QW_EVENT_MAX, }; @@ -100,7 +102,7 @@ typedef struct SQWTaskCtx { SRWLatch lock; int32_t phase; - int8_t sinkInQ; + int32_t sinkId; int8_t queryInQ; int8_t events[QW_EVENT_MAX]; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 03f07f3f37..5693f3c3a4 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -450,13 +450,51 @@ _return: QW_RET(code); } +int32_t qwScheduleDataSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx, SRpcMsg *pMsg) { + if (atomic_load_8(&handles->sinkScheduled)) { + qDebug("data sink already scheduled"); + return TSDB_CODE_SUCCESS; + } + + SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq)); + if (NULL == req) { + qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq)); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } -int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg) { + req->header.vgId = mgmt->nodeId; + req->sId = sId; + req->queryId = qId; + req->taskId = tId; + + SRpcMsg pNewMsg = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .msgType = TDMT_VND_SCHEDULE_DATA_SINK, + .pCont = req, + .contLen = sizeof(SSinkDataReq), + .code = 0, + }; + + int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); + if (TSDB_CODE_SUCCESS != code) { + qError("put data sink schedule msg to queue failed, code:%x", code); + rpcFreeCont(req); + QW_ERR_RET(code); + } + + qDebug("put data sink schedule msg to query queue"); + + return TSDB_CODE_SUCCESS; +} + + + +int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { int32_t len = 0; SRetrieveTableRsp *rsp = NULL; bool queryEnd = false; int32_t code = 0; - SOutputData output = {0}; dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); @@ -467,7 +505,7 @@ int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ if (len == 0) { if (queryEnd) { - code = dsGetDataBlock(ctx->sinkHandle, &output); + code = dsGetDataBlock(ctx->sinkHandle, pOutput); if (code) { QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code); QW_ERR_RET(code); @@ -479,10 +517,10 @@ int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); - qwBuildFetchRsp(rsp, &output, 0); - *rspMsg = rsp; + *dataLen = 0; + return TSDB_CODE_SUCCESS; } @@ -494,42 +532,33 @@ int32_t qwGetResFromSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ // Got data from sink - - // Note: schedule data sink firstly and will schedule query after it's done - if (output.needSchedule) { - QW_TASK_DLOG("sink need schedule, queryEnd:%d", output.queryEnd); - QW_ERR_RET(qwScheduleDataSink(handles, mgmt, sId, qId, tId, pMsg)); - } else if ((!output.queryEnd) && (DS_BUF_LOW == output.bufStatus || DS_BUF_EMPTY == output.bufStatus)) { - QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", output.bufStatus); - QW_ERR_RET(qwScheduleQuery(handles, mgmt, sId, qId, tId, pMsg)); - } - + *dataLen = len; QW_TASK_DLOG("task got data in sink, dataLength:%d", len); QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); + + *rspMsg = rsp; - output.pData = rsp->data; + pOutput->pData = rsp->data; - code = dsGetDataBlock(ctx->sinkHandle, &output); + code = dsGetDataBlock(ctx->sinkHandle, pOutput); if (code) { QW_TASK_ELOG("dsGetDataBlock failed, code:%x", code); qwFreeFetchRsp(rsp); QW_ERR_RET(code); } - queryEnd = output.queryEnd; - output.queryEnd = false; + queryEnd = pOutput->queryEnd; + pOutput->queryEnd = false; - if (DS_BUF_EMPTY == output.bufStatus && queryEnd) { - output.queryEnd = true; + if (DS_BUF_EMPTY == pOutput->bufStatus && queryEnd) { + pOutput->queryEnd = true; QW_SCH_TASK_DLOG("task all fetched, status:%d", JOB_TASK_STATUS_SUCCEED); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); } - qwBuildFetchRsp(rsp, &output, len); - return TSDB_CODE_SUCCESS; } @@ -783,13 +812,29 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t } QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx)); - + QW_LOCK(QW_WRITE, &ctx->lock); - + locked = true; + + SOutputData sOutput = {0}; + QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); - QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp)); + // Note: schedule data sink firstly and will schedule query after it's done + if (sOutput.needSchedule) { + QW_TASK_DLOG("sink need schedule, queryEnd:%d", sOutput.queryEnd); + if (sOutput.needSchedule > ctx.sinkId) { + QW_ERR_RET(qwScheduleDataSink(ctx, mgmt, sId, qId, tId, pMsg)); + } + } else if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { + QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", output.bufStatus); + QW_ERR_RET(qwScheduleQuery(ctx, mgmt, sId, qId, tId, pMsg)); + } + if (rsp) { + qwBuildFetchRsp(rsp, &sOutput, dataLen); + } + _return: if (locked) { @@ -800,8 +845,12 @@ _return: qwReleaseTaskCtx(QW_READ, mgmt); } - if (needRsp) { - qwBuildAndSendFetchRsp(pMsg, rsp, dataLen, code); + if (code) { + qwFreeFetchRsp(rsp); + rsp = NULL; + qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code); + } else if (rsp) { + qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code); } QW_RET(code); @@ -910,44 +959,6 @@ void qWorkerDestroy(void **qWorkerMgmt) { -int32_t qwScheduleDataSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *ctx, SRpcMsg *pMsg) { - if (atomic_load_8(&handles->sinkScheduled)) { - qDebug("data sink already scheduled"); - return TSDB_CODE_SUCCESS; - } - - SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq)); - if (NULL == req) { - qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq)); - QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - req->header.vgId = mgmt->nodeId; - req->sId = sId; - req->queryId = queryId; - req->taskId = taskId; - - SRpcMsg pNewMsg = { - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, - .msgType = TDMT_VND_SCHEDULE_DATA_SINK, - .pCont = req, - .contLen = sizeof(SSinkDataReq), - .code = 0, - }; - - int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); - if (TSDB_CODE_SUCCESS != code) { - qError("put data sink schedule msg to queue failed, code:%x", code); - rpcFreeCont(req); - QW_ERR_RET(code); - } - - qDebug("put data sink schedule msg to query queue"); - - return TSDB_CODE_SUCCESS; -} - int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SSchedulerStatusRsp **rsp) { diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 5e58827238..7d5a1836a4 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -36,7 +36,9 @@ void qwBuildFetchRsp(SRetrieveTableRsp *rsp, SOutputData *input, int32_t len) { void qwFreeFetchRsp(void *msg) { - rpcFreeCont(msg); + if (msg) { + rpcFreeCont(msg); + } } int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) { @@ -106,7 +108,9 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { +int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { + SRpcMsg *pMsg = (SRpcMsg *)connection; + if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp)); @@ -264,44 +268,6 @@ _return: } -int32_t qwScheduleDataSink(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) { - if (atomic_load_8(&handles->sinkScheduled)) { - qDebug("data sink already scheduled"); - return TSDB_CODE_SUCCESS; - } - - SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq)); - if (NULL == req) { - qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq)); - QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - req->header.vgId = mgmt->nodeId; - req->sId = sId; - req->queryId = queryId; - req->taskId = taskId; - - SRpcMsg pNewMsg = { - .handle = pMsg->handle, - .ahandle = pMsg->ahandle, - .msgType = TDMT_VND_SCHEDULE_DATA_SINK, - .pCont = req, - .contLen = sizeof(SSinkDataReq), - .code = 0, - }; - - int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); - if (TSDB_CODE_SUCCESS != code) { - qError("put data sink schedule msg to queue failed, code:%x", code); - rpcFreeCont(req); - QW_ERR_RET(code); - } - - qDebug("put data sink schedule msg to query queue"); - - return TSDB_CODE_SUCCESS; -} - int32_t qwScheduleQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWTaskCtx *handles, SRpcMsg *pMsg) { if (atomic_load_8(&handles->queryScheduled)) { QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled);