enh: support split cache last and other functions
This commit is contained in:
parent
604c701290
commit
49ebb7145e
|
@ -121,6 +121,7 @@ int32_t nodesListMakeAppend(SNodeList** pList, SNode* pNode);
|
|||
int32_t nodesListMakeStrictAppend(SNodeList** pList, SNode* pNode);
|
||||
int32_t nodesListAppendList(SNodeList* pTarget, SNodeList* pSrc);
|
||||
int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc);
|
||||
int32_t nodesListMakeStrictAppendList(SNodeList** pTarget, SNodeList* pSrc);
|
||||
int32_t nodesListPushFront(SNodeList* pList, SNode* pNode);
|
||||
SListCell* nodesListErase(SNodeList* pList, SListCell* pCell);
|
||||
void nodesListInsertList(SNodeList* pTarget, SListCell* pPos, SNodeList* pSrc);
|
||||
|
|
|
@ -145,6 +145,7 @@ typedef struct SAggLogicNode {
|
|||
bool hasGroupKeyOptimized;
|
||||
bool isGroupTb;
|
||||
bool isPartTb; // true if partition keys has tbname
|
||||
bool hasGroup;
|
||||
} SAggLogicNode;
|
||||
|
||||
typedef struct SProjectLogicNode {
|
||||
|
|
|
@ -625,7 +625,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
|
||||
SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, (pAggNode->pGroupKeys ? "Group" : "Aggragate"));
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, (pAggNode->pGroupKeys ? "GroupAggragate" : "Aggragate"));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
|
@ -1152,24 +1152,26 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
|
||||
// sort method
|
||||
EXPLAIN_ROW_NEW(level + 1, "Sort Method: ");
|
||||
if (MERGE_TYPE_SORT == pMergeNode->type) {
|
||||
// sort method
|
||||
EXPLAIN_ROW_NEW(level + 1, "Sort Method: ");
|
||||
|
||||
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
|
||||
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, 0);
|
||||
SSortExecInfo *pExecInfo = (SSortExecInfo *)execInfo->verboseInfo;
|
||||
EXPLAIN_ROW_APPEND("%s", pExecInfo->sortMethod == SORT_QSORT_T ? "quicksort" : "merge sort");
|
||||
if (pExecInfo->sortBuffer > 1024 * 1024) {
|
||||
EXPLAIN_ROW_APPEND(" Buffers:%.2f Mb", pExecInfo->sortBuffer / (1024 * 1024.0));
|
||||
} else if (pExecInfo->sortBuffer > 1024) {
|
||||
EXPLAIN_ROW_APPEND(" Buffers:%.2f Kb", pExecInfo->sortBuffer / (1024.0));
|
||||
} else {
|
||||
EXPLAIN_ROW_APPEND(" Buffers:%d b", pExecInfo->sortBuffer);
|
||||
int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo);
|
||||
SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, 0);
|
||||
SSortExecInfo *pExecInfo = (SSortExecInfo *)execInfo->verboseInfo;
|
||||
EXPLAIN_ROW_APPEND("%s", pExecInfo->sortMethod == SORT_QSORT_T ? "quicksort" : "merge sort");
|
||||
if (pExecInfo->sortBuffer > 1024 * 1024) {
|
||||
EXPLAIN_ROW_APPEND(" Buffers:%.2f Mb", pExecInfo->sortBuffer / (1024 * 1024.0));
|
||||
} else if (pExecInfo->sortBuffer > 1024) {
|
||||
EXPLAIN_ROW_APPEND(" Buffers:%.2f Kb", pExecInfo->sortBuffer / (1024.0));
|
||||
} else {
|
||||
EXPLAIN_ROW_APPEND(" Buffers:%d b", pExecInfo->sortBuffer);
|
||||
}
|
||||
|
||||
EXPLAIN_ROW_APPEND(" loops:%d", pExecInfo->loops);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
}
|
||||
|
||||
EXPLAIN_ROW_APPEND(" loops:%d", pExecInfo->loops);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
}
|
||||
|
||||
if (verbose) {
|
||||
|
@ -1183,29 +1185,31 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_IGNORE_GROUPID_FORMAT, pMergeNode->ignoreGroupId ? "true" : "false");
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
if (MERGE_TYPE_SORT == pMergeNode->type) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_IGNORE_GROUPID_FORMAT, pMergeNode->ignoreGroupId ? "true" : "false");
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT);
|
||||
if (pMergeNode->groupSort) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, "_group_id asc");
|
||||
if (LIST_LENGTH(pMergeNode->pMergeKeys) > 0) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COMMA_FORMAT);
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT);
|
||||
if (pMergeNode->groupSort) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, "_group_id asc");
|
||||
if (LIST_LENGTH(pMergeNode->pMergeKeys) > 0) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COMMA_FORMAT);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
|
||||
SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pMergeNode->pMergeKeys, i);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, nodesGetNameFromColumnNode(ptn->pExpr));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, EXPLAIN_ORDER_STRING(ptn->order));
|
||||
if (i != LIST_LENGTH(pMergeNode->pMergeKeys) - 1) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COMMA_FORMAT);
|
||||
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
|
||||
SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pMergeNode->pMergeKeys, i);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, nodesGetNameFromColumnNode(ptn->pExpr));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, EXPLAIN_ORDER_STRING(ptn->order));
|
||||
if (i != LIST_LENGTH(pMergeNode->pMergeKeys) - 1) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COMMA_FORMAT);
|
||||
}
|
||||
}
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pMergeNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
|
|
|
@ -38,7 +38,8 @@ typedef struct SNonSortMergeInfo {
|
|||
} SNonSortMergeInfo;
|
||||
|
||||
typedef struct SColsMergeInfo {
|
||||
uint64_t srcBlkIds[2];
|
||||
SNodeList* pTargets;
|
||||
uint64_t srcBlkIds[2];
|
||||
} SColsMergeInfo;
|
||||
|
||||
typedef struct SMultiwayMergeOperatorInfo {
|
||||
|
@ -150,7 +151,7 @@ SSDataBlock* doSortMerge(SOperatorInfo* pOperator) {
|
|||
SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
|
||||
SSortHandle* pHandle = pSortMergeInfo->pSortHandle;
|
||||
SSDataBlock* pDataBlock = pInfo->binfo.pRes;
|
||||
SArray* pColMatchInfo = pInfo->matchInfo.pList;
|
||||
SArray* pColMatchInfo = pSortMergeInfo->matchInfo.pList;
|
||||
int32_t capacity = pOperator->resultInfo.capacity;
|
||||
|
||||
qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
|
||||
|
@ -234,6 +235,8 @@ void destroySortMergeOperatorInfo(void* param) {
|
|||
pSortMergeInfo->pInputBlock = blockDataDestroy(pSortMergeInfo->pInputBlock);
|
||||
pSortMergeInfo->pIntermediateBlock = blockDataDestroy(pSortMergeInfo->pIntermediateBlock);
|
||||
|
||||
taosArrayDestroy(pSortMergeInfo->matchInfo.pList);
|
||||
|
||||
tsortDestroySortHandle(pSortMergeInfo->pSortHandle);
|
||||
taosArrayDestroy(pSortMergeInfo->pSortInfo);
|
||||
}
|
||||
|
@ -298,25 +301,46 @@ int32_t openColsMergeOperator(SOperatorInfo* pOperator) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t copyColumnsValue(SNodeList* pNodeList, uint64_t targetBlkId, SSDataBlock* pDst, SSDataBlock* pSrc) {
|
||||
bool isNull = (NULL == pSrc || pSrc->info.rows <= 0);
|
||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
|
||||
if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pExpr)->dataBlockId == targetBlkId) {
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, pNode->slotId);
|
||||
if (isNull) {
|
||||
colDataSetVal(pDstCol, 0, NULL, true);
|
||||
} else {
|
||||
SColumnInfoData* pSrcCol = taosArrayGet(pSrc->pDataBlock, ((SColumnNode*)pNode->pExpr)->slotId);
|
||||
colDataAssign(pDstCol, pSrcCol, 1, &pDst->info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSDataBlock* doColsMerge(SOperatorInfo* pOperator) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||
SSDataBlock* pBlock = NULL;
|
||||
SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
|
||||
|
||||
qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo));
|
||||
qDebug("start to merge columns, %s", GET_TASKID(pTaskInfo));
|
||||
|
||||
for (int32_t i = 0; i < 2; ++i) {
|
||||
pBlock = getNextBlockFromDownstream(pOperator, i);
|
||||
if (NULL == pBlock) {
|
||||
TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]);
|
||||
pNonSortMerge->sourceWorkIdx++;
|
||||
idx = NON_SORT_NEXT_SRC(pNonSortMerge, idx);
|
||||
continue;
|
||||
if (pBlock && pBlock->info.rows > 1) {
|
||||
qError("more than 1 row returned from downstream, rows:%" PRId64, pBlock->info.rows);
|
||||
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
}
|
||||
break;
|
||||
|
||||
copyColumnsValue(pColsMerge->pTargets, pColsMerge->srcBlkIds[i], pInfo->binfo.pRes, pBlock);
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
pInfo->binfo.pRes->info.rows = 1;
|
||||
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
void destroyColsMergeOperatorInfo(void* param) {
|
||||
|
@ -390,7 +414,6 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
|
|||
void destroyMultiwayMergeOperatorInfo(void* param) {
|
||||
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
|
||||
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||
|
||||
if (NULL != gMultiwayMergeFps[pInfo->type].closeFn) {
|
||||
(*gMultiwayMergeFps[pInfo->type].closeFn)(&pInfo->sortMergeInfo);
|
||||
|
@ -467,6 +490,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
|||
initResultSizeInfo(&pOperator->resultInfo, 1);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pColsMerge->pTargets = pMergePhyNode->pTargets;
|
||||
pColsMerge->srcBlkIds[0] = getOperatorResultBlockId(downStreams[0], 0);
|
||||
pColsMerge->srcBlkIds[1] = getOperatorResultBlockId(downStreams[1], 0);
|
||||
break;
|
||||
|
|
|
@ -419,6 +419,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
|||
COPY_SCALAR_FIELD(groupSort);
|
||||
CLONE_NODE_LIST_FIELD(pTags);
|
||||
CLONE_NODE_FIELD(pSubtable);
|
||||
COPY_SCALAR_FIELD(cacheLastMode);
|
||||
COPY_SCALAR_FIELD(igLastNull);
|
||||
COPY_SCALAR_FIELD(groupOrderScan);
|
||||
COPY_SCALAR_FIELD(onlyMetaCtbIdx);
|
||||
|
@ -443,8 +444,14 @@ static int32_t logicAggCopy(const SAggLogicNode* pSrc, SAggLogicNode* pDst) {
|
|||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||
CLONE_NODE_LIST_FIELD(pGroupKeys);
|
||||
CLONE_NODE_LIST_FIELD(pAggFuncs);
|
||||
COPY_SCALAR_FIELD(hasLastRow);
|
||||
COPY_SCALAR_FIELD(hasLast);
|
||||
COPY_SCALAR_FIELD(hasTimeLineFunc);
|
||||
COPY_SCALAR_FIELD(onlyHasKeepOrderFunc);
|
||||
COPY_SCALAR_FIELD(hasGroupKeyOptimized);
|
||||
COPY_SCALAR_FIELD(isGroupTb);
|
||||
COPY_SCALAR_FIELD(isPartTb);
|
||||
COPY_SCALAR_FIELD(hasGroup);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -488,6 +495,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst
|
|||
CLONE_NODE_LIST_FIELD(pInputs);
|
||||
COPY_SCALAR_FIELD(numOfChannels);
|
||||
COPY_SCALAR_FIELD(srcGroupId);
|
||||
COPY_SCALAR_FIELD(colsMerge);
|
||||
COPY_SCALAR_FIELD(needSort);
|
||||
COPY_SCALAR_FIELD(groupSort);
|
||||
COPY_SCALAR_FIELD(ignoreGroupId);
|
||||
|
|
|
@ -1571,6 +1571,19 @@ int32_t nodesListStrictAppendList(SNodeList* pTarget, SNodeList* pSrc) {
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
int32_t nodesListMakeStrictAppendList(SNodeList** pTarget, SNodeList* pSrc) {
|
||||
if (NULL == *pTarget) {
|
||||
*pTarget = nodesMakeList();
|
||||
if (NULL == *pTarget) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
return nodesListStrictAppendList(*pTarget, pSrc);
|
||||
}
|
||||
|
||||
|
||||
int32_t nodesListPushFront(SNodeList* pList, SNode* pNode) {
|
||||
if (NULL == pList || NULL == pNode) {
|
||||
return TSDB_CODE_FAILED;
|
||||
|
|
|
@ -747,7 +747,8 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
|||
|
||||
pAgg->isGroupTb = pAgg->pGroupKeys ? keysHasTbname(pAgg->pGroupKeys) : 0;
|
||||
pAgg->isPartTb = pSelect->pPartitionByList ? keysHasTbname(pSelect->pPartitionByList) : 0;
|
||||
|
||||
pAgg->hasGroup = pAgg->pGroupKeys || pSelect->pPartitionByList;
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pAgg;
|
||||
} else {
|
||||
|
|
|
@ -2499,21 +2499,7 @@ static bool lastRowScanOptCheckColNum(int32_t lastColNum, col_id_t lastColId,
|
|||
return true;
|
||||
}
|
||||
|
||||
static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||
// Only one of LAST and LASTROW can appear
|
||||
if (pAgg->hasLastRow == pAgg->hasLast || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions ||
|
||||
!hasSuitableCache(pScan->cacheLastMode, pAgg->hasLastRow, pAgg->hasLast) ||
|
||||
IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, bool* hasOtherFunc) {
|
||||
bool hasNonPKSelectFunc = false;
|
||||
SNode* pFunc = NULL;
|
||||
int32_t lastColNum = 0, selectNonPKColNum = 0;
|
||||
|
@ -2559,13 +2545,48 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
|
|||
return false;
|
||||
}
|
||||
} else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) {
|
||||
return false;
|
||||
*hasOtherFunc = true;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool lastRowScanOptCheckLastCache(SAggLogicNode* pAgg, SScanLogicNode* pScan) {
|
||||
// Only one of LAST and LASTROW can appear
|
||||
if (pAgg->hasLastRow == pAgg->hasLast || (!pAgg->hasLast && !pAgg->hasLastRow) || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions ||
|
||||
!hasSuitableCache(pScan->cacheLastMode, pAgg->hasLastRow, pAgg->hasLast) ||
|
||||
IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||
if (!lastRowScanOptCheckLastCache(pAgg, pScan)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool hasOtherFunc = false;
|
||||
if (!lastRowScanOptCheckFuncList(pNode, &hasOtherFunc)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (hasOtherFunc) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
typedef struct SLastRowScanOptSetColDataTypeCxt {
|
||||
bool doAgg;
|
||||
SNodeList* pLastCols;
|
||||
|
@ -2679,6 +2700,201 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static bool splitCacheLastFuncOptMayBeOptimized(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||
if (!lastRowScanOptCheckLastCache(pAgg, pScan)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool hasOtherFunc = false;
|
||||
if (!lastRowScanOptCheckFuncList(pNode, &hasOtherFunc)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pAgg->hasGroup || !hasOtherFunc) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg, SAggLogicNode* pAgg, SNodeList* pFunc, SNodeList* pTargets) {
|
||||
SAggLogicNode* pNew = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG);
|
||||
if (NULL == pNew) {
|
||||
nodesDestroyList(pFunc);
|
||||
nodesDestroyList(pTargets);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pNew->hasLastRow = false;
|
||||
pNew->hasLast = false;
|
||||
pNew->hasTimeLineFunc = pAgg->hasTimeLineFunc;
|
||||
pNew->hasGroupKeyOptimized = false;
|
||||
pNew->onlyHasKeepOrderFunc = pAgg->onlyHasKeepOrderFunc;
|
||||
pNew->node.groupAction = pAgg->node.groupAction;
|
||||
pNew->node.requireDataOrder = pAgg->node.requireDataOrder;
|
||||
pNew->node.resultDataOrder = pAgg->node.resultDataOrder;
|
||||
pNew->node.pTargets = pTargets;
|
||||
pNew->pAggFuncs = pFunc;
|
||||
pNew->pGroupKeys = nodesCloneList(pAgg->pGroupKeys);
|
||||
pNew->node.pConditions = nodesCloneNode(pAgg->node.pConditions);
|
||||
pNew->isGroupTb = pAgg->isGroupTb;
|
||||
pNew->isPartTb = pAgg->isPartTb;
|
||||
pNew->hasGroup = pAgg->hasGroup;
|
||||
pNew->node.pChildren = nodesCloneList(pAgg->node.pChildren);
|
||||
|
||||
*pNewAgg = pNew;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t splitCacheLastFuncOptModifyAggLogicNode(SAggLogicNode* pAgg) {
|
||||
pAgg->hasTimeLineFunc = false;
|
||||
pAgg->onlyHasKeepOrderFunc = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t splitCacheLastFuncOptCreateMergeLogicNode(SMergeLogicNode** pNew, SAggLogicNode* pAgg1, SAggLogicNode* pAgg2) {
|
||||
SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
|
||||
if (NULL == pMerge) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pMerge->colsMerge = true;
|
||||
pMerge->numOfChannels = 2;
|
||||
pMerge->srcGroupId = -1;
|
||||
pMerge->node.precision = pAgg1->node.precision;
|
||||
|
||||
SNode* pNewAgg1 = nodesCloneNode((SNode*)pAgg1);
|
||||
SNode* pNewAgg2 = nodesCloneNode((SNode*)pAgg2);
|
||||
if (NULL == pNewAgg1 || NULL == pNewAgg2) {
|
||||
nodesDestroyNode(pNewAgg1);
|
||||
nodesDestroyNode(pNewAgg2);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
((SAggLogicNode*)pNewAgg1)->node.pParent = (SLogicNode*)pMerge;
|
||||
((SAggLogicNode*)pNewAgg2)->node.pParent = (SLogicNode*)pMerge;
|
||||
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, ((SAggLogicNode*)pNewAgg1)->node.pChildren) {
|
||||
((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg1;
|
||||
}
|
||||
FOREACH(pNode, ((SAggLogicNode*)pNewAgg2)->node.pChildren) {
|
||||
((SLogicNode*)pNode)->pParent = (SLogicNode*)pNewAgg2;
|
||||
}
|
||||
|
||||
int32_t code = nodesListMakeStrictAppendList(&pMerge->node.pTargets, nodesCloneList(pAgg1->node.pTargets));
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppendList(&pMerge->node.pTargets, nodesCloneList(pAgg2->node.pTargets));
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pMerge->node.pChildren, pNewAgg1);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pMerge->node.pChildren, pNewAgg2);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode(pNewAgg1);
|
||||
nodesDestroyNode(pNewAgg2);
|
||||
nodesDestroyNode((SNode*)pMerge);
|
||||
} else {
|
||||
*pNew = pMerge;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||
SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, splitCacheLastFuncOptMayBeOptimized);
|
||||
|
||||
if (NULL == pAgg) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNode* pNode = NULL;
|
||||
SNodeList* pAggFuncList = NULL;
|
||||
{
|
||||
WHERE_EACH(pNode, pAgg->pAggFuncs) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)pNode;
|
||||
int32_t funcType = pFunc->funcType;
|
||||
if (FUNCTION_TYPE_LAST_ROW != funcType && FUNCTION_TYPE_LAST != funcType &&
|
||||
FUNCTION_TYPE_SELECT_VALUE != funcType && FUNCTION_TYPE_GROUP_KEY != funcType) {
|
||||
nodesListMakeStrictAppend(&pAggFuncList, nodesCloneNode(pNode));
|
||||
ERASE_NODE(pAgg->pAggFuncs);
|
||||
continue;
|
||||
}
|
||||
WHERE_NEXT;
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == pAggFuncList) {
|
||||
planError("empty agg func list while splite projections");
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
SNodeList* pTargets = NULL;
|
||||
{
|
||||
WHERE_EACH(pNode, pAgg->node.pTargets) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
SNode* pFuncNode = NULL;
|
||||
bool found = false;
|
||||
FOREACH(pFuncNode, pAggFuncList) {
|
||||
SFunctionNode* pFunc = (SFunctionNode*)pFuncNode;
|
||||
if (0 == strcmp(pFunc->node.aliasName, pCol->colName)) {
|
||||
nodesListMakeStrictAppend(&pTargets, nodesCloneNode(pNode));
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (found) {
|
||||
ERASE_NODE(pAgg->node.pTargets);
|
||||
continue;
|
||||
}
|
||||
WHERE_NEXT;
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == pTargets) {
|
||||
planError("empty target func list while splite projections");
|
||||
nodesDestroyList(pAggFuncList);
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
SMergeLogicNode* pMerge = NULL;
|
||||
SAggLogicNode* pNewAgg = NULL;
|
||||
int32_t code = splitCacheLastFuncOptCreateAggLogicNode(&pNewAgg, pAgg, pAggFuncList, pTargets);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = splitCacheLastFuncOptModifyAggLogicNode(pAgg);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = splitCacheLastFuncOptCreateMergeLogicNode(&pMerge, pNewAgg, pAgg);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pAgg, (SLogicNode*)pMerge);
|
||||
}
|
||||
|
||||
nodesDestroyNode((SNode *)pAgg);
|
||||
nodesDestroyNode((SNode *)pNewAgg);
|
||||
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode((SNode *)pMerge);
|
||||
}
|
||||
|
||||
pCxt->optimized = true;
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// merge projects
|
||||
static bool mergeProjectsMayBeOptimized(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren)) {
|
||||
|
@ -3762,6 +3978,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
|
|||
{.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize},
|
||||
{.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize},
|
||||
{.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
|
||||
{.pName = "splitCacheLastFunc", .optimizeFunc = splitCacheLastFuncOptimize},
|
||||
{.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
|
||||
{.pName = "TagScan", .optimizeFunc = tagScanOptimize},
|
||||
{.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize},
|
||||
|
|
|
@ -2001,6 +2001,9 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildre
|
|||
SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
|
||||
|
||||
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pMergeLogicNode->node.pTargets, &pMerge->pTargets);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc);
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -1588,9 +1588,12 @@ typedef struct SSmaIndexSplitInfo {
|
|||
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
|
||||
SSmaIndexSplitInfo* pInfo) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
|
||||
pInfo->pMerge = (SMergeLogicNode*)pNode;
|
||||
pInfo->pSubplan = pSubplan;
|
||||
return true;
|
||||
int32_t nodeType = nodeType(nodesListGetNode(pNode->pChildren, 0));
|
||||
if (nodeType == QUERY_NODE_LOGIC_PLAN_EXCHANGE || nodeType == QUERY_NODE_LOGIC_PLAN_MERGE) {
|
||||
pInfo->pMerge = (SMergeLogicNode*)pNode;
|
||||
pInfo->pSubplan = pSubplan;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue