fix: add input ignore group

This commit is contained in:
shenglian zhou 2024-01-16 08:13:47 +08:00
parent 8ae3aa24be
commit 9c9d9dc24b
7 changed files with 37 additions and 15 deletions

View File

@ -448,7 +448,7 @@ typedef struct SProjectPhysiNode {
SNodeList* pProjections;
bool mergeDataBlock;
bool ignoreGroupId;
bool inputIgnoreGroupId;
bool inputIgnoreGroup;
} SProjectPhysiNode;
typedef struct SIndefRowsFuncPhysiNode {

View File

@ -27,7 +27,7 @@ typedef struct SProjectOperatorInfo {
SLimitInfo limitInfo;
bool mergeDataBlocks;
SSDataBlock* pFinalRes;
bool inputIgnoreGroupId;
bool inputIgnoreGroup;
} SProjectOperatorInfo;
typedef struct SIndefOperatorInfo {
@ -110,7 +110,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM || pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
pInfo->mergeDataBlocks = false;
} else {
@ -300,8 +301,8 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pBlock->info.type == STREAM_CHECKPOINT) {
return pBlock;
}
if (pProjectInfo->inputIgnoreGroupId) {
if (pProjectInfo->inputIgnoreGroup) {
pBlock->info.id.groupId = 0;
}

View File

@ -462,6 +462,7 @@ static int32_t logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode
CLONE_NODE_LIST_FIELD(pProjections);
COPY_CHAR_ARRAY_FIELD(stmtName);
COPY_SCALAR_FIELD(ignoreGroupId);
COPY_SCALAR_FIELD(inputIgnoreGroup);
return TSDB_CODE_SUCCESS;
}

View File

@ -784,6 +784,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
static const char* jkProjectLogicPlanProjections = "Projections";
static const char* jkProjectLogicPlanIgnoreGroupId = "IgnoreGroupId";
static const char* jkProjectLogicPlanInputIgnoreGroup= "InputIgnoreGroup";
static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
const SProjectLogicNode* pNode = (const SProjectLogicNode*)pObj;
@ -795,6 +796,9 @@ static int32_t logicProjectNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkProjectLogicPlanIgnoreGroupId, pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkProjectLogicPlanInputIgnoreGroup, pNode->inputIgnoreGroup);
}
return code;
}
@ -809,7 +813,9 @@ static int32_t jsonToLogicProjectNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkProjectLogicPlanIgnoreGroupId, &pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkProjectLogicPlanInputIgnoreGroup, &pNode->inputIgnoreGroup);
}
return code;
}
@ -2043,6 +2049,7 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) {
static const char* jkProjectPhysiPlanProjections = "Projections";
static const char* jkProjectPhysiPlanMergeDataBlock = "MergeDataBlock";
static const char* jkProjectPhysiPlanIgnoreGroupId = "IgnoreGroupId";
static const char* jkProjectPhysiPlanInputIgnoreGroup = "InputIgnoreGroup";
static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
const SProjectPhysiNode* pNode = (const SProjectPhysiNode*)pObj;
@ -2057,7 +2064,9 @@ static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkProjectPhysiPlanIgnoreGroupId, pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkProjectPhysiPlanInputIgnoreGroup, pNode->inputIgnoreGroup);
}
return code;
}
@ -2074,7 +2083,9 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkProjectPhysiPlanIgnoreGroupId, &pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkProjectPhysiPlanInputIgnoreGroup, &pNode->inputIgnoreGroup);
}
return code;
}

View File

@ -2355,7 +2355,8 @@ enum {
PHY_PROJECT_CODE_BASE_NODE = 1,
PHY_PROJECT_CODE_PROJECTIONS,
PHY_PROJECT_CODE_MERGE_DATA_BLOCK,
PHY_PROJECT_CODE_IGNORE_GROUP_ID
PHY_PROJECT_CODE_IGNORE_GROUP_ID,
PHY_PROJECT_CODE_INPUT_IGNORE_GROUP
};
static int32_t physiProjectNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2371,6 +2372,9 @@ static int32_t physiProjectNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_PROJECT_CODE_IGNORE_GROUP_ID, pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_PROJECT_CODE_INPUT_IGNORE_GROUP, pNode->inputIgnoreGroup);
}
return code;
}
@ -2394,6 +2398,9 @@ static int32_t msgToPhysiProjectNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_PROJECT_CODE_IGNORE_GROUP_ID:
code = tlvDecodeBool(pTlv, &pNode->ignoreGroupId);
break;
case PHY_PROJECT_CODE_INPUT_IGNORE_GROUP:
code = tlvDecodeBool(pTlv, &pNode->inputIgnoreGroup);
break;
default:
break;
}

View File

@ -2980,10 +2980,7 @@ static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
NULL != pChild->pConditions || NULL != pChild->pLimit || NULL != pChild->pSlimit) {
return false;
}
if (false == ((SProjectLogicNode*)pChild)->ignoreGroupId) {
qError("internal error, child project output does not ignore group id");
return false;
}
return true;
}
@ -3022,7 +3019,12 @@ static EDealRes mergeProjectionsExpr(SNode** pNode, void* pContext) {
static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SLogicNode* pSelfNode) {
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSelfNode->pChildren, 0);
if (false == ((SProjectLogicNode*)pChild)->ignoreGroupId) {
qError("internal error, child project output does not ignore group when merge projects");
return TSDB_CODE_PLAN_INTERNAL_ERROR;
} else {
((SProjectLogicNode*)pSelfNode)->inputIgnoreGroup = true;
}
SMergeProjectionsContext cxt = {.pChildProj = (SProjectLogicNode*)pChild, .errCode = TSDB_CODE_SUCCESS};
nodesRewriteExprs(((SProjectLogicNode*)pSelfNode)->pProjections, mergeProjectionsExpr, &cxt);
int32_t code = cxt.errCode;
@ -3040,7 +3042,6 @@ static int32_t mergeProjectsOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
NODES_CLEAR_LIST(pChild->pChildren);
}
nodesDestroyNode((SNode*)pChild);
((SProjectLogicNode*)pSelfNode)->inputIgnoreGroup = true;
pCxt->optimized = true;
return code;
}

View File

@ -1432,6 +1432,7 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild
pProject->mergeDataBlock = projectCanMergeDataBlock(pProjectLogicNode);
pProject->ignoreGroupId = pProjectLogicNode->ignoreGroupId;
pProject->inputIgnoreGroup = pProjectLogicNode->inputIgnoreGroup;
int32_t code = TSDB_CODE_SUCCESS;
if (0 == LIST_LENGTH(pChildren)) {