Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837
This commit is contained in:
commit
0e0a7ae2e9
|
@ -28,7 +28,7 @@ jobs:
|
|||
run: |
|
||||
mkdir debug
|
||||
cd debug
|
||||
cmake .. -DBUILD_HTTP=false -DBUILD_JDBC=false -DBUILD_TOOLS=false -DBUILD_TEST=off -DBUILD_KEEPER=true
|
||||
cmake .. -DBUILD_HTTP=false -DBUILD_JDBC=false -DBUILD_TOOLS=false -DBUILD_TEST=off -DBUILD_KEEPER=true -DBUILD_DEPENDENCY_TESTS=false
|
||||
make -j 4
|
||||
sudo make install
|
||||
which taosd
|
||||
|
|
|
@ -421,7 +421,7 @@ typedef enum ENodeType {
|
|||
// physical plan node
|
||||
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100,
|
||||
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, // INACTIVE
|
||||
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
|
||||
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
|
||||
|
@ -435,7 +435,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_PHYSICAL_PLAN_SORT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, // INACTIVE
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
|
||||
|
|
|
@ -26,6 +26,8 @@ extern "C" {
|
|||
#define FUNC_AGGREGATE_UDF_ID 5001
|
||||
#define FUNC_SCALAR_UDF_ID 5002
|
||||
|
||||
extern const int32_t funcMgtBuiltinsNum;
|
||||
|
||||
typedef enum EFunctionType {
|
||||
// aggregate function
|
||||
FUNCTION_TYPE_APERCENTILE = 1,
|
||||
|
|
|
@ -804,9 +804,9 @@ typedef struct SDataDeleterNode {
|
|||
char tableFName[TSDB_TABLE_NAME_LEN];
|
||||
char tsColName[TSDB_COL_NAME_LEN];
|
||||
STimeWindow deleteTimeRange;
|
||||
SNode* pAffectedRows;
|
||||
SNode* pStartTs;
|
||||
SNode* pEndTs;
|
||||
SNode* pAffectedRows; // usless
|
||||
SNode* pStartTs; // usless
|
||||
SNode* pEndTs; // usless
|
||||
} SDataDeleterNode;
|
||||
|
||||
typedef struct SSubplan {
|
||||
|
|
|
@ -364,7 +364,7 @@ void* getTaskPoolWorkerCb();
|
|||
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
|
||||
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
|
||||
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) \
|
||||
((_code) == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || (_code) == TSDB_CODE_MND_INVALID_SCHEMA_VER)
|
||||
((_code) == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || (_code) == TSDB_CODE_MND_INVALID_SCHEMA_VER || (_code) == TSDB_CODE_SCH_DATA_SRC_EP_MISS)
|
||||
#define NEED_CLIENT_HANDLE_ERROR(_code) \
|
||||
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
|
||||
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
|
||||
|
|
|
@ -76,8 +76,6 @@ int32_t schedulerExecJob(SSchedulerReq* pReq, int64_t* pJob);
|
|||
|
||||
int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq* pReq);
|
||||
|
||||
void schedulerFetchRowsA(int64_t job, schedulerFetchFp fp, void* param);
|
||||
|
||||
int32_t schedulerGetTasksStatus(int64_t job, SArray* pSub);
|
||||
|
||||
void schedulerStopQueryHb(void* pTrans);
|
||||
|
@ -100,6 +98,8 @@ void schedulerFreeJob(int64_t* job, int32_t errCode);
|
|||
|
||||
void schedulerDestroy(void);
|
||||
|
||||
int32_t schedulerValidatePlan(SQueryPlan* pPlan);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -771,6 +771,7 @@ int32_t taosGetErrSize();
|
|||
#define TSDB_CODE_SCH_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x2504)
|
||||
#define TSDB_CODE_SCH_JOB_IS_DROPPING TAOS_DEF_ERROR_CODE(0, 0x2505)
|
||||
#define TSDB_CODE_SCH_JOB_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x2506)
|
||||
#define TSDB_CODE_SCH_DATA_SRC_EP_MISS TAOS_DEF_ERROR_CODE(0, 0x2507)
|
||||
|
||||
//parser
|
||||
#define TSDB_CODE_PAR_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x2600)
|
||||
|
|
|
@ -188,6 +188,47 @@ typedef enum EOperatorType {
|
|||
OP_TYPE_ASSIGN = 200
|
||||
} EOperatorType;
|
||||
|
||||
static const EOperatorType OPERATOR_ARRAY[] = {
|
||||
OP_TYPE_ADD,
|
||||
OP_TYPE_SUB,
|
||||
OP_TYPE_MULTI,
|
||||
OP_TYPE_DIV,
|
||||
OP_TYPE_REM,
|
||||
|
||||
OP_TYPE_MINUS,
|
||||
|
||||
OP_TYPE_BIT_AND,
|
||||
OP_TYPE_BIT_OR,
|
||||
|
||||
OP_TYPE_GREATER_THAN,
|
||||
OP_TYPE_GREATER_EQUAL,
|
||||
OP_TYPE_LOWER_THAN,
|
||||
OP_TYPE_LOWER_EQUAL,
|
||||
OP_TYPE_EQUAL,
|
||||
OP_TYPE_NOT_EQUAL,
|
||||
OP_TYPE_IN,
|
||||
OP_TYPE_NOT_IN,
|
||||
OP_TYPE_LIKE,
|
||||
OP_TYPE_NOT_LIKE,
|
||||
OP_TYPE_MATCH,
|
||||
OP_TYPE_NMATCH,
|
||||
|
||||
OP_TYPE_IS_NULL,
|
||||
OP_TYPE_IS_NOT_NULL,
|
||||
OP_TYPE_IS_TRUE,
|
||||
OP_TYPE_IS_FALSE,
|
||||
OP_TYPE_IS_UNKNOWN,
|
||||
OP_TYPE_IS_NOT_TRUE,
|
||||
OP_TYPE_IS_NOT_FALSE,
|
||||
OP_TYPE_IS_NOT_UNKNOWN,
|
||||
//OP_TYPE_COMPARE_MAX_VALUE,
|
||||
|
||||
OP_TYPE_JSON_GET_VALUE,
|
||||
OP_TYPE_JSON_CONTAINS,
|
||||
|
||||
OP_TYPE_ASSIGN
|
||||
};
|
||||
|
||||
#define OP_TYPE_CALC_MAX OP_TYPE_BIT_OR
|
||||
|
||||
typedef enum ELogicConditionType {
|
||||
|
|
|
@ -58,6 +58,8 @@ enum {
|
|||
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)(res) == RES_TYPE__TMQ_METADATA)
|
||||
#define TD_RES_TMQ_BATCH_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_BATCH_META)
|
||||
|
||||
#define TSC_MAX_SUBPLAN_CAPACITY_NUM 1000
|
||||
|
||||
typedef struct SAppInstInfo SAppInstInfo;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -1250,6 +1250,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
|||
|
||||
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
|
||||
int32_t code = 0;
|
||||
int32_t subplanNum = 0;
|
||||
|
||||
if (pQuery->pRoot) {
|
||||
pRequest->stmtType = pQuery->pRoot->type;
|
||||
|
@ -1405,6 +1406,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pNodeList);
|
||||
} else {
|
||||
qDestroyQueryPlan(pDag);
|
||||
|
|
|
@ -2346,8 +2346,13 @@ static void taosCheckAndSetDebugFlag(int32_t *pFlagPtr, char *name, int32_t flag
|
|||
if (noNeedToSetVars != NULL && taosArraySearch(noNeedToSetVars, name, taosLogVarComp, TD_EQ) != NULL) {
|
||||
return;
|
||||
}
|
||||
if (taosSetDebugFlag(pFlagPtr, name, flag) != 0) {
|
||||
uError("failed to set flag %s to %d", name, flag);
|
||||
int32_t code = 0;
|
||||
if ((code = taosSetDebugFlag(pFlagPtr, name, flag)) != 0) {
|
||||
if (code != TSDB_CODE_CFG_NOT_FOUND) {
|
||||
uError("failed to set flag %s to %d, since:%s", name, flag, tstrerror(code));
|
||||
} else {
|
||||
uDebug("failed to set flag %s to %d, since:%s", name, flag, tstrerror(code));
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -85,6 +85,8 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
|
|||
|
||||
int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableList, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
||||
|
||||
int32_t createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo);
|
||||
|
||||
int32_t createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
||||
|
||||
int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SNode* pTagCond, SNode*pTagIndexCond, SExecTaskInfo* pTaskInfo, SOperatorInfo** pInfo);
|
||||
|
|
|
@ -32,3 +32,15 @@ TARGET_INCLUDE_DIRECTORIES(
|
|||
PUBLIC "${TD_SOURCE_DIR}/include/common"
|
||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
ADD_EXECUTABLE(queryPlanTests queryPlanTests.cpp)
|
||||
TARGET_LINK_LIBRARIES(
|
||||
queryPlanTests
|
||||
PRIVATE os util common executor gtest_main qcom function planner scalar nodes vnode
|
||||
)
|
||||
|
||||
TARGET_INCLUDE_DIRECTORIES(
|
||||
queryPlanTests
|
||||
PUBLIC "${TD_SOURCE_DIR}/include/common"
|
||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1393,6 +1393,11 @@ static int32_t translateRepeat(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
|||
}
|
||||
|
||||
static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
if (numOfParams <= 0) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
// The number of parameters has been limited by the syntax definition
|
||||
|
||||
SExprNode* pPara0 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||
|
@ -1550,6 +1555,11 @@ static int32_t translateIn2GeomOutBool(SFunctionNode* pFunc, char* pErrBuf, int3
|
|||
}
|
||||
|
||||
static int32_t translateSelectValue(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||
if (numOfParams <= 0) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
pFunc->node.resType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -703,6 +703,8 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
|
|||
code = makeNode(type, sizeof(SGroupSortPhysiNode), &pNode); break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
|
||||
code = makeNode(type, sizeof(SIntervalPhysiNode), &pNode); break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
|
||||
code = makeNode(type, sizeof(SMergeIntervalPhysiNode), &pNode); break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
|
||||
code = makeNode(type, sizeof(SMergeAlignedIntervalPhysiNode), &pNode); break;
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
|
@ -1608,10 +1610,14 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: {
|
||||
SHashJoinPhysiNode* pPhyNode = (SHashJoinPhysiNode*)pNode;
|
||||
destroyPhysiNode((SPhysiNode*)pPhyNode);
|
||||
nodesDestroyNode(pPhyNode->pWindowOffset);
|
||||
nodesDestroyNode(pPhyNode->pJLimit);
|
||||
nodesDestroyList(pPhyNode->pOnLeft);
|
||||
nodesDestroyList(pPhyNode->pOnRight);
|
||||
nodesDestroyNode(pPhyNode->leftPrimExpr);
|
||||
nodesDestroyNode(pPhyNode->rightPrimExpr);
|
||||
nodesDestroyNode(pPhyNode->pLeftOnCond);
|
||||
nodesDestroyNode(pPhyNode->pRightOnCond);
|
||||
nodesDestroyNode(pPhyNode->pFullOnCond);
|
||||
nodesDestroyList(pPhyNode->pTargets);
|
||||
|
||||
|
@ -1619,8 +1625,6 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
nodesDestroyNode(pPhyNode->pColEqCond);
|
||||
nodesDestroyNode(pPhyNode->pTagEqCond);
|
||||
|
||||
nodesDestroyNode(pPhyNode->pLeftOnCond);
|
||||
nodesDestroyNode(pPhyNode->pRightOnCond);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
|
||||
|
@ -1654,6 +1658,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
|
||||
|
@ -2059,6 +2064,8 @@ void* nodesGetValueFromNode(SValueNode* pNode) {
|
|||
|
||||
int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value) {
|
||||
switch (pNode->node.resType.type) {
|
||||
case TSDB_DATA_TYPE_NULL:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
pNode->datum.b = *(bool*)value;
|
||||
*(bool*)&pNode->typeData = pNode->datum.b;
|
||||
|
@ -2110,7 +2117,10 @@ int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value) {
|
|||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
case TSDB_DATA_TYPE_MEDIUMBLOB:
|
||||
case TSDB_DATA_TYPE_GEOMETRY:
|
||||
pNode->datum.p = (char*)value;
|
||||
break;
|
||||
|
|
|
@ -24,6 +24,18 @@ extern "C" {
|
|||
#include "tsimplehash.h"
|
||||
#include "taoserror.h"
|
||||
|
||||
|
||||
typedef struct SPhysiPlanContext {
|
||||
SPlanContext* pPlanCxt;
|
||||
int32_t errCode;
|
||||
int16_t nextDataBlockId;
|
||||
SArray* pLocationHelper;
|
||||
SArray* pProjIdxLocHelper;
|
||||
bool hasScan;
|
||||
bool hasSysScan;
|
||||
} SPhysiPlanContext;
|
||||
|
||||
|
||||
#define planFatal(param, ...) qFatal("PLAN: " param, ##__VA_ARGS__)
|
||||
#define planError(param, ...) qError("PLAN: " param, ##__VA_ARGS__)
|
||||
#define planWarn(param, ...) qWarn("PLAN: " param, ##__VA_ARGS__)
|
||||
|
|
|
@ -30,15 +30,6 @@ typedef struct SSlotIndex {
|
|||
SArray* pSlotIdsInfo; // duplicate name slot
|
||||
} SSlotIndex;
|
||||
|
||||
typedef struct SPhysiPlanContext {
|
||||
SPlanContext* pPlanCxt;
|
||||
int32_t errCode;
|
||||
int16_t nextDataBlockId;
|
||||
SArray* pLocationHelper;
|
||||
SArray* pProjIdxLocHelper;
|
||||
bool hasScan;
|
||||
bool hasSysScan;
|
||||
} SPhysiPlanContext;
|
||||
|
||||
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char** ppKey, int32_t *pLen, uint16_t extraBufLen) {
|
||||
int32_t code = 0;
|
||||
|
|
|
@ -62,6 +62,7 @@ typedef enum {
|
|||
#define SCH_DEFAULT_MAX_RETRY_NUM 6
|
||||
#define SCH_MIN_AYSNC_EXEC_NUM 3
|
||||
#define SCH_DEFAULT_RETRY_TOTAL_ROUND 3
|
||||
#define SCH_DEFAULT_TASK_CAPACITY_NUM 1000
|
||||
|
||||
typedef struct SSchDebug {
|
||||
bool lockEnable;
|
||||
|
@ -318,6 +319,8 @@ typedef struct SSchTaskCtx {
|
|||
|
||||
extern SSchedulerMgmt schMgmt;
|
||||
|
||||
#define SCH_GET_TASK_CAPACITY(_n) ((_n) > SCH_DEFAULT_TASK_CAPACITY_NUM ? SCH_DEFAULT_TASK_CAPACITY_NUM : (_n))
|
||||
|
||||
#define SCH_TASK_TIMEOUT(_task) \
|
||||
((taosGetTimestampUs() - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec)
|
||||
|
||||
|
@ -330,8 +333,8 @@ extern SSchedulerMgmt schMgmt;
|
|||
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
|
||||
|
||||
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
|
||||
#define SCH_IS_DATA_BIND_TASK(task) \
|
||||
(((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
|
||||
#define SCH_IS_DATA_BIND_PLAN(_plan) (((_plan)->subplanType == SUBPLAN_TYPE_SCAN) || ((_plan)->subplanType == SUBPLAN_TYPE_MODIFY))
|
||||
#define SCH_IS_DATA_BIND_TASK(task) SCH_IS_DATA_BIND_PLAN((task)->plan)
|
||||
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
|
||||
#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task))
|
||||
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) \
|
||||
|
@ -641,6 +644,7 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
|
|||
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask);
|
||||
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
|
||||
void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
|
||||
int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum);
|
||||
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
|
||||
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
|
||||
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);
|
||||
|
|
|
@ -200,7 +200,12 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
|||
SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
|
||||
if (NULL == child) {
|
||||
SCH_JOB_ELOG("fail to get the %dth child subplan, childNum: %d", n, childNum);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(child)) {
|
||||
SCH_JOB_ELOG("invalid subplan type for the %dth child, level:%d, subplanNodeType:%d", n, i, nodeType(child));
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
|
||||
|
@ -242,6 +247,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
|
|||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(parent)) {
|
||||
SCH_JOB_ELOG("invalid subplan type for the %dth parent, level:%d, subplanNodeType:%d", n, i, nodeType(parent));
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
|
||||
if (NULL == parentTask || NULL == *parentTask) {
|
||||
SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
|
||||
|
@ -307,7 +317,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES);
|
||||
pJob->dataSrcTasks = taosArrayInit(SCH_GET_TASK_CAPACITY(pDag->numOfSubplans), POINTER_BYTES);
|
||||
if (NULL == pJob->dataSrcTasks) {
|
||||
SCH_ERR_RET(terrno);
|
||||
}
|
||||
|
@ -319,12 +329,12 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
}
|
||||
|
||||
SHashObj *planToTask = taosHashInit(
|
||||
pDag->numOfSubplans,
|
||||
SCH_GET_TASK_CAPACITY(pDag->numOfSubplans),
|
||||
taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
|
||||
HASH_NO_LOCK);
|
||||
if (NULL == planToTask) {
|
||||
SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
|
||||
SCH_ERR_RET(terrno);
|
||||
SCH_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
|
||||
|
@ -339,6 +349,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SSchLevel level = {0};
|
||||
SNodeListNode *plans = NULL;
|
||||
int32_t taskNum = 0;
|
||||
int32_t totalTaskNum = 0;
|
||||
SSchLevel *pLevel = NULL;
|
||||
|
||||
level.status = JOB_TASK_STATUS_INIT;
|
||||
|
@ -352,7 +363,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
pLevel = taosArrayGet(pJob->levels, i);
|
||||
if (NULL == pLevel) {
|
||||
SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
pLevel->level = i;
|
||||
|
@ -363,12 +374,23 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (QUERY_NODE_NODE_LIST != nodeType(plans)) {
|
||||
SCH_JOB_ELOG("invalid level plan, level:%d, planNodeType:%d", i, nodeType(plans));
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
|
||||
if (taskNum <= 0) {
|
||||
SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
totalTaskNum += taskNum;
|
||||
if (totalTaskNum > pDag->numOfSubplans) {
|
||||
SCH_JOB_ELOG("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum, pDag->numOfSubplans, i);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
pLevel->taskNum = taskNum;
|
||||
|
||||
pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
|
||||
|
@ -379,11 +401,9 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
|
||||
for (int32_t n = 0; n < taskNum; ++n) {
|
||||
SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
|
||||
if (NULL == plan) {
|
||||
SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d", n, taskNum);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schValidateSubplan(pJob, plan, pLevel->level, n, taskNum));
|
||||
|
||||
SCH_SET_JOB_TYPE(pJob, plan->subplanType);
|
||||
|
||||
SSchTask task = {0};
|
||||
|
@ -397,14 +417,16 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
|
||||
SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));
|
||||
|
||||
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
|
||||
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
|
||||
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
code = taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES);
|
||||
if (0 != code) {
|
||||
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d, error:%s", n, tstrerror(code));
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
|
||||
SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n);
|
||||
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
code = taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
|
||||
if (0 != code) {
|
||||
SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d, error:%s", n, tstrerror(code));
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
++pJob->taskNum;
|
||||
|
@ -413,6 +435,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum);
|
||||
}
|
||||
|
||||
if (totalTaskNum != pDag->numOfSubplans) {
|
||||
SCH_JOB_ELOG("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
|
||||
|
||||
_return:
|
||||
|
@ -781,9 +808,11 @@ void schFreeJobImpl(void *job) {
|
|||
}
|
||||
taosMemoryFree(pJob);
|
||||
|
||||
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
||||
if (jobNum == 0) {
|
||||
schCloseJobRef();
|
||||
if (refId > 0) {
|
||||
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
||||
if (jobNum == 0) {
|
||||
schCloseJobRef();
|
||||
}
|
||||
}
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
|
||||
|
@ -861,10 +890,10 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
|
||||
pJob->taskList = taosHashInit(SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans), taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
|
||||
HASH_ENTRY_LOCK);
|
||||
if (NULL == pJob->taskList) {
|
||||
SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
|
||||
SCH_JOB_ELOG("taosHashInit %d taskList failed", SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans));
|
||||
SCH_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
|
@ -904,7 +933,7 @@ _return:
|
|||
|
||||
if (NULL == pJob) {
|
||||
qDestroyQueryPlan(pReq->pDag);
|
||||
} else if (pJob->refId < 0) {
|
||||
} else if (pJob->refId <= 0) {
|
||||
schFreeJobImpl(pJob);
|
||||
} else {
|
||||
code = taosRemoveRef(schMgmt.jobRef, pJob->refId);
|
||||
|
|
|
@ -831,7 +831,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
|||
|
||||
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
||||
SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
|
||||
SCH_ERR_RET(TSDB_CODE_MND_INVALID_SCHEMA_VER);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));
|
||||
|
|
|
@ -360,3 +360,50 @@ void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask)
|
|||
|
||||
*pTask = *task;
|
||||
}
|
||||
|
||||
int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum) {
|
||||
if (NULL == pSubplan) {
|
||||
SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d, level: %d", idx, taskNum, level);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(pSubplan)) {
|
||||
SCH_JOB_ELOG("invalid subplan type, level:%d, subplanNodeType:%d", level, nodeType(pSubplan));
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (pSubplan->subplanType < SUBPLAN_TYPE_MERGE || pSubplan->subplanType > SUBPLAN_TYPE_COMPUTE) {
|
||||
SCH_JOB_ELOG("invalid subplanType %d, level:%d, subplan idx:%d", pSubplan->subplanType, level, idx);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (pSubplan->level != level) {
|
||||
SCH_JOB_ELOG("plan level %d mis-match with current level %d", pSubplan->level, level);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (SCH_IS_DATA_BIND_PLAN(pSubplan)) {
|
||||
if (pSubplan->execNode.epSet.numOfEps <= 0) {
|
||||
SCH_JOB_ELOG("no execNode specifed for data src plan %d, numOfEps:%d", pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_DATA_SRC_EP_MISS);
|
||||
}
|
||||
if (pSubplan->execNode.epSet.inUse >= pSubplan->execNode.epSet.numOfEps) {
|
||||
SCH_JOB_ELOG("invalid epset inUse %d for data src plan %d, numOfEps:%d", pSubplan->execNode.epSet.inUse, pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == pSubplan->pNode && pSubplan->subplanType != SUBPLAN_TYPE_MODIFY) {
|
||||
SCH_JOB_ELOG("empty plan root node, level:%d, subplan idx:%d, subplanType:%d", level, idx, pSubplan->subplanType);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (NULL == pSubplan->pDataSink) {
|
||||
SCH_JOB_ELOG("empty plan dataSink, level:%d, subplan idx:%d", level, idx);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -224,3 +224,33 @@ void schedulerDestroy(void) {
|
|||
qWorkerDestroy(&schMgmt.queryMgmt);
|
||||
schMgmt.queryMgmt = NULL;
|
||||
}
|
||||
|
||||
int32_t schedulerValidatePlan(SQueryPlan* pPlan) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
|
||||
if (NULL == pJob) {
|
||||
qError("QID:0x%" PRIx64 " calloc %d failed", pPlan->queryId, (int32_t)sizeof(SSchJob));
|
||||
SCH_ERR_RET(terrno);
|
||||
}
|
||||
|
||||
pJob->taskList = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
|
||||
HASH_ENTRY_LOCK);
|
||||
if (NULL == pJob->taskList) {
|
||||
SCH_JOB_ELOG("taosHashInit %d taskList failed", 100);
|
||||
SCH_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schValidateAndBuildJob(pPlan, pJob));
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
SCH_ERR_JRET(qExecExplainBegin(pPlan, &pJob->explainCtx, 0));
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
schFreeJobImpl(pJob);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -203,6 +203,10 @@ void schtBuildQueryDag(SQueryPlan *dag) {
|
|||
return;
|
||||
}
|
||||
scanPlan->msgType = TDMT_SCH_QUERY;
|
||||
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&scanPlan->pDataSink);
|
||||
if (NULL == scanPlan->pDataSink) {
|
||||
return;
|
||||
}
|
||||
|
||||
mergePlan->id.queryId = qId;
|
||||
mergePlan->id.groupId = schtMergeTemplateId;
|
||||
|
@ -223,6 +227,10 @@ void schtBuildQueryDag(SQueryPlan *dag) {
|
|||
return;
|
||||
}
|
||||
mergePlan->msgType = TDMT_SCH_QUERY;
|
||||
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&mergePlan->pDataSink);
|
||||
if (NULL == mergePlan->pDataSink) {
|
||||
return;
|
||||
}
|
||||
|
||||
merge->pNodeList = NULL;
|
||||
code = nodesMakeList(&merge->pNodeList);
|
||||
|
@ -235,6 +243,7 @@ void schtBuildQueryDag(SQueryPlan *dag) {
|
|||
return;
|
||||
}
|
||||
|
||||
|
||||
(void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan);
|
||||
(void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan);
|
||||
|
||||
|
@ -250,7 +259,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
|
|||
int32_t scanPlanNum = 20;
|
||||
|
||||
dag->queryId = qId;
|
||||
dag->numOfSubplans = 2;
|
||||
dag->numOfSubplans = scanPlanNum + 1;
|
||||
dag->pSubplans = NULL;
|
||||
int32_t code = nodesMakeList(&dag->pSubplans);
|
||||
if (NULL == dag->pSubplans) {
|
||||
|
@ -289,6 +298,10 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
|
|||
if (NULL == mergePlan->pChildren) {
|
||||
return;
|
||||
}
|
||||
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&mergePlan->pDataSink);
|
||||
if (NULL == mergePlan->pDataSink) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < scanPlanNum; ++i) {
|
||||
SSubplan *scanPlan = NULL;
|
||||
|
@ -322,6 +335,10 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) {
|
|||
return;
|
||||
}
|
||||
scanPlan->msgType = TDMT_SCH_QUERY;
|
||||
code = nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_DISPATCH, (SNode**)&scanPlan->pDataSink);
|
||||
if (NULL == scanPlan->pDataSink) {
|
||||
return;
|
||||
}
|
||||
|
||||
(void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan);
|
||||
(void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan);
|
||||
|
|
|
@ -618,6 +618,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal er
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_TIMEOUT_ERROR, "Task timeout")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_JOB_IS_DROPPING, "Job is dropping")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_JOB_NOT_EXISTS, "Job no longer exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_DATA_SRC_EP_MISS, "No valid epSet for data source node")
|
||||
|
||||
// parser
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYNTAX_ERROR, "syntax error near")
|
||||
|
|
Loading…
Reference in New Issue