From e105d952bcbd2bdab369ba6ae22093b302e0d1fc Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 11 Jan 2022 18:15:37 +0800 Subject: [PATCH] feature/qnode --- include/libs/executor/dataSinkMgt.h | 1 + source/client/src/clientImpl.c | 9 ++ source/libs/executor/src/dataDispatcher.c | 7 +- source/libs/executor/src/executorMain.c | 7 +- source/libs/qworker/src/qworker.c | 126 +++++++++--------- source/libs/scheduler/src/scheduler.c | 42 +++++- source/libs/scheduler/test/schedulerTests.cpp | 4 +- 7 files changed, 127 insertions(+), 69 deletions(-) diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 3ac16e9b29..f13ba5f87e 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -22,6 +22,7 @@ extern "C" { #include "os.h" #include "thash.h" +#include "executor.h" #define DS_BUF_LOW 1 #define DS_BUF_FULL 2 diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 616397f8e8..727b668f3a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -547,6 +547,13 @@ void* doFetchRow(SRequestObj* pRequest) { if (pRequest->type == TDMT_VND_QUERY) { pRequest->type = TDMT_VND_FETCH; scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); + + pResultInfo->current = 0; + if (pResultInfo->numOfRows <= pResultInfo->current) { + return NULL; + } + + goto _return; } else if (pRequest->type == TDMT_MND_SHOW) { pRequest->type = TDMT_MND_SHOW_RETRIEVE; } else if (pRequest->type == TDMT_VND_SHOW_TABLES) { @@ -590,6 +597,8 @@ void* doFetchRow(SRequestObj* pRequest) { } } +_return: + for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) { pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current; if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index e4b0557bff..a69084f3db 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -109,10 +109,14 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; pEntry->compressed = (int8_t)needCompress(pInput->pData, &(pHandle->schema)); pEntry->numOfRows = pInput->pData->info.rows; + pEntry->dataLen = 0; pBuf->useSize = DATA_META_LENGTH(pInput->pTableRetrieveTsMap); copyData(pInput, &pHandle->schema, pEntry->data, pEntry->compressed, &pEntry->dataLen); - pBuf->useSize += (pEntry->compressed ? pEntry->dataLen : pHandle->schema.resultRowSize * pInput->pData->info.rows); + if (0 == pEntry->compressed) { + pEntry->dataLen = pHandle->schema.resultRowSize * pInput->pData->info.rows; + } + pBuf->useSize += pEntry->dataLen; // todo completed } @@ -213,6 +217,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS return TSDB_CODE_QRY_OUT_OF_MEMORY; } dispatcher->sink.fPut = putDataBlock; + dispatcher->sink.fEndPut = endPut; dispatcher->sink.fGetLen = getDataLength; dispatcher->sink.fGetData = getDataBlock; dispatcher->sink.fDestroy = destroyDataSinker; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 3f77e15382..50f69fb567 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -77,13 +77,13 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ goto _error; } - SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000}; + SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100}; code = dsDataSinkMgtInit(&cfg); if (code != TSDB_CODE_SUCCESS) { goto _error; } - code = dsCreateDataSinker(pSubplan->pDataSink, (*pTask)->dsHandle); + code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle); _error: // if failed to add ref for all tables in this query, abort current query @@ -178,7 +178,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); int64_t st = 0; - handle = &pTaskInfo->dsHandle; + *handle = pTaskInfo->dsHandle; while(1) { st = taosGetTimestampUs(); @@ -188,6 +188,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pRes == NULL) { // no results generated yet, abort + dsEndPut(pTaskInfo->dsHandle, pTaskInfo->cost.elapsedTime); return pTaskInfo->code; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 9401d38fc7..ad3e8073ee 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -617,6 +617,8 @@ int32_t qwInitFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { memset(pRsp, 0, sizeof(SRetrieveTableRsp)); + *rsp = pRsp; + return TSDB_CODE_SUCCESS; } @@ -866,7 +868,61 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI return TSDB_CODE_SUCCESS; } -int32_t qwHandleFetch(SQWorkerTaskHandlesCache *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { + +int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status, int32_t errCode) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; + int32_t code = 0; + int8_t newStatus = JOB_TASK_STATUS_CANCELLED; + + code = qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD); + if (code) { + qError("sId:%"PRIx64" not in cache", sId); + QW_ERR_RET(code); + } + + code = qwAcquireTask(QW_READ, sch, qId, tId, &task); + if (code) { + qwReleaseScheduler(QW_READ, mgmt); + + if (JOB_TASK_STATUS_PARTIAL_SUCCEED == status || JOB_TASK_STATUS_SUCCEED == status) { + qError("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId); + QW_ERR_RET(code); + } + + QW_ERR_RET(qwAddTask(mgmt, sId, qId, tId, status, QW_EXIST_ACQUIRE, &sch, &task)); + } + + if (task->cancel) { + QW_LOCK(QW_WRITE, &task->lock); + qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus); + QW_UNLOCK(QW_WRITE, &task->lock); + } + + if (task->drop) { + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + qwDropTask(mgmt, sId, qId, tId); + + return TSDB_CODE_SUCCESS; + } + + if (!(task->cancel || task->drop)) { + QW_LOCK(QW_WRITE, &task->lock); + qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); + task->code = errCode; + QW_UNLOCK(QW_WRITE, &task->lock); + } + + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + return TSDB_CODE_SUCCESS; +} + + +int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -876,6 +932,9 @@ int32_t qwHandleFetch(SQWorkerTaskHandlesCache *handles, SQWorkerMgmt *mgmt, uin int32_t dataLength = 0; SRetrieveTableRsp *rsp = NULL; bool queryEnd = false; + SQWorkerTaskHandlesCache *handles = NULL; + + QW_ERR_JRET(qwAcquireTaskHandles(QW_READ, mgmt, queryId, taskId, &handles)); QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); @@ -955,59 +1014,11 @@ _return: qwBuildAndSendFetchRsp(pMsg, rsp, dataLength, code); } - QW_RET(code); -} - -int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status, int32_t errCode) { - SQWSchStatus *sch = NULL; - SQWTaskStatus *task = NULL; - int32_t code = 0; - int8_t newStatus = JOB_TASK_STATUS_CANCELLED; - - code = qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD); - if (code) { - qError("sId:%"PRIx64" not in cache", sId); - QW_ERR_RET(code); - } - - code = qwAcquireTask(QW_READ, sch, qId, tId, &task); - if (code) { - qwReleaseScheduler(QW_READ, mgmt); - - if (JOB_TASK_STATUS_PARTIAL_SUCCEED == status || JOB_TASK_STATUS_SUCCEED == status) { - qError("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId); - QW_ERR_RET(code); - } - - QW_ERR_RET(qwAddTask(mgmt, sId, qId, tId, status, QW_EXIST_ACQUIRE, &sch, &task)); - } - - if (task->cancel) { - QW_LOCK(QW_WRITE, &task->lock); - qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus); - QW_UNLOCK(QW_WRITE, &task->lock); - } - - if (task->drop) { - qwReleaseTask(QW_READ, sch); - qwReleaseScheduler(QW_READ, mgmt); - - qwDropTask(mgmt, sId, qId, tId); - - return TSDB_CODE_SUCCESS; - } - - if (!(task->cancel || task->drop)) { - QW_LOCK(QW_WRITE, &task->lock); - qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); - task->code = errCode; - QW_UNLOCK(QW_WRITE, &task->lock); + if (handles) { + qwReleaseTaskResCache(QW_READ, mgmt); } - qwReleaseTask(QW_READ, sch); - qwReleaseScheduler(QW_READ, mgmt); - - return TSDB_CODE_SUCCESS; + QW_RET(code); } int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { @@ -1081,6 +1092,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qTaskInfo_t pTaskInfo = NULL; code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo); if (code) { + qError("qCreateExecTask failed, code:%x", code); QW_ERR_JRET(code); } else { QW_ERR_JRET(qwAddTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING, QW_EXIST_RET_ERR, NULL, NULL)); @@ -1094,6 +1106,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { code = qExecTask(pTaskInfo, &sinkHandle); if (code) { + qError("qExecTask failed, code:%x", code); QW_ERR_JRET(code); } else { QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle)); @@ -1225,17 +1238,10 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->sId)); void *data = NULL; - SQWorkerTaskHandlesCache *handles = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireTaskHandles(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &handles)); + QW_ERR_RET(qwHandleFetch(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg)); - QW_ERR_JRET(qwHandleFetch(handles, qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg)); - -_return: - - qwReleaseTaskResCache(QW_READ, qWorkerMgmt); - QW_RET(code); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 37e061126b..ed12408844 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -68,7 +68,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -497,6 +497,33 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) { return TSDB_CODE_SUCCESS; } + +int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { + if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) { + SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); + } + + int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); + if (0 != code) { + if (HASH_NODE_EXIST(code)) { + *moved = true; + + SCH_TASK_ELOG("task already in execTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + *moved = true; + + SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); + + return TSDB_CODE_SUCCESS; +} + + int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO if needRetry, set task retry info @@ -657,7 +684,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(code); } - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); SCH_ERR_JRET(schRecordTaskSucceedNode(pTask)); @@ -689,6 +716,11 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { } pJob->fetchTask = pTask; + + code = schMoveTaskToExecList(pJob, pTask, &moved); + if (code && moved) { + SCH_ERR_RET(code); + } SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob)); @@ -836,7 +868,10 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in atomic_add_fetch_32(&pJob->ref, 1); int32_t s = taosHashGetSize(pJob->execTasks); - assert(s != 0); + if (s <= 0) { + qError("QID:%"PRIx64",TID:%"PRIx64" no task in execTask list", pParam->queryId, pParam->taskId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId)); if (NULL == task || NULL == (*task)) { @@ -1383,6 +1418,7 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { if (status == JOB_TASK_STATUS_FAILED) { code = atomic_load_32(&pJob->errCode); + SCH_ERR_JRET(code); } if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) { diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index f75f0ef263..b438521234 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -217,7 +217,7 @@ void *schtSendRsp(void *param) { return NULL; } -void *pInsertJob = NULL; +struct SSchJob *pInsertJob = NULL; } @@ -228,7 +228,7 @@ TEST(queryTest, normalCase) { char *dbname = "1.db1"; char *tablename = "table1"; SVgroupInfo vgInfo = {0}; - void *pJob = NULL; + SSchJob *pJob = NULL; SQueryDag dag = {0}; schtInitLogFile();