revise plan for tsma optimization
This commit is contained in:
parent
895a4584e6
commit
c94a262f22
|
@ -6139,8 +6139,9 @@ SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNod
|
||||||
return pScanCols;
|
return pScanCols;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsmaOptRewriteTags(STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma, SColumnNode* pTagCol) {
|
static int32_t tsmaOptRewriteTag(STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma, SColumnNode* pTagCol) {
|
||||||
bool found = false;
|
bool found = false;
|
||||||
|
if (pTagCol->colType != COLUMN_TYPE_TAG) return 0;
|
||||||
for (int32_t i = 0; i < pTsma->pTsma->pTags->size; ++i) {
|
for (int32_t i = 0; i < pTsma->pTsma->pTags->size; ++i) {
|
||||||
const SSchema* pSchema = taosArrayGet(pTsma->pTsma->pTags, i);
|
const SSchema* pSchema = taosArrayGet(pTsma->pTsma->pTags, i);
|
||||||
if (strcmp(pTagCol->colName, pSchema->name) == 0) {
|
if (strcmp(pTagCol->colName, pSchema->name) == 0) {
|
||||||
|
@ -6157,6 +6158,12 @@ static int32_t tsmaOptRewriteTags(STSMAOptCtx* pTsmaOptCtx, const STSMAOptUseful
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsmaOptRewriteTbname(STSMAOptCtx* pTsmaOptCtx, SFunctionNode* pTbNameNode) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNewScan, const STSMAOptUsefulTsma* pTsma) {
|
static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNewScan, const STSMAOptUsefulTsma* pTsma) {
|
||||||
SNode* pNode;
|
SNode* pNode;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -6202,12 +6209,18 @@ static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pNew
|
||||||
// pseudo columns
|
// pseudo columns
|
||||||
FOREACH(pNode, pNewScan->pScanPseudoCols) {
|
FOREACH(pNode, pNewScan->pScanPseudoCols) {
|
||||||
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
||||||
tsmaOptRewriteTags(pTsmaOptCtx, pTsma, (SColumnNode*)pNode);
|
code = tsmaOptRewriteTag(pTsmaOptCtx, pTsma, (SColumnNode*)pNode);
|
||||||
|
} else if (nodeType(pNode) == QUERY_NODE_FUNCTION) {
|
||||||
|
code = tsmaOptRewriteTbname(pTsmaOptCtx, (SFunctionNode*)pNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
FOREACH(pNode, pNewScan->pGroupTags) {
|
FOREACH(pNode, pNewScan->pGroupTags) {
|
||||||
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
||||||
tsmaOptRewriteTags(pTsmaOptCtx, pTsma, (SColumnNode*)pNode);
|
code = tsmaOptRewriteTag(pTsmaOptCtx, pTsma, (SColumnNode*)pNode);
|
||||||
|
} else if (nodeType(pNode) == QUERY_NODE_FUNCTION) {
|
||||||
|
code = tsmaOptRewriteTbname(pTsmaOptCtx, (SFunctionNode*)pNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6237,7 +6250,8 @@ static int32_t tsmaOptCreateWStart(int8_t precision, SFunctionNode** pWStartOut)
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SScanLogicNode* pScan) {
|
static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent, SScanLogicNode* pScan,
|
||||||
|
const STSMAOptUsefulTsma* pTsma) {
|
||||||
SNode * pStateFuncNode, *pAggFuncNode;
|
SNode * pStateFuncNode, *pAggFuncNode;
|
||||||
SColumnNode* pColNode;
|
SColumnNode* pColNode;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -6265,7 +6279,13 @@ static int32_t tsmaOptRevisePlan(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParent,
|
||||||
FORBOTH(pStateFuncNode, pAggStateFuncs, pAggFuncNode, pAggFuncs) {
|
FORBOTH(pStateFuncNode, pAggStateFuncs, pAggFuncNode, pAggFuncs) {
|
||||||
SFunctionNode* pStateFunc = (SFunctionNode*)pStateFuncNode;
|
SFunctionNode* pStateFunc = (SFunctionNode*)pStateFuncNode;
|
||||||
SFunctionNode* pAggFunc = (SFunctionNode*)pAggFuncNode;
|
SFunctionNode* pAggFunc = (SFunctionNode*)pAggFuncNode;
|
||||||
if (fmIsGroupKeyFunc(pStateFunc->funcId)) {
|
if (fmIsGroupKeyFunc(pAggFunc->funcId)) {
|
||||||
|
ASSERT(pAggFunc->pParameterList->length == 1);
|
||||||
|
SNode* pParamNode = pAggFunc->pParameterList->pHead->pNode;
|
||||||
|
if (nodeType(pParamNode) == QUERY_NODE_COLUMN) {
|
||||||
|
SColumnNode* pTagCol = (SColumnNode*)pParamNode;
|
||||||
|
if (pTagCol->colType == COLUMN_TYPE_TAG) tsmaOptRewriteTag(pTsmaOptCtx, pTsma, pTagCol);
|
||||||
|
}
|
||||||
continue;
|
continue;
|
||||||
} else if (fmIsPseudoColumnFunc(pStateFunc->funcId)) {
|
} else if (fmIsPseudoColumnFunc(pStateFunc->funcId)) {
|
||||||
if (pStateFunc->funcType == FUNCTION_TYPE_WSTART) hasWStart = true;
|
if (pStateFunc->funcType == FUNCTION_TYPE_WSTART) hasWStart = true;
|
||||||
|
@ -6333,10 +6353,11 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
}
|
}
|
||||||
pSubplan->pNode = pParent;
|
pSubplan->pNode = pParent;
|
||||||
pParent->pParent = NULL;
|
pParent->pParent = NULL;
|
||||||
|
pParent->groupAction = GROUP_ACTION_KEEP;
|
||||||
SScanLogicNode* pScan = (SScanLogicNode*)pParent->pChildren->pHead->pNode;
|
SScanLogicNode* pScan = (SScanLogicNode*)pParent->pChildren->pHead->pNode;
|
||||||
code = tsmaOptRewriteScan(pTsmaOptCtx, pScan, pTsma);
|
code = tsmaOptRewriteScan(pTsmaOptCtx, pScan, pTsma);
|
||||||
if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) {
|
if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) {
|
||||||
code = tsmaOptRevisePlan(pTsmaOptCtx, pParent, pScan);
|
code = tsmaOptRevisePlan(pTsmaOptCtx, pParent, pScan, pTsma);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6344,8 +6365,7 @@ static int32_t tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) {
|
||||||
pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, 0);
|
pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, 0);
|
||||||
code = tsmaOptRewriteScan(pTsmaOptCtx, pTsmaOptCtx->pScan, pTsma);
|
code = tsmaOptRewriteScan(pTsmaOptCtx, pTsmaOptCtx->pScan, pTsma);
|
||||||
if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) {
|
if (code == TSDB_CODE_SUCCESS && pTsma->pTsma) {
|
||||||
code = tsmaOptRevisePlan(pTsmaOptCtx, pTsmaOptCtx->pParent, pTsmaOptCtx->pScan);
|
code = tsmaOptRevisePlan(pTsmaOptCtx, pTsmaOptCtx->pParent, pTsmaOptCtx->pScan, pTsma); }
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -644,6 +644,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
|
||||||
pMerge->node.precision = pPartChild->precision;
|
pMerge->node.precision = pPartChild->precision;
|
||||||
pMerge->pMergeKeys = pMergeKeys;
|
pMerge->pMergeKeys = pMergeKeys;
|
||||||
pMerge->groupSort = groupSort;
|
pMerge->groupSort = groupSort;
|
||||||
|
pMerge->numOfSubplans = 1;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
|
pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
|
||||||
|
@ -727,15 +728,13 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
|
||||||
}
|
}
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
SNode* pNode;
|
SNode* pNode;
|
||||||
|
SMergeLogicNode* pMerge = (SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode;
|
||||||
FOREACH(pNode, pInfo->pSubplan->pChildren) {
|
FOREACH(pNode, pInfo->pSubplan->pChildren) {
|
||||||
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
|
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
|
||||||
SMergeLogicNode* pMerge = (SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode;
|
|
||||||
//pMerge->numOfChannels += stbSplGetNumOfVgroups(pSubplan->pNode);
|
|
||||||
pSubplan->id.groupId = pCxt->groupId;
|
pSubplan->id.groupId = pCxt->groupId;
|
||||||
pSubplan->id.queryId = pCxt->queryId;
|
pSubplan->id.queryId = pCxt->queryId;
|
||||||
pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
|
pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
|
||||||
TSWAP(((SScanLogicNode*)pSubplan->pNode->pChildren->pHead->pNode)->pVgroupList, pSubplan->pVgroupList);
|
splSetSubplanVgroups(pSubplan, pSubplan->pNode);
|
||||||
//++(pCxt->groupId);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -743,7 +742,6 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
|
||||||
(SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
|
(SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT));
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
((SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode)->srcEndGroupId = pCxt->groupId;
|
|
||||||
((SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode)->numOfSubplans = pInfo->pSubplan->pChildren->length;
|
((SMergeLogicNode*)pInfo->pSplitNode->pChildren->pHead->pNode)->numOfSubplans = pInfo->pSubplan->pChildren->length;
|
||||||
}
|
}
|
||||||
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||||
|
@ -959,7 +957,8 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
if (isPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode)) {
|
if (isPartTableWinodw((SWindowLogicNode*)pInfo->pSplitNode) &&
|
||||||
|
(pInfo->pSubplan->pChildren && LIST_LENGTH(pInfo->pSubplan->pChildren) > 0)) {
|
||||||
return stbSplSplitWindowForPartTable(pCxt, pInfo);
|
return stbSplSplitWindowForPartTable(pCxt, pInfo);
|
||||||
} else {
|
} else {
|
||||||
return stbSplSplitWindowForCrossTable(pCxt, pInfo);
|
return stbSplSplitWindowForCrossTable(pCxt, pInfo);
|
||||||
|
|
Loading…
Reference in New Issue