Merge branch 'main' into merge/mainto3.0

This commit is contained in:
Shengliang Guan 2024-10-31 17:38:54 +08:00
commit 33cc4db548
24 changed files with 3678 additions and 46 deletions

View File

@ -421,7 +421,7 @@ typedef enum ENodeType {
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, // INACTIVE
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
@ -435,7 +435,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, // INACTIVE
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,

View File

@ -26,6 +26,8 @@ extern "C" {
#define FUNC_AGGREGATE_UDF_ID 5001
#define FUNC_SCALAR_UDF_ID 5002
extern const int32_t funcMgtBuiltinsNum;
typedef enum EFunctionType {
// aggregate function
FUNCTION_TYPE_APERCENTILE = 1,

View File

@ -788,9 +788,9 @@ typedef struct SDataDeleterNode {
char tableFName[TSDB_TABLE_NAME_LEN];
char tsColName[TSDB_COL_NAME_LEN];
STimeWindow deleteTimeRange;
SNode* pAffectedRows;
SNode* pStartTs;
SNode* pEndTs;
SNode* pAffectedRows; // usless
SNode* pStartTs; // usless
SNode* pEndTs; // usless
} SDataDeleterNode;
typedef struct SSubplan {

View File

@ -364,7 +364,7 @@ void* getTaskPoolWorkerCb();
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) \
((_code) == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || (_code) == TSDB_CODE_MND_INVALID_SCHEMA_VER)
((_code) == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || (_code) == TSDB_CODE_MND_INVALID_SCHEMA_VER || (_code) == TSDB_CODE_SCH_DATA_SRC_EP_MISS)
#define NEED_CLIENT_HANDLE_ERROR(_code) \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))

View File

@ -76,8 +76,6 @@ int32_t schedulerExecJob(SSchedulerReq* pReq, int64_t* pJob);
int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq* pReq);
void schedulerFetchRowsA(int64_t job, schedulerFetchFp fp, void* param);
int32_t schedulerGetTasksStatus(int64_t job, SArray* pSub);
void schedulerStopQueryHb(void* pTrans);
@ -100,6 +98,8 @@ void schedulerFreeJob(int64_t* job, int32_t errCode);
void schedulerDestroy(void);
int32_t schedulerValidatePlan(SQueryPlan* pPlan);
#ifdef __cplusplus
}
#endif

View File

@ -771,6 +771,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_SCH_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504)
#define TSDB_CODE_SCH_JOB_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x2505)
#define TSDB_CODE_SCH_JOB_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x2506)
#define TSDB_CODE_SCH_DATA_SRC_EP_MISS TAOS_DEF_ERROR_CODE(0, 0x2507)
//parser
#define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600)

View File

@ -188,6 +188,47 @@ typedef enum EOperatorType {
OP_TYPE_ASSIGN = 200
} EOperatorType;
// Flat table of every expression operator kind, presumably used to iterate
// over all valid operators (e.g. for validation or testing).
// NOTE(review): appears to mirror the EOperatorType enum above, with the
// OP_TYPE_COMPARE_MAX_VALUE sentinel deliberately left out — confirm the
// table is kept in sync when new operators are added to the enum.
static const EOperatorType OPERATOR_ARRAY[] = {
// arithmetic operators
OP_TYPE_ADD,
OP_TYPE_SUB,
OP_TYPE_MULTI,
OP_TYPE_DIV,
OP_TYPE_REM,
OP_TYPE_MINUS,
// bitwise operators
OP_TYPE_BIT_AND,
OP_TYPE_BIT_OR,
// comparison operators
OP_TYPE_GREATER_THAN,
OP_TYPE_GREATER_EQUAL,
OP_TYPE_LOWER_THAN,
OP_TYPE_LOWER_EQUAL,
OP_TYPE_EQUAL,
OP_TYPE_NOT_EQUAL,
OP_TYPE_IN,
OP_TYPE_NOT_IN,
OP_TYPE_LIKE,
OP_TYPE_NOT_LIKE,
OP_TYPE_MATCH,
OP_TYPE_NMATCH,
// NULL / truth-value predicates
OP_TYPE_IS_NULL,
OP_TYPE_IS_NOT_NULL,
OP_TYPE_IS_TRUE,
OP_TYPE_IS_FALSE,
OP_TYPE_IS_UNKNOWN,
OP_TYPE_IS_NOT_TRUE,
OP_TYPE_IS_NOT_FALSE,
OP_TYPE_IS_NOT_UNKNOWN,
// sentinel value, intentionally excluded from the table
//OP_TYPE_COMPARE_MAX_VALUE,
// JSON operators
OP_TYPE_JSON_GET_VALUE,
OP_TYPE_JSON_CONTAINS,
// assignment
OP_TYPE_ASSIGN
};
#define OP_TYPE_CALC_MAX OP_TYPE_BIT_OR
typedef enum ELogicConditionType {

View File

@ -58,6 +58,8 @@ enum {
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)(res) == RES_TYPE__TMQ_METADATA)
#define TD_RES_TMQ_BATCH_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_BATCH_META)
#define TSC_MAX_SUBPLAN_CAPACITY_NUM 1000
typedef struct SAppInstInfo SAppInstInfo;
typedef struct {

View File

@ -1250,6 +1250,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
int32_t code = 0;
int32_t subplanNum = 0;
if (pQuery->pRoot) {
pRequest->stmtType = pQuery->pRoot->type;
@ -1405,6 +1406,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
if (TSDB_CODE_SUCCESS == code) {
code = schedulerExecJob(&req, &pRequest->body.queryJob);
}
taosArrayDestroy(pNodeList);
} else {
qDestroyQueryPlan(pDag);

View File

@ -2346,8 +2346,13 @@ static void taosCheckAndSetDebugFlag(int32_t *pFlagPtr, char *name, int32_t flag
if (noNeedToSetVars != NULL && taosArraySearch(noNeedToSetVars, name, taosLogVarComp, TD_EQ) != NULL) {
return;
}
if (taosSetDebugFlag(pFlagPtr, name, flag) != 0) {
uError("failed to set flag %s to %d", name, flag);
int32_t code = 0;
if ((code = taosSetDebugFlag(pFlagPtr, name, flag)) != 0) {
if (code != TSDB_CODE_CFG_NOT_FOUND) {
uError("failed to set flag %s to %d, since:%s", name, flag, tstrerror(code));
} else {
uDebug("failed to set flag %s to %d, since:%s", name, flag, tstrerror(code));
}
}
return;
}

View File

@ -85,6 +85,8 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableList, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo);
int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SNode* pTagCond, SNode*pTagIndexCond, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);

View File

@ -32,3 +32,15 @@ TARGET_INCLUDE_DIRECTORIES(
PUBLIC "${TD_SOURCE_DIR}/include/common"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
ADD_EXECUTABLE(queryPlanTests queryPlanTests.cpp)
TARGET_LINK_LIBRARIES(
queryPlanTests
PRIVATE os util common executor gtest_main qcom function planner scalar nodes vnode
)
TARGET_INCLUDE_DIRECTORIES(
queryPlanTests
PUBLIC "${TD_SOURCE_DIR}/include/common"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)

File diff suppressed because it is too large Load Diff

View File

@ -1393,6 +1393,11 @@ static int32_t translateRepeat(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
}
static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (numOfParams <= 0) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
// The number of parameters has been limited by the syntax definition
SExprNode* pPara0 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
@ -1550,6 +1555,11 @@ static int32_t translateIn2GeomOutBool(SFunctionNode* pFunc, char* pErrBuf, int3
}
static int32_t translateSelectValue(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (numOfParams <= 0) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
return TSDB_CODE_SUCCESS;
}

View File

@ -703,6 +703,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
code = makeNode(type, sizeof(SGroupSortPhysiNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
code = makeNode(type, sizeof(SIntervalPhysiNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
code = makeNode(type, sizeof(SMergeIntervalPhysiNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
code = makeNode(type, sizeof(SMergeAlignedIntervalPhysiNode), &pNode); break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
@ -1606,10 +1608,14 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: {
SHashJoinPhysiNode* pPhyNode = (SHashJoinPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyNode(pPhyNode->pWindowOffset);
nodesDestroyNode(pPhyNode->pJLimit);
nodesDestroyList(pPhyNode->pOnLeft);
nodesDestroyList(pPhyNode->pOnRight);
nodesDestroyNode(pPhyNode->leftPrimExpr);
nodesDestroyNode(pPhyNode->rightPrimExpr);
nodesDestroyNode(pPhyNode->pLeftOnCond);
nodesDestroyNode(pPhyNode->pRightOnCond);
nodesDestroyNode(pPhyNode->pFullOnCond);
nodesDestroyList(pPhyNode->pTargets);
@ -1617,8 +1623,6 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pPhyNode->pColEqCond);
nodesDestroyNode(pPhyNode->pTagEqCond);
nodesDestroyNode(pPhyNode->pLeftOnCond);
nodesDestroyNode(pPhyNode->pRightOnCond);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
@ -1652,6 +1656,7 @@ void nodesDestroyNode(SNode* pNode) {
break;
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
@ -2056,6 +2061,8 @@ void* nodesGetValueFromNode(SValueNode* pNode) {
int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value) {
switch (pNode->node.resType.type) {
case TSDB_DATA_TYPE_NULL:
break;
case TSDB_DATA_TYPE_BOOL:
pNode->datum.b = *(bool*)value;
*(bool*)&pNode->typeData = pNode->datum.b;
@ -2107,7 +2114,10 @@ int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value) {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_MEDIUMBLOB:
case TSDB_DATA_TYPE_GEOMETRY:
pNode->datum.p = (char*)value;
break;

View File

@ -24,6 +24,18 @@ extern "C" {
#include "tsimplehash.h"
#include "taoserror.h"
// Context threaded through physical-plan generation.
// NOTE(review): this definition was moved here (into the header) so that other
// translation units — e.g. the plan unit tests — can see it; it previously
// lived in the physical-plan .c file.
typedef struct SPhysiPlanContext {
SPlanContext* pPlanCxt;         // the overall planner context this phase runs under
int32_t errCode;                // first error encountered during plan generation — TODO confirm
int16_t nextDataBlockId;        // counter for assigning ids to output data blocks — TODO confirm
SArray* pLocationHelper;        // presumably slot/column location bookkeeping; verify against usage
SArray* pProjIdxLocHelper;      // presumably projection-index location bookkeeping; verify against usage
bool hasScan;                   // set when the plan contains a (table) scan — TODO confirm
bool hasSysScan;                // set when the plan contains a system-table scan — TODO confirm
} SPhysiPlanContext;
#define planFatal(param, ...) qFatal("PLAN: " param, ##__VA_ARGS__)
#define planError(param, ...) qError("PLAN: " param, ##__VA_ARGS__)
#define planWarn(param, ...) qWarn("PLAN: " param, ##__VA_ARGS__)

View File

@ -30,15 +30,6 @@ typedef struct SSlotIndex {
SArray* pSlotIdsInfo; // duplicate name slot
} SSlotIndex;
// Context threaded through physical-plan generation.
// NOTE(review): this is the old file-local copy of the struct; this commit
// removes it in favour of the shared definition in the planner header.
typedef struct SPhysiPlanContext {
SPlanContext* pPlanCxt;         // the overall planner context this phase runs under
int32_t errCode;                // first error encountered during plan generation — TODO confirm
int16_t nextDataBlockId;        // counter for assigning ids to output data blocks — TODO confirm
SArray* pLocationHelper;        // presumably slot/column location bookkeeping; verify against usage
SArray* pProjIdxLocHelper;      // presumably projection-index location bookkeeping; verify against usage
bool hasScan;                   // set when the plan contains a (table) scan — TODO confirm
bool hasSysScan;                // set when the plan contains a system-table scan — TODO confirm
} SPhysiPlanContext;
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char** ppKey, int32_t *pLen, uint16_t extraBufLen) {
int32_t code = 0;

View File

@ -62,6 +62,7 @@ typedef enum {
#define SCH_DEFAULT_MAX_RETRY_NUM 6
#define SCH_MIN_AYSNC_EXEC_NUM 3
#define SCH_DEFAULT_RETRY_TOTAL_ROUND 3
#define SCH_DEFAULT_TASK_CAPACITY_NUM 1000
typedef struct SSchDebug {
bool lockEnable;
@ -318,6 +319,8 @@ typedef struct SSchTaskCtx {
extern SSchedulerMgmt schMgmt;
#define SCH_GET_TASK_CAPACITY(_n) ((_n) > SCH_DEFAULT_TASK_CAPACITY_NUM ? SCH_DEFAULT_TASK_CAPACITY_NUM : (_n))
#define SCH_TASK_TIMEOUT(_task) \
((taosGetTimestampUs() - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec)
@ -330,8 +333,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_BIND_TASK(task) \
(((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_DATA_BIND_PLAN(_plan) (((_plan)->subplanType == SUBPLAN_TYPE_SCAN) || ((_plan)->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_DATA_BIND_TASK(task) SCH_IS_DATA_BIND_PLAN((task)->plan)
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) \
@ -641,6 +644,7 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);

View File

@ -200,7 +200,12 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
if (NULL == child) {
SCH_JOB_ELOG("fail to get the %dth child subplan, childNum: %d", n, childNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(child)) {
SCH_JOB_ELOG("invalid subplan type for the %dth child, level:%d, subplanNodeType:%d", n, i, nodeType(child));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
@ -242,6 +247,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(parent)) {
SCH_JOB_ELOG("invalid subplan type for the %dth parent, level:%d, subplanNodeType:%d", n, i, nodeType(parent));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
if (NULL == parentTask || NULL == *parentTask) {
SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
@ -307,7 +317,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES);
pJob->dataSrcTasks = taosArrayInit(SCH_GET_TASK_CAPACITY(pDag->numOfSubplans), POINTER_BYTES);
if (NULL == pJob->dataSrcTasks) {
SCH_ERR_RET(terrno);
}
@ -319,12 +329,12 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
}
SHashObj *planToTask = taosHashInit(
pDag->numOfSubplans,
SCH_GET_TASK_CAPACITY(pDag->numOfSubplans),
taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
HASH_NO_LOCK);
if (NULL == planToTask) {
SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
SCH_ERR_RET(terrno);
SCH_ERR_JRET(terrno);
}
pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
@ -339,6 +349,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SSchLevel level = {0};
SNodeListNode *plans = NULL;
int32_t taskNum = 0;
int32_t totalTaskNum = 0;
SSchLevel *pLevel = NULL;
level.status = JOB_TASK_STATUS_INIT;
@ -352,7 +363,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
pLevel->level = i;
@ -363,12 +374,23 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
SCH_JOB_ELOG("invalid level plan, level:%d, planNodeType:%d", i, nodeType(plans));
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
if (taskNum <= 0) {
SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
totalTaskNum += taskNum;
if (totalTaskNum > pDag->numOfSubplans) {
SCH_JOB_ELOG("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum, pDag->numOfSubplans, i);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
pLevel->taskNum = taskNum;
pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
@ -379,11 +401,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
for (int32_t n = 0; n < taskNum; ++n) {
SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
if (NULL == plan) {
SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d", n, taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_ERR_JRET(schValidateSubplan(pJob, plan, pLevel->level, n, taskNum));
SCH_SET_JOB_TYPE(pJob, plan->subplanType);
SSchTask task = {0};
@ -397,14 +417,16 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
code = taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES);
if (0 != code) {
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d, error:%s", n, tstrerror(code));
SCH_ERR_JRET(code);
}
if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
code = taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) {
SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d, error:%s", n, tstrerror(code));
SCH_ERR_JRET(code);
}
++pJob->taskNum;
@ -413,6 +435,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum);
}
if (totalTaskNum != pDag->numOfSubplans) {
SCH_JOB_ELOG("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
_return:
@ -781,9 +808,11 @@ void schFreeJobImpl(void *job) {
}
taosMemoryFree(pJob);
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
if (jobNum == 0) {
schCloseJobRef();
if (refId > 0) {
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
if (jobNum == 0) {
schCloseJobRef();
}
}
qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
@ -861,10 +890,10 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
}
}
pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
pJob->taskList = taosHashInit(SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans), taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
HASH_ENTRY_LOCK);
if (NULL == pJob->taskList) {
SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
SCH_JOB_ELOG("taosHashInit %d taskList failed", SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans));
SCH_ERR_JRET(terrno);
}
@ -904,7 +933,7 @@ _return:
if (NULL == pJob) {
qDestroyQueryPlan(pReq->pDag);
} else if (pJob->refId < 0) {
} else if (pJob->refId <= 0) {
schFreeJobImpl(pJob);
} else {
code = taosRemoveRef(schMgmt.jobRef, pJob->refId);

View File

@ -831,7 +831,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_DATA_BIND_TASK(pTask)) {
SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_MND_INVALID_SCHEMA_VER);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));

View File

@ -360,3 +360,50 @@ void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask)
*pTask = *task;
}
// Sanity-check one subplan before it is turned into a scheduler task.
// Checks, in order: non-NULL pointer, physical-subplan node type, subplan-type
// range, level consistency, endpoint set for data-bind (scan/modify) plans,
// and presence of a root node and a data sink.
// Failures are logged via SCH_JOB_ELOG and returned through SCH_ERR_RET;
// returns TSDB_CODE_SUCCESS when the subplan passes all checks.
//   pJob    - owning job (used for logging context)
//   pSubplan- the subplan to validate (may be NULL)
//   level   - the level this subplan is expected to belong to
//   idx     - index of the subplan within its level (for log messages)
//   taskNum - number of tasks in the level (for log messages)
int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum) {
if (NULL == pSubplan) {
SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d, level: %d", idx, taskNum, level);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
// the node must really be a physical subplan, not some other node type
if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(pSubplan)) {
SCH_JOB_ELOG("invalid subplan type, level:%d, subplanNodeType:%d", level, nodeType(pSubplan));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
// subplanType must fall inside the known enum range [MERGE, COMPUTE]
if (pSubplan->subplanType < SUBPLAN_TYPE_MERGE || pSubplan->subplanType > SUBPLAN_TYPE_COMPUTE) {
SCH_JOB_ELOG("invalid subplanType %d, level:%d, subplan idx:%d", pSubplan->subplanType, level, idx);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
// the subplan's recorded level must match the level it was found in
if (pSubplan->level != level) {
SCH_JOB_ELOG("plan level %d mis-match with current level %d", pSubplan->level, level);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
// data-bind plans (scan/modify) must carry a usable execution endpoint set
if (SCH_IS_DATA_BIND_PLAN(pSubplan)) {
if (pSubplan->execNode.epSet.numOfEps <= 0) {
// distinct error code so the client can refresh table meta and retry
SCH_JOB_ELOG("no execNode specifed for data src plan %d, numOfEps:%d", pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_SCH_DATA_SRC_EP_MISS);
}
// inUse must index a valid endpoint
if (pSubplan->execNode.epSet.inUse >= pSubplan->execNode.epSet.numOfEps) {
SCH_JOB_ELOG("invalid epset inUse %d for data src plan %d, numOfEps:%d", pSubplan->execNode.epSet.inUse, pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
}
// every subplan needs a root operator node, except MODIFY plans
if (NULL == pSubplan->pNode && pSubplan->subplanType != SUBPLAN_TYPE_MODIFY) {
SCH_JOB_ELOG("empty plan root node, level:%d, subplan idx:%d, subplanType:%d", level, idx, pSubplan->subplanType);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
// every subplan needs a data sink
if (NULL == pSubplan->pDataSink) {
SCH_JOB_ELOG("empty plan dataSink, level:%d, subplan idx:%d", level, idx);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -224,3 +224,33 @@ void schedulerDestroy(void) {
qWorkerDestroy(&schMgmt.queryMgmt);
schMgmt.queryMgmt = NULL;
}
// Validate a query plan without executing it, by building a throw-away
// scheduler job from the plan and running the normal build-time checks
// (schValidateAndBuildJob), then freeing the job again.
//   pPlan - the plan to validate (queryId is used for logging)
// Returns TSDB_CODE_SUCCESS when the plan builds cleanly; otherwise the
// first error raised during job construction.
// NOTE(review): SCH_ERR_JRET presumably stores the error into `code` and
// jumps to _return — confirm against the macro definition.
int32_t schedulerValidatePlan(SQueryPlan* pPlan) {
int32_t code = TSDB_CODE_SUCCESS;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:0x%" PRIx64 " calloc %d failed", pPlan->queryId, (int32_t)sizeof(SSchJob));
SCH_ERR_RET(terrno);
}
// a minimal task list is needed for schValidateAndBuildJob to register tasks
pJob->taskList = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
HASH_ENTRY_LOCK);
if (NULL == pJob->taskList) {
SCH_JOB_ELOG("taosHashInit %d taskList failed", 100);
SCH_ERR_JRET(terrno);
}
SCH_ERR_JRET(schValidateAndBuildJob(pPlan, pJob));
// explain jobs additionally require the explain context to initialize
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_JRET(qExecExplainBegin(pPlan, &pJob->explainCtx, 0));
}
_return:
// the job was only ever a validation scaffold — free it on every path
schFreeJobImpl(pJob);
return code;
}

View File

@ -203,6 +203,10 @@ void schtBuildQueryDag(SQueryPlan *dag) {
return;
}
scanPlan->msgType = TDMT_SCH_QUERY;
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&scanPlan->pDataSink);
if (NULL == scanPlan->pDataSink) {
return;
}
mergePlan->id.queryId = qId;
mergePlan->id.groupId = schtMergeTemplateId;
@ -223,6 +227,10 @@ void schtBuildQueryDag(SQueryPlan *dag) {
return;
}
mergePlan->msgType = TDMT_SCH_QUERY;
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&mergePlan->pDataSink);
if (NULL == mergePlan->pDataSink) {
return;
}
merge->pNodeList = NULL;
code = nodesMakeList(&merge->pNodeList);
@ -235,6 +243,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
return;
}
(void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
(void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
@ -250,7 +259,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
int32_t scanPlanNum = 20;
dag->queryId = qId;
dag->numOfSubplans = 2;
dag->numOfSubplans = scanPlanNum + 1;
dag->pSubplans = NULL;
int32_t code = nodesMakeList(&dag->pSubplans);
if (NULL == dag->pSubplans) {
@ -289,6 +298,10 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
if (NULL == mergePlan->pChildren) {
return;
}
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&mergePlan->pDataSink);
if (NULL == mergePlan->pDataSink) {
return;
}
for (int32_t i = 0; i < scanPlanNum; ++i) {
SSubplan *scanPlan = NULL;
@ -322,6 +335,10 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
return;
}
scanPlan->msgType = TDMT_SCH_QUERY;
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&scanPlan->pDataSink);
if (NULL == scanPlan->pDataSink) {
return;
}
(void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
(void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);

View File

@ -618,6 +618,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal er
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_TIMEOUT_ERROR, "Task timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_JOB_IS_DROPPING, "Job is dropping")
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_JOB_NOT_EXISTS, "Job no longer exist")
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_DATA_SRC_EP_MISS, "No valid epSet for data source node")
// parser
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYNTAX_ERROR, "syntax error near")