feature/schduler
This commit is contained in:
parent
a702b1460a
commit
5579b50ac6
|
@ -51,6 +51,8 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" )
|
||||||
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" )
|
||||||
|
|
||||||
// message from client to mnode
|
// message from client to mnode
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
|
||||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
|
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
|
||||||
|
@ -1074,6 +1076,24 @@ typedef struct {
|
||||||
/* data */
|
/* data */
|
||||||
} SUpdateTagValRsp;
|
} SUpdateTagValRsp;
|
||||||
|
|
||||||
|
typedef struct SSchedulerQueryMsg {
|
||||||
|
uint64_t queryId;
|
||||||
|
uint64_t taskId;
|
||||||
|
uint32_t contentLen;
|
||||||
|
char msg[];
|
||||||
|
} SSchedulerQueryMsg;
|
||||||
|
|
||||||
|
typedef struct SSchedulerReadyMsg {
|
||||||
|
uint64_t queryId;
|
||||||
|
uint64_t taskId;
|
||||||
|
} SSchedulerReadyMsg;
|
||||||
|
|
||||||
|
typedef struct SSchedulerFetchMsg {
|
||||||
|
uint64_t queryId;
|
||||||
|
uint64_t taskId;
|
||||||
|
} SSchedulerFetchMsg;
|
||||||
|
|
||||||
|
|
||||||
#pragma pack(pop)
|
#pragma pack(pop)
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -78,6 +78,7 @@ typedef struct SQueryJob {
|
||||||
SEpSet *mgmtEpSet;
|
SEpSet *mgmtEpSet;
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
int32_t userFetch;
|
int32_t userFetch;
|
||||||
|
int32_t remoteFetch;
|
||||||
void *res;
|
void *res;
|
||||||
|
|
||||||
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
|
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
|
||||||
|
|
|
@ -270,25 +270,158 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t schAsyncLaunchTask(SQueryJob *job, SQueryTask *task) {
|
int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
|
||||||
|
int32_t msgSize = 0;
|
||||||
|
void *msg = NULL;
|
||||||
|
|
||||||
|
switch (msgType) {
|
||||||
|
case TSDB_MSG_TYPE_QUERY: {
|
||||||
|
if (NULL == task->msg) {
|
||||||
|
qError("query msg is NULL");
|
||||||
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = strlen(task->msg);
|
||||||
|
msgSize = sizeof(SSchedulerQueryMsg) + len;
|
||||||
|
msg = calloc(1, msgSize);
|
||||||
|
if (NULL == msg) {
|
||||||
|
qError("calloc %d failed", msgSize);
|
||||||
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchedulerQueryMsg *pMsg = msg;
|
||||||
|
pMsg->queryId = job->queryId;
|
||||||
|
pMsg->taskId = task->taskId;
|
||||||
|
pMsg->contentLen = len;
|
||||||
|
memcpy(pMsg->msg, task->msg, len);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_MSG_TYPE_RSP_READY: {
|
||||||
|
msgSize = sizeof(SSchedulerReadyMsg);
|
||||||
|
msg = calloc(1, msgSize);
|
||||||
|
if (NULL == msg) {
|
||||||
|
qError("calloc %d failed", msgSize);
|
||||||
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchedulerReadyMsg *pMsg = msg;
|
||||||
|
pMsg->queryId = job->queryId;
|
||||||
|
pMsg->taskId = task->taskId;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case TSDB_MSG_TYPE_FETCH: {
|
||||||
|
msgSize = sizeof(SSchedulerFetchMsg);
|
||||||
|
msg = calloc(1, msgSize);
|
||||||
|
if (NULL == msg) {
|
||||||
|
qError("calloc %d failed", msgSize);
|
||||||
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
SSchedulerFetchMsg *pMsg = msg;
|
||||||
|
pMsg->queryId = job->queryId;
|
||||||
|
pMsg->taskId = task->taskId;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
qError("unknown msg type:%d", msgType);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
//TODO SEND MSG
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) {
|
int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schFetchFromRemote(SQueryJob *job) {
|
int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
switch (msgType) {
|
||||||
|
case TSDB_MSG_TYPE_QUERY:
|
||||||
|
if (rspCode != TSDB_CODE_SUCCESS) {
|
||||||
|
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
|
||||||
|
} else {
|
||||||
|
code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY);
|
||||||
|
if (code) {
|
||||||
|
goto _task_error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case TSDB_MSG_TYPE_RSP_READY:
|
||||||
|
if (rspCode != TSDB_CODE_SUCCESS) {
|
||||||
|
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
|
||||||
|
} else {
|
||||||
|
code = schProcessOnTaskSuccess(job, task);
|
||||||
|
if (code) {
|
||||||
|
goto _task_error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case TSDB_MSG_TYPE_FETCH:
|
||||||
|
SCH_ERR_JRET(rspCode);
|
||||||
|
SCH_ERR_JRET(schProcessOnDataFetched(job));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
qError("unknown msg type:%d received", msgType);
|
||||||
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_task_error:
|
||||||
|
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code));
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
code = schProcessOnJobFailure(job);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t schFetchFromRemote(SQueryJob *job) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) {
|
||||||
|
qInfo("prior fetching not finished");
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCH_ERR_JRET(schAsyncSendMsg(job, NULL, TSDB_MSG_TYPE_FETCH));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t schProcessOnJobSuccess(SQueryJob *job) {
|
int32_t schProcessOnJobSuccess(SQueryJob *job) {
|
||||||
|
if (job->userFetch) {
|
||||||
|
SCH_ERR_RET(schFetchFromRemote(job));
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schProcessOnJobFailure(SQueryJob *job) {
|
int32_t schProcessOnJobFailure(SQueryJob *job) {
|
||||||
|
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
|
||||||
|
|
||||||
|
if (job->userFetch) {
|
||||||
|
tsem_post(&job->rspSem);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t schProcessOnDataFetched(SQueryJob *job) {
|
||||||
|
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
|
||||||
|
|
||||||
|
tsem_post(&job->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -367,7 +500,7 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) {
|
||||||
SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet));
|
SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet));
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_RET(schAsyncLaunchTask(job, task));
|
SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY));
|
||||||
|
|
||||||
SCH_ERR_RET(schPushTaskToExecList(job, task));
|
SCH_ERR_RET(schPushTaskToExecList(job, task));
|
||||||
|
|
||||||
|
@ -450,19 +583,26 @@ int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryJob *job = pJob;
|
SQueryJob *job = pJob;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) {
|
if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) {
|
||||||
qError("prior fetching not finished");
|
qError("prior fetching not finished");
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_RET(schFetchFromRemote(job));
|
if (job->status == SCH_STATUS_SUCCEED) {
|
||||||
|
SCH_ERR_JRET(schFetchFromRemote(job));
|
||||||
|
}
|
||||||
|
|
||||||
tsem_wait(&job->rspSem);
|
tsem_wait(&job->rspSem);
|
||||||
|
|
||||||
*data = job->res;
|
*data = job->res;
|
||||||
|
job->res = NULL;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
_return:
|
||||||
|
atomic_val_compare_exchange_32(&job->userFetch, 1, 0);
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t scheduleCancelJob(void *pRpc, void *pJob);
|
int32_t scheduleCancelJob(void *pRpc, void *pJob);
|
||||||
|
|
Loading…
Reference in New Issue