TD-12678 qSetSubplanExecutionNode interface implement
This commit is contained in:
commit
6271f002ee
|
@ -3952,7 +3952,8 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
|
||||||
pQueryInfo->pTableMetaInfo[0]->pTableMeta = pmt;
|
pQueryInfo->pTableMetaInfo[0]->pTableMeta = pmt;
|
||||||
pQueryInfo->pTableMetaInfo[0]->name = *name;
|
pQueryInfo->pTableMetaInfo[0]->name = *name;
|
||||||
pQueryInfo->numOfTables = 1;
|
pQueryInfo->numOfTables = 1;
|
||||||
|
pQueryInfo->pTableMetaInfo[0]->tagColList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList);
|
code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosArrayDestroy(data.pTableMeta);
|
taosArrayDestroy(data.pTableMeta);
|
||||||
|
|
|
@ -257,6 +257,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
|
||||||
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
|
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
|
||||||
STORE_CURRENT_SUBPLAN(pCxt);
|
STORE_CURRENT_SUBPLAN(pCxt);
|
||||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
|
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
|
||||||
|
subplan->msgType = TDMT_VND_QUERY;
|
||||||
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
|
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
|
||||||
subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
|
subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
|
||||||
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode);
|
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode);
|
||||||
|
|
|
@ -23,6 +23,7 @@ static SSchedulerMgmt schMgmt = {0};
|
||||||
int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) {
|
int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
/*
|
||||||
if (oriStatus == newStatus) {
|
if (oriStatus == newStatus) {
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
@ -77,6 +78,7 @@ int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) {
|
||||||
qError("invalid task status:%d", oriStatus);
|
qError("invalid task status:%d", oriStatus);
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -461,6 +463,7 @@ int32_t schProcessOnDataFetched(SSchJob *job) {
|
||||||
|
|
||||||
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
bool moved = false;
|
bool moved = false;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
SCH_ERR_RET(schMoveTaskToSuccList(pJob, pTask, &moved));
|
SCH_ERR_RET(schMoveTaskToSuccList(pJob, pTask, &moved));
|
||||||
if (!moved) {
|
if (!moved) {
|
||||||
|
@ -539,7 +542,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
|
||||||
SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));
|
SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));
|
||||||
|
|
||||||
if (!needRetry) {
|
if (!needRetry) {
|
||||||
SCH_TASK_ELOG("task failed[%x], no more retry", errCode);
|
SCH_TASK_ELOG("task failed and no more retry, code:%x", errCode);
|
||||||
|
|
||||||
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
|
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
|
||||||
SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved));
|
SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved));
|
||||||
|
|
Loading…
Reference in New Issue