From 8146c0e0fcb9dfb8ef13259762df5f09614fc4c9 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 16 Dec 2021 17:23:17 +0800 Subject: [PATCH] modify scheduler api --- include/libs/scheduler/scheduler.h | 30 ++++------------- source/libs/scheduler/inc/schedulerInt.h | 16 ++++++++- source/libs/scheduler/src/scheduler.c | 43 +++++++++++++++++++++++- 3 files changed, 63 insertions(+), 26 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 6b3c9ed021..fe22d33086 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include "planner.h" + typedef struct SQueryProfileSummary { int64_t startTs; // Object created and added into the message queue int64_t endTs; // the timestamp when the task is completed @@ -43,43 +45,23 @@ typedef struct SQueryProfileSummary { uint64_t resultSize; // generated result size in Kb. } SQueryProfileSummary; -typedef struct SQueryTask { - uint64_t queryId; // query id - uint64_t taskId; // task id - char *pSubplan; // operator tree - uint64_t status; // task status - SQueryProfileSummary summary; // task execution summary - void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage -} SQueryTask; - -typedef struct SQueryJob { - SArray **pSubtasks; - // todo -} SQueryJob; - /** * Process the query job, generated according to the query physical plan. * This is a synchronized API, and is also thread-safety. * @param pJob * @return */ -int32_t qProcessQueryJob(struct SQueryJob* pJob); +int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); + +int32_t scheduleFetchRows(void *pJob, void *data); -/** - * The SSqlObj should not be here???? - * @param pSql - * @param pVgroupId - * @param pRetVgroupId - * @return - */ -//SArray* qGetInvolvedVgroupIdList(struct SSqlObj* pSql, SArray* pVgroupId, SArray* pRetVgroupId); /** * Cancel query job * @param pJob * @return */ -int32_t qKillQueryJob(struct SQueryJob* pJob); +int32_t scheduleCancelJob(void *pJob); #ifdef __cplusplus } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index b1b128e200..1648cbfc98 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -31,8 +31,22 @@ typedef struct SQuery { int32_t currentLevel; } SQuery; +typedef struct SQueryTask { + uint64_t queryId; // query id + uint64_t taskId; // task id + char *pSubplan; // operator tree + uint64_t status; // task status + SQueryProfileSummary summary; // task execution summary + void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage +} SQueryTask; + +typedef struct SQueryJob { + SArray **pSubtasks; + // todo +} SQueryJob; + #ifdef __cplusplus } #endif -#endif /*_TD_SCHEDULER_INT_H_*/ \ No newline at end of file +#endif /*_TD_SCHEDULER_INT_H_*/ diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 37f6240f9b..66fd0aa4f3 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -13,4 +13,45 @@ * along with this program. If not, see . */ -#include "schedulerInt.h" \ No newline at end of file +#include "schedulerInt.h" +#include "taosmsg.h" + + +int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) { +/* + SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT); + if (pRequest == NULL) { + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + SRequestMsgBody body = {0}; + buildConnectMsg(pRequest, &body); + + int64_t transporterId = 0; + sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId); + + tsem_wait(&pRequest->body.rspSem); + destroyConnectMsg(&body); + + if (pRequest->code != TSDB_CODE_SUCCESS) { + const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno); + printf("failed to connect to server, reason: %s\n\n", errorMsg); + + destroyRequest(pRequest); + taos_close(pTscObj); + pTscObj = NULL; + } else { + tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter); + destroyRequest(pRequest); + } +*/ +} + + +int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); + +int32_t scheduleFetchRows(void *pJob, void *data); + +int32_t scheduleCancelJob(void *pJob); + +