feature/qnode
This commit is contained in:
parent
3edb51d1bc
commit
896df2bdc6
|
@ -87,13 +87,14 @@ typedef struct SUseDbOutput {
|
|||
SDBVgroupInfo dbVgroup;
|
||||
} SUseDbOutput;
|
||||
|
||||
typedef enum {
|
||||
enum {
|
||||
META_TYPE_NON_TABLE = 1,
|
||||
META_TYPE_CTABLE,
|
||||
META_TYPE_TABLE,
|
||||
META_TYPE_BOTH_TABLE,
|
||||
META_TYPE_BOTH_TABLE
|
||||
};
|
||||
|
||||
|
||||
typedef struct STableMetaOutput {
|
||||
int32_t metaType;
|
||||
char ctbFname[TSDB_TABLE_FNAME_LEN];
|
||||
|
|
|
@ -97,7 +97,7 @@ typedef struct SSchJob {
|
|||
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
|
||||
|
||||
int8_t status;
|
||||
SEpAddr resEp;
|
||||
SQueryNodeAddr resNode;
|
||||
tsem_t rspSem;
|
||||
int32_t userFetch;
|
||||
int32_t remoteFetch;
|
||||
|
@ -109,7 +109,7 @@ typedef struct SSchJob {
|
|||
} SSchJob;
|
||||
|
||||
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
|
||||
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
|
||||
#define SCH_TASK_READY_TO_LUNCH(task) (atomic_load_32(&(task)->childReady) >= taosArrayGetSize((task)->children))
|
||||
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
|
||||
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
|
||||
|
||||
|
@ -130,8 +130,8 @@ typedef struct SSchJob {
|
|||
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
|
||||
|
||||
|
||||
extern int32_t schLaunchTask(SSchJob *job, SSchTask *task);
|
||||
extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);
|
||||
static int32_t schLaunchTask(SSchJob *job, SSchTask *task);
|
||||
static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
static SSchedulerMgmt schMgmt = {0};
|
||||
|
||||
static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
||||
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
||||
for (int32_t i = 0; i < pJob->levelNum; ++i) {
|
||||
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
|
||||
|
||||
|
@ -31,6 +31,11 @@ static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
|||
int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0;
|
||||
|
||||
if (childNum > 0) {
|
||||
if (pJob->levelIdx == pLevel->level) {
|
||||
SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
pTask->children = taosArrayInit(childNum, POINTER_BYTES);
|
||||
if (NULL == pTask->children) {
|
||||
SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
|
||||
|
@ -53,6 +58,11 @@ static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
|||
}
|
||||
|
||||
if (parentNum > 0) {
|
||||
if (0 == pLevel->level) {
|
||||
SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
|
||||
if (NULL == pTask->parents) {
|
||||
SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
|
||||
|
@ -84,19 +94,10 @@ static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
|||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pLevel->taskNum; ++i) {
|
||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
|
||||
|
||||
if (pTask->parents && taosArrayGetSize(pTask->parents) > 0) {
|
||||
SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", (int32_t)taosArrayGetSize(pTask->parents));
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) {
|
||||
int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) {
|
||||
pTask->plan = pPlan;
|
||||
pTask->level = pLevel;
|
||||
pTask->status = JOB_TASK_STATUS_NOT_START;
|
||||
|
@ -105,11 +106,11 @@ static int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSch
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void schFreeTask(SSchTask* pTask) {
|
||||
void schFreeTask(SSchTask* pTask) {
|
||||
taosArrayDestroy(pTask->candidateAddrs);
|
||||
}
|
||||
|
||||
static int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
|
||||
int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
|
||||
int32_t code = 0;
|
||||
|
||||
pJob->queryId = pDag->queryId;
|
||||
|
@ -217,41 +218,48 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
static int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
|
||||
if (task->candidateAddrs) {
|
||||
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (NULL != pTask->candidateAddrs) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
task->candidateIdx = 0;
|
||||
task->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
|
||||
if (NULL == task->candidateAddrs) {
|
||||
qError("taosArrayInit failed");
|
||||
pTask->candidateIdx = 0;
|
||||
pTask->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
|
||||
if (NULL == pTask->candidateAddrs) {
|
||||
SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CONDIDATE_EP_NUM);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (task->plan->execNode.numOfEps > 0) {
|
||||
if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
|
||||
qError("taosArrayPush failed");
|
||||
if (pTask->plan->execNode.numOfEps > 0) {
|
||||
if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
|
||||
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("use execNode from plan as candidate addr");
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t addNum = 0;
|
||||
int32_t nodeNum = taosArrayGetSize(job->nodeList);
|
||||
int32_t nodeNum = taosArrayGetSize(pJob->nodeList);
|
||||
|
||||
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
|
||||
SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i);
|
||||
SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
|
||||
|
||||
if (NULL == taosArrayPush(task->candidateAddrs, &task->plan->execNode)) {
|
||||
qError("taosArrayPush failed");
|
||||
if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
|
||||
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
++addNum;
|
||||
}
|
||||
|
||||
if (addNum <= 0) {
|
||||
SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum);
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
/*
|
||||
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
|
||||
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
|
||||
|
@ -264,7 +272,7 @@ static int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
|
||||
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
|
||||
qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
@ -275,7 +283,7 @@ static int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
|
||||
int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
|
||||
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
|
||||
qError("remove task taskId:0x%"PRIx64" from execTasks failed, reqId:0x%"PRIx64, task->taskId, job->queryId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -291,7 +299,7 @@ static int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
|
||||
int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
|
||||
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
|
||||
qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId);
|
||||
}
|
||||
|
@ -306,7 +314,7 @@ static int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
|
||||
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
|
||||
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
|
||||
// TODO if needRetry, set task retry info
|
||||
|
||||
|
@ -316,7 +324,7 @@ static int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t err
|
|||
}
|
||||
|
||||
|
||||
static int32_t schFetchFromRemote(SSchJob *job) {
|
||||
int32_t schFetchFromRemote(SSchJob *job) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) {
|
||||
|
@ -335,7 +343,7 @@ _return:
|
|||
}
|
||||
|
||||
|
||||
static int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
|
||||
int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
|
||||
job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
|
||||
|
||||
bool needFetch = job->userFetch;
|
||||
|
@ -350,7 +358,7 @@ static int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
|
||||
int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
|
||||
job->status = JOB_TASK_STATUS_FAILED;
|
||||
job->errCode = errCode;
|
||||
|
||||
|
@ -363,57 +371,58 @@ static int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t schProcessOnDataFetched(SSchJob *job) {
|
||||
int32_t schProcessOnDataFetched(SSchJob *job) {
|
||||
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
|
||||
|
||||
tsem_post(&job->rspSem);
|
||||
}
|
||||
|
||||
|
||||
static int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
|
||||
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||
bool moved = false;
|
||||
|
||||
SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
|
||||
SCH_ERR_RET(schMoveTaskToSuccList(pJob, pTask, &moved));
|
||||
if (!moved) {
|
||||
SCH_TASK_ELOG(" task may already moved, status:%d", task->status);
|
||||
SCH_TASK_ELOG(" task may already moved, status:%d", pTask->status);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
task->status = JOB_TASK_STATUS_SUCCEED;
|
||||
pTask->status = JOB_TASK_STATUS_SUCCEED;
|
||||
|
||||
int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0;
|
||||
int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0;
|
||||
if (parentNum == 0) {
|
||||
if (task->plan->level != 0) {
|
||||
qError("level error");
|
||||
if (pTask->level->level != 0) {
|
||||
SCH_TASK_ELOG("no parent task level error, level:%d", pTask->level->level);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
int32_t taskDone = 0;
|
||||
|
||||
if (SCH_TASK_NEED_WAIT_ALL(task)) {
|
||||
SCH_LOCK(SCH_WRITE, &task->level->lock);
|
||||
task->level->taskSucceed++;
|
||||
taskDone = task->level->taskSucceed + task->level->taskFailed;
|
||||
SCH_UNLOCK(SCH_WRITE, &task->level->lock);
|
||||
if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
|
||||
SCH_LOCK(SCH_WRITE, &pTask->level->lock);
|
||||
pTask->level->taskSucceed++;
|
||||
taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
|
||||
SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
|
||||
|
||||
if (taskDone < task->level->taskNum) {
|
||||
qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
|
||||
if (taskDone < pTask->level->taskNum) {
|
||||
SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (taskDone > pTask->level->taskNum) {
|
||||
assert(0);
|
||||
}
|
||||
|
||||
if (task->level->taskFailed > 0) {
|
||||
job->status = JOB_TASK_STATUS_FAILED;
|
||||
SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR));
|
||||
if (pTask->level->taskFailed > 0) {
|
||||
pJob->status = JOB_TASK_STATUS_FAILED;
|
||||
SCH_ERR_RET(schProcessOnJobFailure(pJob, TSDB_CODE_QRY_APP_ERROR));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
strncpy(job->resEp.fqdn, task->execAddr.epAddr[task->execAddr.inUse].fqdn, sizeof(job->resEp.fqdn));
|
||||
job->resEp.port = task->execAddr.epAddr[task->execAddr.inUse].port;
|
||||
pJob->resNode = pTask->execAddr;
|
||||
}
|
||||
|
||||
job->fetchTask = task;
|
||||
SCH_ERR_RET(schProcessOnJobPartialSuccess(job));
|
||||
pJob->fetchTask = pTask;
|
||||
SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -428,58 +437,58 @@ static int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
|
|||
*/
|
||||
|
||||
for (int32_t i = 0; i < parentNum; ++i) {
|
||||
SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i);
|
||||
SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
|
||||
|
||||
++par->childReady;
|
||||
atomic_add_fetch_32(&par->childReady, 1);
|
||||
|
||||
SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr));
|
||||
SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr));
|
||||
|
||||
if (SCH_TASK_READY_TO_LUNCH(par)) {
|
||||
SCH_ERR_RET(schLaunchTask(job, par));
|
||||
SCH_ERR_RET(schLaunchTask(pJob, par));
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
|
||||
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
|
||||
bool needRetry = false;
|
||||
bool moved = false;
|
||||
int32_t taskDone = 0;
|
||||
SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
|
||||
SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));
|
||||
|
||||
if (!needRetry) {
|
||||
SCH_TASK_ELOG("task failed[%x], no more retry", errCode);
|
||||
|
||||
SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
|
||||
SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved));
|
||||
if (!moved) {
|
||||
SCH_TASK_ELOG("task may already moved, status:%d", task->status);
|
||||
SCH_TASK_ELOG("task may already moved, status:%d", pTask->status);
|
||||
}
|
||||
|
||||
if (SCH_TASK_NEED_WAIT_ALL(task)) {
|
||||
SCH_LOCK(SCH_WRITE, &task->level->lock);
|
||||
task->level->taskFailed++;
|
||||
taskDone = task->level->taskSucceed + task->level->taskFailed;
|
||||
SCH_UNLOCK(SCH_WRITE, &task->level->lock);
|
||||
if (SCH_TASK_NEED_WAIT_ALL(pTask)) {
|
||||
SCH_LOCK(SCH_WRITE, &pTask->level->lock);
|
||||
pTask->level->taskFailed++;
|
||||
taskDone = pTask->level->taskSucceed + pTask->level->taskFailed;
|
||||
SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
|
||||
|
||||
if (taskDone < task->level->taskNum) {
|
||||
qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
|
||||
if (taskDone < pTask->level->taskNum) {
|
||||
qDebug("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
job->status = JOB_TASK_STATUS_FAILED;
|
||||
SCH_ERR_RET(schProcessOnJobFailure(job, errCode));
|
||||
pJob->status = JOB_TASK_STATUS_FAILED;
|
||||
SCH_ERR_RET(schProcessOnJobFailure(pJob, errCode));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schLaunchTask(job, task));
|
||||
SCH_ERR_RET(schLaunchTask(pJob, pTask));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
|
||||
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
|
||||
switch (msgType) {
|
||||
|
@ -567,7 +576,7 @@ _return:
|
|||
}
|
||||
|
||||
|
||||
static int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
|
||||
int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
|
||||
|
||||
|
@ -593,32 +602,32 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
static int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
|
||||
}
|
||||
|
||||
static int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
|
||||
}
|
||||
|
||||
static int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
|
||||
}
|
||||
|
||||
static int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
|
||||
}
|
||||
|
||||
static int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
|
||||
}
|
||||
|
||||
static int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
|
||||
qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
|
||||
}
|
||||
|
||||
static int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
||||
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
||||
switch (msgType) {
|
||||
case TDMT_VND_CREATE_TABLE:
|
||||
*fp = schHandleCreateTableCallback;
|
||||
|
@ -647,7 +656,7 @@ static int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
|||
}
|
||||
|
||||
|
||||
static int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
|
||||
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
|
||||
int32_t code = 0;
|
||||
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == pMsgSendInfo) {
|
||||
|
@ -686,7 +695,7 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
static void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
|
||||
void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
|
||||
epSet->inUse = addr->inUse;
|
||||
epSet->numOfEps = addr->numOfEps;
|
||||
|
||||
|
@ -697,7 +706,7 @@ static void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
|
|||
}
|
||||
|
||||
|
||||
static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
||||
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
||||
uint32_t msgSize = 0;
|
||||
void *msg = NULL;
|
||||
int32_t code = 0;
|
||||
|
@ -810,37 +819,40 @@ _return:
|
|||
}
|
||||
|
||||
|
||||
static int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
|
||||
SSubplan *plan = task->plan;
|
||||
SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
|
||||
SCH_ERR_RET(schSetTaskCandidateAddrs(job, task));
|
||||
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
||||
SSubplan *plan = pTask->plan;
|
||||
|
||||
SCH_ERR_RET(qSubPlanToString(plan, &pTask->msg, &pTask->msgLen));
|
||||
SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask));
|
||||
|
||||
if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) {
|
||||
SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, task->taskId);
|
||||
if (NULL == pTask->candidateAddrs || taosArrayGetSize(pTask->candidateAddrs) <= 0) {
|
||||
SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, pTask->taskId);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
// NOTE: race condition: the task should be put into the hash table before send msg to server
|
||||
SCH_ERR_RET(schPushTaskToExecList(job, task));
|
||||
SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
|
||||
SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
|
||||
SCH_ERR_RET(schBuildAndSendMsg(pJob, pTask, plan->msgType));
|
||||
|
||||
task->status = JOB_TASK_STATUS_EXECUTING;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t schLaunchJob(SSchJob *job) {
|
||||
SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
|
||||
for (int32_t i = 0; i < level->taskNum; ++i) {
|
||||
SSchTask *task = taosArrayGet(level->subTasks, i);
|
||||
SCH_ERR_RET(schLaunchTask(job, task));
|
||||
}
|
||||
|
||||
job->status = JOB_TASK_STATUS_EXECUTING;
|
||||
pTask->status = JOB_TASK_STATUS_EXECUTING;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void schDropJobAllTasks(SSchJob *job) {
|
||||
int32_t schLaunchJob(SSchJob *pJob) {
|
||||
SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
|
||||
|
||||
for (int32_t i = 0; i < level->taskNum; ++i) {
|
||||
SSchTask *pTask = taosArrayGet(level->subTasks, i);
|
||||
SCH_ERR_RET(schLaunchTask(pJob, pTask));
|
||||
}
|
||||
|
||||
pJob->status = JOB_TASK_STATUS_EXECUTING;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void schDropJobAllTasks(SSchJob *job) {
|
||||
void *pIter = taosHashIterate(job->succTasks, NULL);
|
||||
while (pIter) {
|
||||
SSchTask *task = *(SSchTask **)pIter;
|
||||
|
@ -868,7 +880,7 @@ static void schDropJobAllTasks(SSchJob *job) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) {
|
||||
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) {
|
||||
if (nodeList && taosArrayGetSize(nodeList) <= 0) {
|
||||
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
|
||||
}
|
||||
|
@ -876,7 +888,7 @@ static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag
|
|||
int32_t code = 0;
|
||||
SSchJob *pJob = calloc(1, sizeof(SSchJob));
|
||||
if (NULL == pJob) {
|
||||
qError("QID:%"PRIx64" calloc %d failed", sizeof(SSchJob));
|
||||
qError("QID:%"PRIx64" calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -909,23 +921,27 @@ static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag
|
|||
code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES);
|
||||
if (0 != code) {
|
||||
if (HASH_NODE_EXIST(code)) {
|
||||
SCH_JOB_ELOG("job already exist, type:%d", pJob->attr.queryJob);
|
||||
SCH_JOB_ELOG("job already exist, isQueryJob:%d", pJob->attr.queryJob);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
} else {
|
||||
qError("taosHashPut queryId:0x%"PRIx64" failed", pJob->queryId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
SCH_JOB_ELOG("taosHashPut job failed, errno:%d", errno);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
pJob->status = JOB_TASK_STATUS_NOT_START;
|
||||
|
||||
SCH_ERR_JRET(schLaunchJob(pJob));
|
||||
|
||||
*(SSchJob **)job = pJob;
|
||||
|
||||
|
||||
if (syncSchedule) {
|
||||
SCH_JOB_DLOG("will wait for rsp now");
|
||||
tsem_wait(&pJob->rspSem);
|
||||
}
|
||||
|
||||
SCH_JOB_DLOG("job exec done");
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
@ -960,12 +976,12 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (taosGetSystemUUID(&schMgmt.sId, sizeof(schMgmt.sId))) {
|
||||
if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
|
||||
qError("generate schdulerId failed, errno:%d", errno);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
|
||||
}
|
||||
|
||||
qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.cfg.maxJobNum);
|
||||
qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue