feature/qnode

This commit is contained in:
dapan 2022-01-09 18:08:04 +08:00
parent e5834b8e00
commit 7791d68d48
3 changed files with 6 additions and 2 deletions

View File

@ -3952,7 +3952,8 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
pQueryInfo->pTableMetaInfo[0]->pTableMeta = pmt; pQueryInfo->pTableMetaInfo[0]->pTableMeta = pmt;
pQueryInfo->pTableMetaInfo[0]->name = *name; pQueryInfo->pTableMetaInfo[0]->name = *name;
pQueryInfo->numOfTables = 1; pQueryInfo->numOfTables = 1;
pQueryInfo->pTableMetaInfo[0]->tagColList = taosArrayInit(4, POINTER_BYTES);
code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList); code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(data.pTableMeta); taosArrayDestroy(data.pTableMeta);

View File

@ -237,6 +237,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) { for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
STORE_CURRENT_SUBPLAN(pCxt); STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
subplan->msgType = TDMT_VND_QUERY;
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode); vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
subplan->pNode = createMultiTableScanNode(pPlanNode, pTable); subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode); subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode);

View File

@ -23,6 +23,7 @@ static SSchedulerMgmt schMgmt = {0};
int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) { int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) {
int32_t code = 0; int32_t code = 0;
/*
if (oriStatus == newStatus) { if (oriStatus == newStatus) {
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
@ -77,6 +78,7 @@ int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) {
qError("invalid task status:%d", oriStatus); qError("invalid task status:%d", oriStatus);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
*/
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -539,7 +541,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); SCH_ERR_RET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));
if (!needRetry) { if (!needRetry) {
SCH_TASK_ELOG("task failed[%x], no more retry", errCode); SCH_TASK_ELOG("task failed and no more retry, code:%x", errCode);
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved)); SCH_ERR_RET(schMoveTaskToFailList(pJob, pTask, &moved));