feature/qnode

This commit is contained in:
dapan1121 2022-01-05 10:11:26 +08:00
parent eb9a70975a
commit feee87ddfa
2 changed files with 25 additions and 11 deletions

View File

@ -114,8 +114,9 @@ typedef struct SSchJob {
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
#define SCH_JOB_ELOG(param, ...) qError("QID:% "PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)

View File

@ -270,6 +270,8 @@ int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("push to %s list", "execTasks");
return TSDB_CODE_SUCCESS;
}
@ -284,6 +286,8 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("push to %s list", "succTasks");
*moved = true;
return TSDB_CODE_SUCCESS;
@ -299,6 +303,8 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_TASK_DLOG("push to %s list", "failTasks");
*moved = true;
return TSDB_CODE_SUCCESS;
@ -372,7 +378,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
if (!moved) {
SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
SCH_TASK_ELOG("task may already moved, status:%d", task->status);
return TSDB_CODE_SUCCESS;
}
@ -447,11 +453,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
if (!needRetry) {
SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
SCH_TASK_ELOG("task failed[%x], no more retry", errCode);
SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
if (!moved) {
SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
SCH_TASK_ELOG("task may already moved, status:%d", task->status);
}
if (SCH_TASK_NEED_WAIT_ALL(task)) {
@ -492,6 +498,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms
goto _task_error;
}
}
break;
}
case TDMT_VND_SUBMIT_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) {
@ -567,19 +574,25 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
int32_t code = 0;
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
if (NULL == job || NULL == (*job)) {
SSchJob **pjob = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
if (NULL == pjob || NULL == (*pjob)) {
qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) {
SSchJob *job = *pjob;
SSchTask **ptask = taosHashGet(job->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == ptask || NULL == (*ptask)) {
qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask *task = *ptask;
SCH_TASK_DLOG("Got msg:%d, rspCode:%d", msgType, rspCode);
schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode);
schProcessRspMsg(job, task, msgType, pMsg->pData, pMsg->len, rspCode);
_return:
tfree(param);
@ -809,7 +822,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(schSetTaskCondidateAddrs(job, task));
if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) {
SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId);
SCH_TASK_ELOG("no valid condidate node for task:%"PRIx64, task->taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}