diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index aa87dd155b..534068d987 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -59,6 +59,11 @@ typedef struct SQueryResult { char *msg; } SQueryResult; +typedef struct STaskInfo { + SQueryNodeAddr addr; + SSubQueryMsg *msg; +} STaskInfo; + int32_t schedulerInit(SSchedulerCfg *cfg); /** @@ -101,6 +106,11 @@ void scheduleFreeJob(void *pJob); void schedulerDestroy(void); +int32_t schedulerGenerateTaskList(SQueryDag* pDag, SArray **pTasks); + +void schedulerFreeTaskList(SArray *taskList); + + #ifdef __cplusplus } #endif diff --git a/include/util/taoserror.h b/include/util/taoserror.h index bec4c8f261..a855e6d881 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -356,7 +356,11 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist") #define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist") #define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled") - +#define TSDB_CODE_QRY_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0715) //"Task dropped") +#define TSDB_CODE_QRY_TASK_CANCELLING TAOS_DEF_ERROR_CODE(0, 0x0716) //"Task cancelling") +#define TSDB_CODE_QRY_TASK_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0717) //"Task dropping") +#define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation") +#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error") // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired") @@ -438,4 +442,3 @@ int32_t* taosGetErrno(); #endif #endif /*_TD_COMMON_TAOS_ERROR_H_*/ - \ No newline at end of file diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 62ea28b811..7646ca81fc 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1374,6 +1374,83 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, return TSDB_CODE_SUCCESS; } +int32_t schedulerGenerateTaskList(SQueryDag* pDag, SArray **pTasks) { + if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) <= 0) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + int32_t levelNum = taosArrayGetSize(pDag->pSubplans); + if (1 != levelNum) { + qError("invalid level num: %d", levelNum); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SArray *plans = taosArrayGet(pDag->pSubplans, 0); + int32_t taskNum = taosArrayGetSize(plans); + if (taskNum <= 0) { + qError("invalid task num: %d", taskNum); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SArray *info = taosArrayInit(taskNum, sizeof(STaskInfo)); + if (NULL == info) { + qError("taosArrayInit %d taskInfo failed", taskNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + STaskInfo tInfo = {0}; + char *msg = NULL; + int32_t msgLen = 0; + int32_t code = 0; + + for (int32_t i = 0; i < taskNum; ++i) { + SSubplan *plan = taosArrayGetP(plans, i); + + tInfo.addr = plan->execNode; + + code = qSubPlanToString(plan, &msg, &msgLen); + if (TSDB_CODE_SUCCESS != code || NULL == msg || msgLen <= 0) { + qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen); + SCH_ERR_JRET(code); + } + + int32_t msgSize = sizeof(SSubQueryMsg) + msgLen; + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SSubQueryMsg *pMsg = msg; + + pMsg->header.vgId = htonl(tInfo.addr.nodeId); + + pMsg->sId = htobe64(schMgmt.sId); + pMsg->queryId = htobe64(plan->id.queryId); + pMsg->taskId = htobe64(atomic_add_fetch_64(&schMgmt.taskId, 1)); + pMsg->contentLen = htonl(msgLen); + memcpy(pMsg->msg, msg, msgLen); + + tInfo.msg = pMsg; + + if (NULL == taosArrayPush(info, &tInfo)) { + qError("taosArrayPush failed, idx:%d", i); + free(msg); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + *pTasks = info; + info = NULL; + +_return: + + schedulerFreeTaskList(info); + + SCH_RET(code); +} + + int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { if (NULL == pJob || NULL == pData) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -1521,6 +1598,20 @@ void scheduleFreeJob(void *job) { qDebug("QID:%"PRIx64" job freed", queryId); } + +void schedulerFreeTaskList(SArray *taskList) { + if (NULL == taskList) { + return; + } + + int32_t taskNum = taosArrayGetSize(taskList); + for (int32_t i = 0; i < taskNum; ++i) { + STaskInfo *info = taosArrayGet(taskList, i); + tfree(info->msg); + } + + taosArrayDestroy(taskList); +} void schedulerDestroy(void) { if (schMgmt.jobs) {