modify scheduler api

This commit is contained in:
dapan1121 2021-12-16 17:23:17 +08:00
parent 62590e533f
commit 8146c0e0fc
3 changed files with 63 additions and 26 deletions

View File

@ -20,6 +20,8 @@
extern "C" {
#endif
#include "planner.h"
typedef struct SQueryProfileSummary {
int64_t startTs; // Object created and added into the message queue
int64_t endTs; // the timestamp when the task is completed
@ -43,43 +45,23 @@ typedef struct SQueryProfileSummary {
uint64_t resultSize; // generated result size in Kb.
} SQueryProfileSummary;
typedef struct SQueryTask {
uint64_t queryId; // query id
uint64_t taskId; // task id
char *pSubplan; // operator tree
uint64_t status; // task status
SQueryProfileSummary summary; // task execution summary
void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage
} SQueryTask;
typedef struct SQueryJob {
SArray **pSubtasks;
// todo
} SQueryJob;
/**
* Process the query job, generated according to the query physical plan.
* This is a synchronized API, and is also thread-safety.
* @param pJob
* @return
*/
int32_t qProcessQueryJob(struct SQueryJob* pJob);
int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob);
int32_t scheduleFetchRows(void *pJob, void *data);
/**
* The SSqlObj should not be here????
* @param pSql
* @param pVgroupId
* @param pRetVgroupId
* @return
*/
//SArray* qGetInvolvedVgroupIdList(struct SSqlObj* pSql, SArray* pVgroupId, SArray* pRetVgroupId);
/**
* Cancel query job
* @param pJob
* @return
*/
int32_t qKillQueryJob(struct SQueryJob* pJob);
int32_t scheduleCancelJob(void *pJob);
#ifdef __cplusplus
}

View File

@ -31,8 +31,22 @@ typedef struct SQuery {
int32_t currentLevel;
} SQuery;
typedef struct SQueryTask {
uint64_t queryId; // query id
uint64_t taskId; // task id
char *pSubplan; // operator tree
uint64_t status; // task status
SQueryProfileSummary summary; // task execution summary
void *pOutputHandle; // result buffer handle, to temporarily keep the output result for next stage
} SQueryTask;
typedef struct SQueryJob {
SArray **pSubtasks;
// todo
} SQueryJob;
#ifdef __cplusplus
}
#endif
#endif /*_TD_SCHEDULER_INT_H_*/
#endif /*_TD_SCHEDULER_INT_H_*/

View File

@ -13,4 +13,45 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "schedulerInt.h"
#include "schedulerInt.h"
#include "taosmsg.h"
int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_fn_t fp) {
/*
SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
if (pRequest == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SRequestMsgBody body = {0};
buildConnectMsg(pRequest, &body);
int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
tsem_wait(&pRequest->body.rspSem);
destroyConnectMsg(&body);
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
printf("failed to connect to server, reason: %s\n\n", errorMsg);
destroyRequest(pRequest);
taos_close(pTscObj);
pTscObj = NULL;
} else {
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
destroyRequest(pRequest);
}
*/
}
int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob);
int32_t scheduleFetchRows(void *pJob, void *data);
int32_t scheduleCancelJob(void *pJob);