feature/qnode

This commit is contained in:
dapan1121 2022-01-07 16:50:48 +08:00
parent d37352dd26
commit 3edb51d1bc
2 changed files with 126 additions and 147 deletions

View File

@ -84,7 +84,6 @@ typedef struct SSchJobAttr {
typedef struct SSchJob {
uint64_t queryId;
SSchJobAttr attr;
int32_t levelNum;
void *transport;
SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr

View File

@ -20,109 +20,106 @@
static SSchedulerMgmt schMgmt = {0};
int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
for (int32_t i = 0; i < job->levelNum; ++i) {
SSchLevel *level = taosArrayGet(job->levels, i);
static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t i = 0; i < pJob->levelNum; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
for (int32_t m = 0; m < level->taskNum; ++m) {
SSchTask *task = taosArrayGet(level->subTasks, m);
SSubplan *plan = task->plan;
int32_t childNum = plan->pChildren ? (int32_t)taosArrayGetSize(plan->pChildren) : 0;
int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SSubplan *pPlan = pTask->plan;
int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0;
int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0;
if (childNum > 0) {
task->children = taosArrayInit(childNum, POINTER_BYTES);
if (NULL == task->children) {
qError("taosArrayInit %d failed", childNum);
pTask->children = taosArrayInit(childNum, POINTER_BYTES);
if (NULL == pTask->children) {
SCH_TASK_ELOG("taosArrayInit %d children failed", childNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
for (int32_t n = 0; n < childNum; ++n) {
SSubplan **child = taosArrayGet(plan->pChildren, n);
SSubplan **child = taosArrayGet(pPlan->pChildren, n);
SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) {
qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (NULL == taosArrayPush(task->children, childTask)) {
qError("taosArrayPush failed");
if (NULL == taosArrayPush(pTask->children, childTask)) {
SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
if (parentNum > 0) {
task->parents = taosArrayInit(parentNum, POINTER_BYTES);
if (NULL == task->parents) {
qError("taosArrayInit %d failed", parentNum);
pTask->parents = taosArrayInit(parentNum, POINTER_BYTES);
if (NULL == pTask->parents) {
SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
for (int32_t n = 0; n < parentNum; ++n) {
SSubplan **parent = taosArrayGet(plan->pParents, n);
SSubplan **parent = taosArrayGet(pPlan->pParents, n);
SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES);
if (NULL == parentTask || NULL == *parentTask) {
qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (NULL == taosArrayPush(task->parents, parentTask)) {
qError("taosArrayPush failed");
if (NULL == taosArrayPush(pTask->parents, parentTask)) {
SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
}
SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum);
}
}
SSchLevel *level = taosArrayGet(job->levels, 0);
if (job->attr.queryJob && level->taskNum > 1) {
qError("invalid plan info, level 0, taskNum:%d", level->taskNum);
SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
if (pJob->attr.queryJob && pLevel->taskNum > 1) {
SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask *task = taosArrayGet(level->subTasks, 0);
if (task->parents && taosArrayGetSize(task->parents) > 0) {
qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
for (int32_t i = 0; i < pLevel->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
if (pTask->parents && taosArrayGetSize(pTask->parents) > 0) {
SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", (int32_t)taosArrayGetSize(pTask->parents));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
}
return TSDB_CODE_SUCCESS;
}
static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) {
SSchTask task = {0};
if (plan->type == QUERY_TYPE_MODIFY) {
pJob->attr.needFetch = false;
} else {
pJob->attr.queryJob = true;
}
static int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) {
pTask->plan = pPlan;
pTask->level = pLevel;
pTask->status = JOB_TASK_STATUS_NOT_START;
pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task.plan = plan;
task.level = pLevel;
task.status = JOB_TASK_STATUS_NOT_START;
task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
return task;
return TSDB_CODE_SUCCESS;
}
static void cleanupTask(SSchTask* pTask) {
static void schFreeTask(SSchTask* pTask) {
taosArrayDestroy(pTask->candidateAddrs);
}
int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
static int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
int32_t code = 0;
job->queryId = dag->queryId;
pJob->queryId = pDag->queryId;
if (dag->numOfSubplans <= 0) {
SCH_JOB_ELOG("invalid subplan num:%d", dag->numOfSubplans);
if (pDag->numOfSubplans <= 0) {
SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
int32_t levelNum = (int32_t)taosArrayGetSize(pDag->pSubplans);
if (levelNum <= 0) {
SCH_JOB_ELOG("invalid level num:%d", levelNum);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -134,18 +131,16 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
job->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
if (NULL == job->levels) {
pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
if (NULL == pJob->levels) {
SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
job->attr.needFetch = true;
pJob->levelNum = levelNum;
pJob->levelIdx = levelNum - 1;
job->levelNum = levelNum;
job->levelIdx = levelNum - 1;
job->subPlans = dag->pSubplans;
pJob->subPlans = pDag->pSubplans;
SSchLevel level = {0};
SArray *plans = NULL;
@ -155,16 +150,16 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
level.status = JOB_TASK_STATUS_NOT_START;
for (int32_t i = 0; i < levelNum; ++i) {
if (NULL == taosArrayPush(job->levels, &level)) {
if (NULL == taosArrayPush(pJob->levels, &level)) {
SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pLevel = taosArrayGet(job->levels, i);
pLevel = taosArrayGet(pJob->levels, i);
pLevel->level = i;
plans = taosArrayGetP(dag->pSubplans, i);
plans = taosArrayGetP(pDag->pSubplans, i);
if (NULL == plans) {
SCH_JOB_ELOG("empty level plan, level:%d", i);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
@ -186,38 +181,34 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
for (int32_t n = 0; n < taskNum; ++n) {
SSubplan *plan = taosArrayGetP(plans, n);
if (plan->type == QUERY_TYPE_MODIFY) {
job->attr.needFetch = false;
} else {
job->attr.queryJob = true;
}
SSchTask task = initTask(job, plan, pLevel);
SCH_SET_JOB_TYPE(&pJob->attr, plan->type);
SSchTask task = {0};
SSchTask *pTask = &task;
schInitTask(pJob, &task, plan, pLevel);
void *p = taosArrayPush(pLevel->subTasks, &task);
if (NULL == p) {
qError("taosArrayPush failed");
SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
qError("taosHashPut failed");
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("task initialized, level:%d", pLevel->level);
}
SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
}
SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));
if (planToTask) {
taosHashCleanup(planToTask);
}
return TSDB_CODE_SUCCESS;
SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
_return:
if (pLevel->subTasks) {
taosArrayDestroy(pLevel->subTasks);
}
if (planToTask) {
taosHashCleanup(planToTask);
@ -226,7 +217,7 @@ _return:
SCH_RET(code);
}
int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
static int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
if (task->candidateAddrs) {
return TSDB_CODE_SUCCESS;
}
@ -273,7 +264,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
return TSDB_CODE_SUCCESS;
}
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
static int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -284,7 +275,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
static int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
qError("remove task taskId:0x%"PRIx64" from execTasks failed, reqId:0x%"PRIx64, task->taskId, job->queryId);
return TSDB_CODE_SUCCESS;
@ -300,7 +291,7 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
return TSDB_CODE_SUCCESS;
}
int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
static int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId);
}
@ -315,7 +306,7 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
return TSDB_CODE_SUCCESS;
}
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
static int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info
@ -325,7 +316,7 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b
}
int32_t schFetchFromRemote(SSchJob *job) {
static int32_t schFetchFromRemote(SSchJob *job) {
int32_t code = 0;
if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) {
@ -344,11 +335,11 @@ _return:
}
int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
static int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
bool needFetch = job->userFetch;
if ((!job->attr.needFetch) && job->attr.syncSchedule) {
if ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule) {
tsem_post(&job->rspSem);
}
@ -359,27 +350,27 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
static int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
job->status = JOB_TASK_STATUS_FAILED;
job->errCode = errCode;
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
if (job->userFetch || ((!job->attr.needFetch) && job->attr.syncSchedule)) {
if (job->userFetch || ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule)) {
tsem_post(&job->rspSem);
}
return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnDataFetched(SSchJob *job) {
static int32_t schProcessOnDataFetched(SSchJob *job) {
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
tsem_post(&job->rspSem);
}
int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
static int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
bool moved = false;
SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
@ -451,7 +442,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
return TSDB_CODE_SUCCESS;
}
int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
static int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
bool needRetry = false;
bool moved = false;
int32_t taskDone = 0;
@ -488,7 +479,7 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
return TSDB_CODE_SUCCESS;
}
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
static int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0;
switch (msgType) {
@ -576,7 +567,7 @@ _return:
}
int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
static int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
@ -602,32 +593,32 @@ _return:
SCH_RET(code);
}
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
static int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
static int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
static int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
static int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
static int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
static int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
}
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
static int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
switch (msgType) {
case TDMT_VND_CREATE_TABLE:
*fp = schHandleCreateTableCallback;
@ -656,7 +647,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
}
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
static int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t code = 0;
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
@ -695,7 +686,7 @@ _return:
SCH_RET(code);
}
void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
static void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
epSet->inUse = addr->inUse;
epSet->numOfEps = addr->numOfEps;
@ -706,7 +697,7 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
}
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
uint32_t msgSize = 0;
void *msg = NULL;
int32_t code = 0;
@ -819,7 +810,7 @@ _return:
}
int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
static int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
SSubplan *plan = task->plan;
SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
SCH_ERR_RET(schSetTaskCandidateAddrs(job, task));
@ -837,7 +828,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
return TSDB_CODE_SUCCESS;
}
int32_t schLaunchJob(SSchJob *job) {
static int32_t schLaunchJob(SSchJob *job) {
SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
for (int32_t i = 0; i < level->taskNum; ++i) {
SSchTask *task = taosArrayGet(level->subTasks, i);
@ -849,7 +840,7 @@ int32_t schLaunchJob(SSchJob *job) {
return TSDB_CODE_SUCCESS;
}
void schDropJobAllTasks(SSchJob *job) {
static void schDropJobAllTasks(SSchJob *job) {
void *pIter = taosHashIterate(job->succTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
@ -877,82 +868,71 @@ void schDropJobAllTasks(SSchJob *job) {
}
}
uint64_t schGenSchId(void) {
uint64_t sId = 0;
// TODO
qDebug("Gen sId:0x%"PRIx64, sId);
return sId;
}
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) {
if (nodeList && taosArrayGetSize(nodeList) <= 0) {
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
}
int32_t code = 0;
SSchJob *job = calloc(1, sizeof(SSchJob));
if (NULL == job) {
SSchJob *pJob = calloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:%"PRIx64" calloc %d failed", sizeof(SSchJob));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
job->attr.syncSchedule = syncSchedule;
job->transport = transport;
job->nodeList = nodeList;
pJob->attr.syncSchedule = syncSchedule;
pJob->transport = transport;
pJob->nodeList = nodeList;
SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == job->execTasks) {
qError("taosHashInit %d failed", pDag->numOfSubplans);
pJob->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->execTasks) {
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == job->succTasks) {
qError("taosHashInit %d failed", pDag->numOfSubplans);
pJob->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->succTasks) {
SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == job->failTasks) {
qError("taosHashInit %d failed", pDag->numOfSubplans);
pJob->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->failTasks) {
SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
tsem_init(&job->rspSem, 0, 0);
tsem_init(&pJob->rspSem, 0, 0);
code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES);
code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
qError("taosHashPut queryId:0x%"PRIx64" already exist", job->queryId);
SCH_JOB_ELOG("job already exist, type:%d", pJob->attr.queryJob);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} else {
qError("taosHashPut queryId:0x%"PRIx64" failed", job->queryId);
qError("taosHashPut queryId:0x%"PRIx64" failed", pJob->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
}
job->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(job));
pJob->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(pJob));
*(SSchJob **)pJob = job;
*(SSchJob **)job = pJob;
if (syncSchedule) {
tsem_wait(&job->rspSem);
tsem_wait(&pJob->rspSem);
}
return TSDB_CODE_SUCCESS;
_return:
*(SSchJob **)pJob = NULL;
scheduleFreeJob(job);
*(SSchJob **)job = NULL;
scheduleFreeJob(pJob);
SCH_RET(code);
}
@ -1022,7 +1002,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
SSchJob *job = pJob;
int32_t code = 0;
if (!job->attr.needFetch) {
if (!SCH_JOB_NEED_FETCH(&job->attr)) {
qError("no need to fetch data");
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
@ -1101,7 +1081,7 @@ void scheduleFreeJob(void *pJob) {
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchTask* pTask = taosArrayGet(pLevel->subTasks, j);
cleanupTask(pTask);
schFreeTask(pTask);
}
taosArrayDestroy(pLevel->subTasks);