enh: group by tbname optimize
This commit is contained in:
parent
59ea486b94
commit
70164f9f20
|
@ -171,6 +171,7 @@ typedef struct SExchangeLogicNode {
|
||||||
SLogicNode node;
|
SLogicNode node;
|
||||||
int32_t srcStartGroupId;
|
int32_t srcStartGroupId;
|
||||||
int32_t srcEndGroupId;
|
int32_t srcEndGroupId;
|
||||||
|
bool seqRecvData;
|
||||||
} SExchangeLogicNode;
|
} SExchangeLogicNode;
|
||||||
|
|
||||||
typedef struct SMergeLogicNode {
|
typedef struct SMergeLogicNode {
|
||||||
|
@ -416,6 +417,7 @@ typedef struct SExchangePhysiNode {
|
||||||
int32_t srcEndGroupId;
|
int32_t srcEndGroupId;
|
||||||
bool singleChannel;
|
bool singleChannel;
|
||||||
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
|
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
|
||||||
|
bool seqRecvData;
|
||||||
} SExchangePhysiNode;
|
} SExchangePhysiNode;
|
||||||
|
|
||||||
typedef struct SMergePhysiNode {
|
typedef struct SMergePhysiNode {
|
||||||
|
|
|
@ -299,7 +299,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
|
||||||
SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
|
SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
|
||||||
qAppendTaskStopInfo(pTaskInfo, &stopInfo);
|
qAppendTaskStopInfo(pTaskInfo, &stopInfo);
|
||||||
|
|
||||||
pInfo->seqLoadData = false;
|
pInfo->seqLoadData = true;
|
||||||
pInfo->pTransporter = pTransporter;
|
pInfo->pTransporter = pTransporter;
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "ExchangeOperator", QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
|
|
|
@ -434,6 +434,7 @@ static int32_t logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicN
|
||||||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||||
COPY_SCALAR_FIELD(srcStartGroupId);
|
COPY_SCALAR_FIELD(srcStartGroupId);
|
||||||
COPY_SCALAR_FIELD(srcEndGroupId);
|
COPY_SCALAR_FIELD(srcEndGroupId);
|
||||||
|
COPY_SCALAR_FIELD(seqRecvData);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1864,6 +1864,7 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
|
||||||
static const char* jkExchangePhysiPlanSrcStartGroupId = "SrcStartGroupId";
|
static const char* jkExchangePhysiPlanSrcStartGroupId = "SrcStartGroupId";
|
||||||
static const char* jkExchangePhysiPlanSrcEndGroupId = "SrcEndGroupId";
|
static const char* jkExchangePhysiPlanSrcEndGroupId = "SrcEndGroupId";
|
||||||
static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints";
|
static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints";
|
||||||
|
static const char* jkExchangePhysiPlanSeqRecvData = "SeqRecvData";
|
||||||
|
|
||||||
static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SExchangePhysiNode* pNode = (const SExchangePhysiNode*)pObj;
|
const SExchangePhysiNode* pNode = (const SExchangePhysiNode*)pObj;
|
||||||
|
@ -1878,6 +1879,9 @@ static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints);
|
code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkExchangePhysiPlanSeqRecvData, pNode->seqRecvData);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1895,6 +1899,9 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints);
|
code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkExchangePhysiPlanSeqRecvData, &pNode->seqRecvData);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2428,7 +2428,8 @@ enum {
|
||||||
PHY_EXCHANGE_CODE_SRC_START_GROUP_ID,
|
PHY_EXCHANGE_CODE_SRC_START_GROUP_ID,
|
||||||
PHY_EXCHANGE_CODE_SRC_END_GROUP_ID,
|
PHY_EXCHANGE_CODE_SRC_END_GROUP_ID,
|
||||||
PHY_EXCHANGE_CODE_SINGLE_CHANNEL,
|
PHY_EXCHANGE_CODE_SINGLE_CHANNEL,
|
||||||
PHY_EXCHANGE_CODE_SRC_ENDPOINTS
|
PHY_EXCHANGE_CODE_SRC_ENDPOINTS,
|
||||||
|
PHY_EXCHANGE_CODE_SEQ_RECV_DATA
|
||||||
};
|
};
|
||||||
|
|
||||||
static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
|
@ -2447,6 +2448,9 @@ static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_SRC_ENDPOINTS, nodeListToMsg, pNode->pSrcEndPoints);
|
code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_SRC_ENDPOINTS, nodeListToMsg, pNode->pSrcEndPoints);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeBool(pEncoder, PHY_EXCHANGE_CODE_SEQ_RECV_DATA, pNode->seqRecvData);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2473,6 +2477,9 @@ static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
case PHY_EXCHANGE_CODE_SRC_ENDPOINTS:
|
case PHY_EXCHANGE_CODE_SRC_ENDPOINTS:
|
||||||
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSrcEndPoints);
|
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pSrcEndPoints);
|
||||||
break;
|
break;
|
||||||
|
case PHY_EXCHANGE_CODE_SEQ_RECV_DATA:
|
||||||
|
code = tlvDecodeBool(pTlv, &pNode->seqRecvData);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3685,9 +3685,19 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns
|
||||||
return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery);
|
return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateInsertTable(STranslateContext* pCxt, SNode* pTable) {
|
||||||
|
int32_t code = translateFrom(pCxt, pTable);
|
||||||
|
if (TSDB_CODE_SUCCESS == code && TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
|
||||||
|
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType) {
|
||||||
|
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
|
||||||
|
"insert data into super table is not supported");
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
|
static int32_t translateInsert(STranslateContext* pCxt, SInsertStmt* pInsert) {
|
||||||
pCxt->pCurrStmt = (SNode*)pInsert;
|
pCxt->pCurrStmt = (SNode*)pInsert;
|
||||||
int32_t code = translateFrom(pCxt, pInsert->pTable);
|
int32_t code = translateInsertTable(pCxt, pInsert->pTable);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = translateInsertCols(pCxt, pInsert);
|
code = translateInsertCols(pCxt, pInsert);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1064,6 +1064,7 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
|
||||||
|
|
||||||
pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
|
pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
|
||||||
pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
|
pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
|
||||||
|
pExchange->seqRecvData = pExchangeLogicNode->seqRecvData;
|
||||||
*pPhyNode = (SPhysiNode*)pExchange;
|
*pPhyNode = (SPhysiNode*)pExchange;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -733,6 +733,17 @@ static int32_t stbSplSplitWindowForCrossTable(SSplitContext* pCxt, SStableSplitI
|
||||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool stbSplNeedSeqRecvData(SLogicNode* pNode) {
|
||||||
|
if (NULL == pNode) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL != pNode->pLimit || NULL != pNode->pSlimit) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return stbSplNeedSeqRecvData(pNode->pParent);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
if (pCxt->pPlanCxt->streamQuery) {
|
if (pCxt->pPlanCxt->streamQuery) {
|
||||||
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
|
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
|
||||||
|
@ -748,6 +759,7 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn
|
||||||
code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
|
code = replaceLogicNode(pInfo->pSubplan, pInfo->pSplitNode, (SLogicNode*)pExchange);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pExchange->seqRecvData = stbSplNeedSeqRecvData((SLogicNode*)pExchange);
|
||||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||||
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
|
(SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue