no refact

This commit is contained in:
Hongze Cheng 2022-01-24 02:31:19 +00:00
parent 6e358fd42f
commit af406ab31a
2 changed files with 298 additions and 281 deletions

View File

@ -47,7 +47,7 @@ option(
option(
BUILD_WITH_UV
"If build with libuv"
ON
OFF
)
option(

View File

@ -13,18 +13,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "query.h"
#include "schedulerInt.h"
#include "tmsg.h"
#include "query.h"
#include "catalog.h"
static SSchedulerMgmt schMgmt = {0};
/* Produces the next task id by atomically incrementing the scheduler-wide counter schMgmt.taskId. */
uint64_t schGenTaskId(void) {
  return atomic_add_fetch_64(&schMgmt.taskId, 1);
}
uint64_t schGenUUID(void) {
static uint64_t hashId = 0;
static int32_t requestSerialId = 0;
static int32_t requestSerialId = 0;
if (hashId == 0) {
char uid[64];
@ -36,17 +38,18 @@ uint64_t schGenUUID(void) {
}
}
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&requestSerialId, 1);
uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
return id;
}
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) {
pTask->plan = pPlan;
pTask->level = pLevel;
int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) {
pTask->plan = pPlan;
pTask->level = pLevel;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
pTask->taskId = schGenTaskId();
pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
@ -58,7 +61,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
return TSDB_CODE_SUCCESS;
}
void schFreeTask(SSchTask *pTask) {
void schFreeTask(SSchTask* pTask) {
if (pTask->candidateAddrs) {
taosArrayDestroy(pTask->candidateAddrs);
}
@ -78,6 +81,7 @@ void schFreeTask(SSchTask *pTask) {
}
}
int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
int32_t lastMsgType = atomic_load_32(&pTask->lastMsgType);
@ -93,10 +97,8 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING &&
SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask),
msgType);
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
@ -110,6 +112,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
return TSDB_CODE_SUCCESS;
}
int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
int32_t code = 0;
@ -136,16 +139,19 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break;
case JOB_TASK_STATUS_EXECUTING:
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED && newStatus != JOB_TASK_STATUS_FAILED &&
newStatus != JOB_TASK_STATUS_CANCELLING && newStatus != JOB_TASK_STATUS_CANCELLED &&
newStatus != JOB_TASK_STATUS_DROPPING) {
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_CANCELLING
&& newStatus != JOB_TASK_STATUS_CANCELLED
&& newStatus != JOB_TASK_STATUS_DROPPING) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_SUCCEED &&
newStatus != JOB_TASK_STATUS_DROPPING) {
if (newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_SUCCEED
&& newStatus != JOB_TASK_STATUS_DROPPING) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
@ -185,6 +191,7 @@ _return:
SCH_ERR_RET(code);
}
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t i = 0; i < pJob->levelNum; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
@ -192,8 +199,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
SSubplan *pPlan = pTask->plan;
int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0;
int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0;
int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0;
int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0;
if (childNum > 0) {
if (pJob->levelIdx == pLevel->level) {
@ -267,6 +274,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schRecordTaskSucceedNode(SSchTask *pTask) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx));
@ -277,6 +285,7 @@ int32_t schRecordTaskSucceedNode(SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr) {
if (NULL == taosArrayPush(pTask->execAddrs, addr)) {
SCH_TASK_ELOG("taosArrayPush addr to execAddr list failed, errno:%d", errno);
@ -286,6 +295,7 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad
return TSDB_CODE_SUCCESS;
}
int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
int32_t code = 0;
pJob->queryId = pDag->queryId;
@ -301,10 +311,7 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SHashObj *planToTask = taosHashInit(
SCHEDULE_DEFAULT_TASK_NUMBER,
taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
HASH_NO_LOCK);
SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == planToTask) {
SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -321,9 +328,9 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) {
pJob->subPlans = pDag->pSubplans;
SSchLevel level = {0};
SArray * plans = NULL;
int32_t taskNum = 0;
SSchLevel level = {0};
SArray *plans = NULL;
int32_t taskNum = 0;
SSchLevel *pLevel = NULL;
level.status = JOB_TASK_STATUS_NOT_START;
@ -440,14 +447,14 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
/*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
/*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
++epSet->numOfEps;
}
*/
++epSet->numOfEps;
}
*/
return TSDB_CODE_SUCCESS;
}
@ -521,6 +528,7 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
return TSDB_CODE_SUCCESS;
}
int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) {
SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask));
@ -546,6 +554,7 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
return TSDB_CODE_SUCCESS;
}
int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) {
// TODO set retry or not based on task type/errCode/retry times/job status/available eps...
// TODO if needRetry, set task retry info
@ -574,6 +583,8 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
assert(0);
}
// Note: callers need no further error handling; errors are handled inside this function
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode));
@ -584,6 +595,7 @@ int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) {
SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode));
}
// Note: callers need no further error handling; errors are handled inside this function
int32_t schFetchFromRemote(SSchJob *pJob) {
int32_t code = 0;
@ -614,6 +626,7 @@ _return:
return code;
}
// Note: callers need no further error handling; errors are handled inside this function
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
int32_t code = 0;
@ -644,8 +657,8 @@ int32_t schProcessOnDataFetched(SSchJob *job) {
// Note: callers need no further error handling; errors are handled inside this function
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
bool needRetry = false;
bool moved = false;
bool needRetry = false;
bool moved = false;
int32_t taskDone = 0;
int32_t code = 0;
@ -692,10 +705,11 @@ _return:
SCH_ERR_RET(errCode);
}
// Note: callers need no further error handling; errors are handled inside this function
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
bool moved = false;
int32_t code = 0;
bool moved = false;
int32_t code = 0;
SSchTask *pErrTask = pTask;
SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask));
@ -720,7 +734,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
SCH_UNLOCK(SCH_WRITE, &pTask->level->lock);
if (taskDone < pTask->level->taskNum) {
SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum);
return TSDB_CODE_SUCCESS;
} else if (taskDone > pTask->level->taskNum) {
@ -748,14 +762,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
/*
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
/*
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
++job->dataSrcEps.numOfEps;
}
*/
++job->dataSrcEps.numOfEps;
}
*/
for (int32_t i = 0; i < parentNum; ++i) {
SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i);
@ -782,24 +796,23 @@ _return:
SCH_ERR_RET(code);
}
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
int32_t rspCode) {
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
int32_t code = 0;
SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType));
switch (msgType) {
case TDMT_VND_CREATE_TABLE_RSP: {
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
case TDMT_VND_SUBMIT_RSP: {
#if 0 // TODO OPEN THIS
#if 0 //TODO OPEN THIS
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
@ -807,69 +820,71 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
}
pJob->resNumOfRows += rsp->affectedRows;
#else
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
#else
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg;
if (rsp) {
pJob->resNumOfRows += rsp->affectedRows;
}
#endif
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg;
if (rsp) {
pJob->resNumOfRows += rsp->affectedRows;
}
#endif
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
case TDMT_VND_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));
break;
}
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY));
break;
}
case TDMT_VND_RES_READY_RSP: {
SResReadyRsp *rsp = (SResReadyRsp *)msg;
SResReadyRsp *rsp = (SResReadyRsp *)msg;
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rsp->code));
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
break;
}
case TDMT_VND_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
}
if (pJob->res) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res);
tfree(rsp);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
if (pJob->res) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res);
tfree(rsp);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
atomic_store_ptr(&pJob->res, rsp);
atomic_store_32(&pJob->resNumOfRows, rsp->numOfRows);
if (rsp->completed) {
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
}
SCH_ERR_JRET(schProcessOnDataFetched(pJob));
break;
}
SCH_ERR_JRET(schProcessOnDataFetched(pJob));
break;
}
case TDMT_VND_DROP_TASK: {
// SHOULD NEVER REACH HERE
assert(0);
break;
}
// SHOULD NEVER REACH HERE
assert(0);
break;
}
default:
SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask));
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
@ -884,15 +899,16 @@ _return:
SCH_RET(code);
}
int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
SSchJob * pJob = NULL;
SSchTask * pTask = NULL;
SSchJob *pJob = NULL;
SSchTask *pTask = NULL;
SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId));
if (NULL == job || NULL == (*job)) {
qError("QID:%" PRIx64 " taosHashGet queryId not exist, may be dropped", pParam->queryId);
qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
}
@ -902,13 +918,13 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
int32_t s = taosHashGetSize(pJob->execTasks);
if (s <= 0) {
qError("QID:%" PRIx64 ",TID:%" PRId64 " no task in execTask list", pParam->queryId, pParam->taskId);
qError("QID:%"PRIx64",TID:%"PRId64" no task in execTask list", pParam->queryId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId));
if (NULL == task || NULL == (*task)) {
qError("QID:%" PRIx64 ",TID:%" PRId64 " taosHashGet taskId not exist", pParam->queryId, pParam->taskId);
qError("QID:%"PRIx64",TID:%"PRId64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
@ -927,29 +943,29 @@ _return:
SCH_RET(code);
}
/*
 * Callback for submit requests.
 * Forwards param, pMsg and code unchanged to schHandleCallback, tagging the
 * response as TDMT_VND_SUBMIT_RSP, and returns that call's result.
 */
int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
}
/*
 * Callback for create-table requests.
 * Forwards param, pMsg and code unchanged to schHandleCallback, tagging the
 * response as TDMT_VND_CREATE_TABLE_RSP, and returns that call's result.
 */
int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
}
/*
 * Callback for query requests.
 * Forwards param, pMsg and code unchanged to schHandleCallback, tagging the
 * response as TDMT_VND_QUERY_RSP, and returns that call's result.
 */
int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
}
/*
 * Callback for fetch requests.
 * Forwards param, pMsg and code unchanged to schHandleCallback, tagging the
 * response as TDMT_VND_FETCH_RSP, and returns that call's result.
 */
int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
}
/*
 * Callback for result-ready requests.
 * Forwards param, pMsg and code unchanged to schHandleCallback, tagging the
 * response as TDMT_VND_RES_READY_RSP, and returns that call's result.
 */
int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code) {
  return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
}
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
qDebug("QID:%"PRIx64",TID:%"PRIx64" drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
}
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
@ -980,18 +996,18 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
return TSDB_CODE_SUCCESS;
}
int32_t schAsyncSendMsg(void *transport, SEpSet *epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg,
uint32_t msgSize) {
int32_t code = 0;
SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) {
int32_t code = 0;
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (NULL == pMsgSendInfo) {
qError("QID:%" PRIx64 ",TID:%" PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo));
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam));
if (NULL == param) {
qError("QID:%" PRIx64 ",TID:%" PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SSchCallbackParam));
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SSchCallbackParam));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
@ -1007,11 +1023,11 @@ int32_t schAsyncSendMsg(void *transport, SEpSet *epSet, uint64_t qId, uint64_t t
pMsgSendInfo->msgType = msgType;
pMsgSendInfo->fp = fp;
int64_t transporterId = 0;
int64_t transporterId = 0;
code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo);
if (code) {
qError("QID:%" PRIx64 ",TID:%" PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code);
qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code);
SCH_ERR_JRET(code);
}
@ -1037,10 +1053,10 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
uint32_t msgSize = 0;
void * msg = NULL;
int32_t code = 0;
bool isCandidateAddr = false;
SEpSet epSet;
void *msg = NULL;
int32_t code = 0;
bool isCandidateAddr = false;
SEpSet epSet;
if (NULL == addr) {
addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx));
@ -1119,7 +1135,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->taskId = htobe64(pTask->taskId);
break;
}
case TDMT_VND_DROP_TASK: {
case TDMT_VND_DROP_TASK:{
msgSize = sizeof(STaskDropReq);
msg = calloc(1, msgSize);
if (NULL == msg) {
@ -1166,13 +1182,14 @@ static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
*pStatus = status;
}
return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED ||
status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING);
return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED
|| status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING);
}
// Note: callers need no further error handling; errors are handled inside this function
int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
int8_t status = 0;
int8_t status = 0;
int32_t code = 0;
if (schJobNeedToStop(pJob, &status)) {
@ -1271,15 +1288,15 @@ void schDropJobAllTasks(SSchJob *pJob) {
schDropTaskInHashList(pJob, pJob->failTasks);
}
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag *pDag, struct SSchJob **job, bool syncSchedule) {
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) {
if (nodeList && taosArrayGetSize(nodeList) <= 0) {
qInfo("QID:%" PRIx64 " input nodeList is empty", pDag->queryId);
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
}
int32_t code = 0;
int32_t code = 0;
SSchJob *pJob = calloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
qError("QID:%"PRIx64" calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
@ -1289,22 +1306,19 @@ int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag *pDag, struc
SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
pJob->execTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
pJob->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->execTasks) {
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->succTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
pJob->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->succTasks) {
SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->failTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
pJob->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->failTasks) {
SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -1348,11 +1362,13 @@ _return:
}
/*
 * Cancels a job. Not implemented yet: this is a stub that performs no work.
 *
 * Returns TSDB_CODE_SUCCESS so the int32_t result is well defined; the stub
 * previously had no return statement at all.
 */
int32_t schCancelJob(SSchJob *pJob) {
  // TODO
  // TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
  (void)pJob;  // unused until the cancel logic is implemented
  return TSDB_CODE_SUCCESS;
}
int32_t schedulerInit(SSchedulerCfg *cfg) {
if (schMgmt.jobs) {
qError("scheduler already initialized");
@ -1369,8 +1385,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
}
schMgmt.jobs =
taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.jobs) {
qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
@ -1381,12 +1396,12 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
}
qInfo("scheduler %" PRIx64 " initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);
qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);
return TSDB_CODE_SUCCESS;
}
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag *pDag, struct SSchJob **pJob, SQueryResult *pRes) {
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
@ -1403,7 +1418,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag *pDag, stru
return TSDB_CODE_SUCCESS;
}
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag *pDag, struct SSchJob **pJob) {
int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
@ -1412,7 +1427,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag *pDag,
return TSDB_CODE_SUCCESS;
}
int32_t schedulerConvertDagToTaskList(SQueryDag *pDag, SArray **pTasks) {
int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) <= 0) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
@ -1437,9 +1452,9 @@ int32_t schedulerConvertDagToTaskList(SQueryDag *pDag, SArray **pTasks) {
}
STaskInfo tInfo = {0};
char * msg = NULL;
int32_t msgLen = 0;
int32_t code = 0;
char *msg = NULL;
int32_t msgLen = 0;
int32_t code = 0;
for (int32_t i = 0; i < taskNum; ++i) {
SSubplan *plan = taosArrayGetP(plans, i);
@ -1502,7 +1517,7 @@ int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t msgSize = src->msg->contentLen + sizeof(*src->msg);
int32_t msgSize = src->msg->contentLen + sizeof(*src->msg);
STaskInfo info = {0};
info.addr = src->addr;
@ -1535,7 +1550,8 @@ _return:
SCH_RET(code);
}
int32_t scheduleFetchRows(SSchJob *pJob, void **pData) {
int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
if (NULL == pJob || NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
@ -1629,7 +1645,7 @@ void scheduleFreeJob(void *job) {
SSchJob *pJob = job;
uint64_t queryId = pJob->queryId;
bool setJobFree = false;
bool setJobFree = false;
if (SCH_GET_JOB_STATUS(pJob) > 0) {
if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) {
@ -1664,15 +1680,15 @@ void scheduleFreeJob(void *job) {
schDropJobAllTasks(pJob);
}
pJob->subPlans = NULL; // it is a reference to pDag->pSubplans
pJob->subPlans = NULL; // it is a reference to pDag->pSubplans
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for (int32_t i = 0; i < numOfLevels; ++i) {
for(int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for (int32_t j = 0; j < numOfTasks; ++j) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
for(int32_t j = 0; j < numOfTasks; ++j) {
SSchTask* pTask = taosArrayGet(pLevel->subTasks, j);
schFreeTask(pTask);
}
@ -1690,7 +1706,7 @@ void scheduleFreeJob(void *job) {
tfree(pJob);
qDebug("QID:%" PRIx64 " job freed", queryId);
qDebug("QID:%"PRIx64" job freed", queryId);
}
void schedulerFreeTaskList(SArray *taskList) {
@ -1709,7 +1725,8 @@ void schedulerFreeTaskList(SArray *taskList) {
/*
 * Tears down the scheduler's job table.
 * If schMgmt.jobs is set, it is cleaned up exactly once and the pointer is
 * cleared, so calling this again does nothing.
 */
void schedulerDestroy(void) {
  if (schMgmt.jobs) {
    taosHashCleanup(schMgmt.jobs);  // TODO
    schMgmt.jobs = NULL;
  }
}