feature/qnode

This commit is contained in:
dapan1121 2022-01-17 14:35:41 +08:00
parent cc3ead15fe
commit ded37ba19e
3 changed files with 106 additions and 2 deletions

View File

@ -59,6 +59,11 @@ typedef struct SQueryResult {
char *msg;
} SQueryResult;
typedef struct STaskInfo {
SQueryNodeAddr addr;
SSubQueryMsg *msg;
} STaskInfo;
int32_t schedulerInit(SSchedulerCfg *cfg);
/**
@ -101,6 +106,11 @@ void scheduleFreeJob(void *pJob);
void schedulerDestroy(void);
int32_t schedulerGenerateTaskList(SQueryDag* pDag, SArray **pTasks);
void schedulerFreeTaskList(SArray *taskList);
#ifdef __cplusplus
}
#endif

View File

@ -356,7 +356,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist")
#define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist")
#define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled")
#define TSDB_CODE_QRY_TASK_DROPPED TAOS_DEF_ERROR_CODE(0, 0x0715) //"Task dropped")
#define TSDB_CODE_QRY_TASK_CANCELLING TAOS_DEF_ERROR_CODE(0, 0x0716) //"Task cancelling")
#define TSDB_CODE_QRY_TASK_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0717) //"Task dropping")
#define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation")
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired")
@ -438,4 +442,3 @@ int32_t* taosGetErrno();
#endif
#endif /*_TD_COMMON_TAOS_ERROR_H_*/

View File

@ -1374,6 +1374,83 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
return TSDB_CODE_SUCCESS;
}
int32_t schedulerGenerateTaskList(SQueryDag* pDag, SArray **pTasks) {
if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) <= 0) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t levelNum = taosArrayGetSize(pDag->pSubplans);
if (1 != levelNum) {
qError("invalid level num: %d", levelNum);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SArray *plans = taosArrayGet(pDag->pSubplans, 0);
int32_t taskNum = taosArrayGetSize(plans);
if (taskNum <= 0) {
qError("invalid task num: %d", taskNum);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SArray *info = taosArrayInit(taskNum, sizeof(STaskInfo));
if (NULL == info) {
qError("taosArrayInit %d taskInfo failed", taskNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
STaskInfo tInfo = {0};
char *msg = NULL;
int32_t msgLen = 0;
int32_t code = 0;
for (int32_t i = 0; i < taskNum; ++i) {
SSubplan *plan = taosArrayGetP(plans, i);
tInfo.addr = plan->execNode;
code = qSubPlanToString(plan, &msg, &msgLen);
if (TSDB_CODE_SUCCESS != code || NULL == msg || msgLen <= 0) {
qError("subplanToString error, code:%x, msg:%p, len:%d", code, msg, msgLen);
SCH_ERR_JRET(code);
}
int32_t msgSize = sizeof(SSubQueryMsg) + msgLen;
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SSubQueryMsg *pMsg = msg;
pMsg->header.vgId = htonl(tInfo.addr.nodeId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(plan->id.queryId);
pMsg->taskId = htobe64(atomic_add_fetch_64(&schMgmt.taskId, 1));
pMsg->contentLen = htonl(msgLen);
memcpy(pMsg->msg, msg, msgLen);
tInfo.msg = pMsg;
if (NULL == taosArrayPush(info, &tInfo)) {
qError("taosArrayPush failed, idx:%d", i);
free(msg);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
*pTasks = info;
info = NULL;
_return:
schedulerFreeTaskList(info);
SCH_RET(code);
}
int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
if (NULL == pJob || NULL == pData) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -1521,6 +1598,20 @@ void scheduleFreeJob(void *job) {
qDebug("QID:%"PRIx64" job freed", queryId);
}
void schedulerFreeTaskList(SArray *taskList) {
if (NULL == taskList) {
return;
}
int32_t taskNum = taosArrayGetSize(taskList);
for (int32_t i = 0; i < taskNum; ++i) {
STaskInfo *info = taosArrayGet(taskList, i);
tfree(info->msg);
}
taosArrayDestroy(taskList);
}
void schedulerDestroy(void) {
if (schMgmt.jobs) {