fix: scheduler plan validation issues
This commit is contained in:
parent
a2665ebc82
commit
e52628ab57
|
@ -58,6 +58,8 @@ enum {
|
|||
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)(res) == RES_TYPE__TMQ_METADATA)
|
||||
#define TD_RES_TMQ_BATCH_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_BATCH_META)
|
||||
|
||||
#define TSC_MAX_SUBPLAN_CAPACITY_NUM 1000
|
||||
|
||||
typedef struct SAppInstInfo SAppInstInfo;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -1251,6 +1251,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
|||
|
||||
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
|
||||
int32_t code = 0;
|
||||
int32_t subplanNum = 0;
|
||||
|
||||
if (pQuery->pRoot) {
|
||||
pRequest->stmtType = pQuery->pRoot->type;
|
||||
|
@ -1292,7 +1293,7 @@ void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void
|
|||
SQueryPlan* pDag = NULL;
|
||||
code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pRequest->body.subplanNum = pDag->numOfSubplans;
|
||||
subplanNum = pDag->numOfSubplans;
|
||||
if (!pRequest->validateOnly) {
|
||||
SArray* pNodeList = NULL;
|
||||
code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
|
||||
|
@ -1301,6 +1302,10 @@ void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void
|
|||
}
|
||||
taosArrayDestroy(pNodeList);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pRequest->body.subplanNum = subplanNum;
|
||||
}
|
||||
}
|
||||
taosArrayDestroy(pMnodeList);
|
||||
break;
|
||||
|
@ -1343,6 +1348,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
|||
pRequest->type = pQuery->msgType;
|
||||
SArray* pMnodeList = NULL;
|
||||
SQueryPlan* pDag = NULL;
|
||||
int32_t subplanNum = 0;
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
if (!pRequest->parseOnly) {
|
||||
|
@ -1369,7 +1375,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
|||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||
pRequest->requestId);
|
||||
} else {
|
||||
pRequest->body.subplanNum = pDag->numOfSubplans;
|
||||
subplanNum = pDag->numOfSubplans;
|
||||
TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
|
||||
}
|
||||
}
|
||||
|
@ -1406,6 +1412,9 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pRequest->body.subplanNum = subplanNum;
|
||||
}
|
||||
taosArrayDestroy(pNodeList);
|
||||
} else {
|
||||
qDestroyQueryPlan(pDag);
|
||||
|
|
|
@ -47,9 +47,9 @@
|
|||
|
||||
namespace {
|
||||
|
||||
#define QPT_MAX_LOOP 1000
|
||||
#define QPT_MAX_LOOP 100000
|
||||
#define QPT_MAX_LEVEL_SUBPLAN_NUM 10
|
||||
#define QPT_MAX_SUBPLAN_LEVEL 5
|
||||
#define QPT_MAX_SUBPLAN_LEVEL 2
|
||||
#define QPT_MAX_SUBPLAN_GROUP 5
|
||||
#define QPT_MAX_WHEN_THEN_NUM 10
|
||||
#define QPT_MAX_NODE_LEVEL 5
|
||||
|
@ -172,7 +172,10 @@ typedef struct {
|
|||
} SQPTMakeNodeCtx;
|
||||
|
||||
// Execution outcome tracked by the plan test harness (stored as qptCtx.result).
typedef struct {
  int64_t startTsUs;     // taosGetTimestampUs() value captured right before a plan is executed
  int32_t code;          // result code of the current run; reset to 0 before a re-run, non-zero counts as a failure
  int64_t succeedTimes;  // number of executions that finished with code == 0
  int64_t failedTimes;   // number of executions that finished with code != 0
} SQPTExecResult;
|
||||
|
||||
typedef struct {
|
||||
|
@ -183,7 +186,6 @@ typedef struct {
|
|||
SQPTMakeNodeCtx makeCtx;
|
||||
SQPTMakeNodeCtx makeCtxBak;
|
||||
SQPTExecResult result;
|
||||
int64_t startTsUs;
|
||||
} SQPTCtx;
|
||||
|
||||
typedef struct {
|
||||
|
@ -257,7 +259,7 @@ SNode* qptCreateSubplanNode(int32_t nodeType);
|
|||
SQPTPlan qptPlans[] = {
|
||||
{QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, QPT_PLAN_PHYSIC, "tagScan", qptCreateTagScanPhysiNode},
|
||||
{QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, QPT_PLAN_PHYSIC, "tableScan", qptCreateTableScanPhysiNode},
|
||||
{QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, QPT_PLAN_PHYSIC, "tableSeqScan", NULL /*qptCreateTableSeqScanPhysiNode*/ },
|
||||
{QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, QPT_PLAN_PHYSIC, "tableSeqScan", qptCreateTableSeqScanPhysiNode},
|
||||
{QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, QPT_PLAN_PHYSIC, "tableMergeScan", qptCreateTableMergeScanPhysiNode},
|
||||
{QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, QPT_PLAN_PHYSIC, "streamScan", qptCreateStreamScanPhysiNode},
|
||||
{QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QPT_PLAN_PHYSIC, "sysTableScan", qptCreateSysTableScanPhysiNode},
|
||||
|
@ -329,6 +331,16 @@ int32_t qptSink[] = {QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN
|
|||
SNode* qptMakeExprNode(SNode** ppNode);
|
||||
void qptMakeNodeList(QPT_NODE_TYPE nodeType, SNodeList** ppList);
|
||||
|
||||
int32_t qptGetSpecificPlanIndex(int32_t type) {
|
||||
int32_t planNum = sizeof(qptPlans) / sizeof(qptPlans[0]);
|
||||
for (int32_t i = 0; i < planNum; ++i) {
|
||||
if (qptPlans[i].type == type) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t qptGetColumnRandLen(int32_t colType) {
|
||||
switch (colType) {
|
||||
|
@ -460,7 +472,8 @@ void qptPrintEndInfo() {
|
|||
}
|
||||
|
||||
// Prints the end-of-case summary: loop count (qptCtx.loopIdx + 1), case name, and the
// succeed/failed counters accumulated in qptCtx.result.
// NOTE(review): at least one visible caller invokes this after its for-loop has ended, where
// loopIdx may already equal the loop bound; confirm the "+ 1" is still intended there.
void qptPrintStatInfo() {
  const int64_t okTimes = qptCtx.result.succeedTimes;
  const int64_t badTimes = qptCtx.result.failedTimes;

  printf("\n\tAll %d times TEST [%s] END, result - succeed:%" PRId64 " failed:%" PRId64 "\n", qptCtx.loopIdx + 1,
         qptCtx.caseName, okTimes, badTimes);
}
|
||||
|
||||
|
||||
|
@ -1661,6 +1674,19 @@ SNode* qptMakeOrderByExprNode(SNode** ppNode) {
|
|||
return *ppNode;
|
||||
}
|
||||
|
||||
// Produces one node for a subplan list, stores it in *ppNode and returns it.
// When QPT_NCORRECT_LOW_PROB() evaluates true the node comes from qptMakeRandNode()
// (presumably the harness's deliberate "incorrect input" path; confirm against the macro);
// otherwise a QUERY_NODE_PHYSICAL_SUBPLAN node is created via qptCreateSubplanNode().
SNode* qptMakeSubplanNode(SNode** ppNode) {
  const bool useRandNode = QPT_NCORRECT_LOW_PROB();

  if (!useRandNode) {
    *ppNode = (SNode*)qptCreateSubplanNode(QUERY_NODE_PHYSICAL_SUBPLAN);
    return *ppNode;
  }

  return qptMakeRandNode(ppNode);
}
|
||||
|
||||
|
||||
|
||||
|
||||
SPhysiNode* qptCreatePhysiNode(int32_t nodeType) {
|
||||
SPhysiNode* pPhysiNode = NULL;
|
||||
assert(0 == nodesMakeNode((ENodeType)nodeType, (SNode**)&pPhysiNode));
|
||||
|
@ -1928,6 +1954,18 @@ void qptMakeOrerByExprList(SNodeList** ppList) {
|
|||
}
|
||||
|
||||
|
||||
// Appends a random number of subplan nodes to *ppList.
// The make-node context is saved once up front and restored before each node is built,
// so every subplan node is generated from the same saved context.
void qptMakeSubplanList(SNodeList** ppList) {
  qptSaveMakeNodeCtx();

  // Random count below QPT_MAX_LEVEL_SUBPLAN_NUM, plus one when QPT_CORRECT_HIGH_PROB() holds.
  int32_t subplanCnt = taosRand() % QPT_MAX_LEVEL_SUBPLAN_NUM + (QPT_CORRECT_HIGH_PROB() ? 1 : 0);

  for (int32_t left = subplanCnt; left > 0; --left) {
    SNode* pSubplan = NULL;

    qptRestoreMakeNodeCtx();
    qptMakeSubplanNode(&pSubplan);
    qptNodesListMakeStrictAppend(ppList, pSubplan);
  }
}
|
||||
|
||||
void qptMakeSpecTypeNodeList(QPT_NODE_TYPE nodeType, SNodeList** ppList) {
|
||||
switch (nodeType) {
|
||||
case QPT_NODE_COLUMN:
|
||||
|
@ -1938,6 +1976,8 @@ void qptMakeSpecTypeNodeList(QPT_NODE_TYPE nodeType, SNodeList** ppList) {
|
|||
return qptMakeExprList(ppList);
|
||||
case QPT_NODE_VALUE:
|
||||
return qptMakeValueList(ppList);
|
||||
case QPT_NODE_SUBPLAN:
|
||||
return qptMakeSubplanList(ppList);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -2934,6 +2974,8 @@ void qptResetForReRun() {
|
|||
|
||||
qptCtx.buildCtx.pCurr = NULL;
|
||||
qptCtx.buildCtx.pCurrTask = NULL;
|
||||
|
||||
qptCtx.result.code = 0;
|
||||
}
|
||||
|
||||
void qptSingleTestDone(bool* contLoop) {
|
||||
|
@ -3126,6 +3168,13 @@ void qptExecPlan(SReadHandle* pReadHandle, SNode* pNode, SExecTaskInfo* pTaskInf
|
|||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
if (qptCtx.result.code) {
|
||||
qptCtx.result.failedTimes++;
|
||||
} else {
|
||||
qptCtx.result.succeedTimes++;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void qptRunSingleOpTest() {
|
||||
|
@ -3144,7 +3193,7 @@ void qptRunSingleOpTest() {
|
|||
|
||||
qptPrintBeginInfo();
|
||||
|
||||
qptCtx.startTsUs = taosGetTimestampUs();
|
||||
qptCtx.result.startTsUs = taosGetTimestampUs();
|
||||
|
||||
qptExecPlan(&readHandle, pNode, pTaskInfo, &pOperator);
|
||||
|
||||
|
@ -3170,7 +3219,7 @@ void qptRunSubplanTest() {
|
|||
|
||||
qptPrintBeginInfo();
|
||||
|
||||
qptCtx.startTsUs = taosGetTimestampUs();
|
||||
qptCtx.result.startTsUs = taosGetTimestampUs();
|
||||
//qptCtx.result.code = createTagScanOperatorInfo(&readHandle, (STagScanPhysiNode*)pNode, NULL, NULL, NULL, NULL, &pOperator);
|
||||
//qptCtx.result.code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pNode, NULL, &pOperator);
|
||||
|
||||
|
@ -3307,9 +3356,9 @@ void qptDestroyTestCtx() {
|
|||
} // namespace
|
||||
|
||||
#if 1
|
||||
#if 1
|
||||
TEST(singleNodeTest, randPlan) {
|
||||
char* caseType = "singleNodeTest:randPlan";
|
||||
#if 0
|
||||
TEST(singleRandNodeTest, loopPlans) {
|
||||
char* caseType = "singleRandNodeTest:loopPlans";
|
||||
|
||||
for (qptCtx.loopIdx = 0; qptCtx.loopIdx < QPT_MAX_LOOP; ++qptCtx.loopIdx) {
|
||||
for (int32_t i = 0; i < sizeof(qptPlans)/sizeof(qptPlans[0]); ++i) {
|
||||
|
@ -3325,6 +3374,24 @@ TEST(singleNodeTest, randPlan) {
|
|||
qptPrintStatInfo();
|
||||
}
|
||||
#endif
|
||||
#if 1
|
||||
// Runs the random plan test QPT_MAX_LOOP times against a single plan type
// (QUERY_NODE_PHYSICAL_PLAN), re-initialising and destroying the test context on
// every iteration, then prints the accumulated succeed/failed statistics.
TEST(singleRandNodeTest, specificPlan) {
  const char* caseType = "singleRandNodeTest:specificPlan";

  int32_t idx = qptGetSpecificPlanIndex(QUERY_NODE_PHYSICAL_PLAN);
  // qptGetSpecificPlanIndex() returns -1 when the type is not registered in qptPlans;
  // stop here instead of reading qptPlans[-1].
  ASSERT_GE(idx, 0) << "plan type is not registered in qptPlans";

  for (qptCtx.loopIdx = 0; qptCtx.loopIdx < QPT_MAX_LOOP; ++qptCtx.loopIdx) {
    sprintf(qptCtx.caseName, "%s:%s", caseType, qptPlans[idx].name);
    qptInitTestCtx(false, true, qptPlans[idx].type, idx, 0, NULL);

    qptRunPlanTest();

    qptDestroyTestCtx();
  }

  qptPrintStatInfo();
}
|
||||
#endif
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
|
|
|
@ -62,6 +62,7 @@ typedef enum {
|
|||
#define SCH_DEFAULT_MAX_RETRY_NUM 6
|
||||
#define SCH_MIN_AYSNC_EXEC_NUM 3
|
||||
#define SCH_DEFAULT_RETRY_TOTAL_ROUND 3
|
||||
#define SCH_DEFAULT_TASK_CAPACITY_NUM 1000
|
||||
|
||||
typedef struct SSchDebug {
|
||||
bool lockEnable;
|
||||
|
@ -318,6 +319,8 @@ typedef struct SSchTaskCtx {
|
|||
|
||||
extern SSchedulerMgmt schMgmt;
|
||||
|
||||
// Caps a requested task count at SCH_DEFAULT_TASK_CAPACITY_NUM; values at or below the cap
// (including zero or negative ones) are passed through unchanged. Used as the initial
// capacity for the scheduler's task array and hash tables.
// The argument can be evaluated twice, so pass only side-effect-free expressions.
#define SCH_GET_TASK_CAPACITY(_n) ((_n) > SCH_DEFAULT_TASK_CAPACITY_NUM ? SCH_DEFAULT_TASK_CAPACITY_NUM : (_n))
|
||||
|
||||
#define SCH_TASK_TIMEOUT(_task) \
|
||||
((taosGetTimestampUs() - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec)
|
||||
|
||||
|
|
|
@ -317,7 +317,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES);
|
||||
pJob->dataSrcTasks = taosArrayInit(SCH_GET_TASK_CAPACITY(pDag->numOfSubplans), POINTER_BYTES);
|
||||
if (NULL == pJob->dataSrcTasks) {
|
||||
SCH_ERR_RET(terrno);
|
||||
}
|
||||
|
@ -329,7 +329,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
}
|
||||
|
||||
SHashObj *planToTask = taosHashInit(
|
||||
pDag->numOfSubplans,
|
||||
SCH_GET_TASK_CAPACITY(pDag->numOfSubplans),
|
||||
taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false,
|
||||
HASH_NO_LOCK);
|
||||
if (NULL == planToTask) {
|
||||
|
@ -349,6 +349,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SSchLevel level = {0};
|
||||
SNodeListNode *plans = NULL;
|
||||
int32_t taskNum = 0;
|
||||
int32_t totalTaskNum = 0;
|
||||
SSchLevel *pLevel = NULL;
|
||||
|
||||
level.status = JOB_TASK_STATUS_INIT;
|
||||
|
@ -362,7 +363,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
pLevel = taosArrayGet(pJob->levels, i);
|
||||
if (NULL == pLevel) {
|
||||
SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
pLevel->level = i;
|
||||
|
@ -384,6 +385,12 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
totalTaskNum += taskNum;
|
||||
if (totalTaskNum > pDag->numOfSubplans) {
|
||||
SCH_JOB_ELOG("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum, pDag->numOfSubplans, i);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
pLevel->taskNum = taskNum;
|
||||
|
||||
pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
|
||||
|
@ -410,14 +417,16 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
|
||||
SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask));
|
||||
|
||||
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) {
|
||||
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n);
|
||||
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
code = taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES);
|
||||
if (0 != code) {
|
||||
SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d, error:%s", n, tstrerror(code));
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
|
||||
SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n);
|
||||
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
code = taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
|
||||
if (0 != code) {
|
||||
SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d, error:%s", n, tstrerror(code));
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
++pJob->taskNum;
|
||||
|
@ -426,6 +435,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum);
|
||||
}
|
||||
|
||||
if (totalTaskNum != pDag->numOfSubplans) {
|
||||
SCH_JOB_ELOG("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
|
||||
|
||||
_return:
|
||||
|
@ -876,10 +890,10 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
|
||||
pJob->taskList = taosHashInit(SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans), taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
|
||||
HASH_ENTRY_LOCK);
|
||||
if (NULL == pJob->taskList) {
|
||||
SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
|
||||
SCH_JOB_ELOG("taosHashInit %d taskList failed", SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans));
|
||||
SCH_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
|
|
|
@ -233,6 +233,13 @@ int32_t schedulerValidatePlan(SQueryPlan* pPlan) {
|
|||
SCH_ERR_RET(terrno);
|
||||
}
|
||||
|
||||
pJob->taskList = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
|
||||
HASH_ENTRY_LOCK);
|
||||
if (NULL == pJob->taskList) {
|
||||
SCH_JOB_ELOG("taosHashInit %d taskList failed", 100);
|
||||
SCH_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schValidateAndBuildJob(pPlan, pJob));
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
|
|
Loading…
Reference in New Issue