feature/scheduler
This commit is contained in:
parent
2126e4c2a6
commit
c80da5f718
|
@ -166,7 +166,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
|
int32_t qwAddSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType) {
|
||||||
SQWSchStatus newSch = {0};
|
SQWSchStatus newSch = {0};
|
||||||
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
if (NULL == newSch.tasksHash) {
|
if (NULL == newSch.tasksHash) {
|
||||||
|
@ -200,7 +200,7 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, int32_t rwType,
|
||||||
QW_UNLOCK(rwType, &mgmt->schLock);
|
QW_UNLOCK(rwType, &mgmt->schLock);
|
||||||
|
|
||||||
if (QW_NOT_EXIST_ADD == nOpt) {
|
if (QW_NOT_EXIST_ADD == nOpt) {
|
||||||
QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType, sch));
|
QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType));
|
||||||
|
|
||||||
nOpt = QW_NOT_EXIST_RET_ERR;
|
nOpt = QW_NOT_EXIST_RET_ERR;
|
||||||
|
|
||||||
|
|
|
@ -125,7 +125,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
|
||||||
case TDMT_SCH_LINK_BROKEN:
|
case TDMT_SCH_LINK_BROKEN:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp
|
case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp
|
||||||
if (lastMsgType != reqMsgType) {
|
if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
|
||||||
SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType));
|
SCH_TASK_DLOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1776,6 +1776,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
pMsg->sId = htobe64(schMgmt.sId);
|
pMsg->sId = htobe64(schMgmt.sId);
|
||||||
pMsg->queryId = htobe64(pJob->queryId);
|
pMsg->queryId = htobe64(pJob->queryId);
|
||||||
pMsg->taskId = htobe64(pTask->taskId);
|
pMsg->taskId = htobe64(pTask->taskId);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_VND_DROP_TASK: {
|
case TDMT_VND_DROP_TASK: {
|
||||||
|
|
Loading…
Reference in New Issue