fix: add pk for merge generated during merge scan node split
This commit is contained in:
parent
52a04cd008
commit
f778dce52d
|
@ -690,19 +690,23 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplCreateMergeKeysByExpr(SNode* pExpr, EOrder order, SNodeList** pMergeKeys) {
|
||||
SOrderByExprNode* pOrderByExpr = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
|
||||
if (NULL == pOrderByExpr) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pOrderByExpr->pExpr = nodesCloneNode(pExpr);
|
||||
if (NULL == pOrderByExpr->pExpr) {
|
||||
nodesDestroyNode((SNode*)pOrderByExpr);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pOrderByExpr->order = order;
|
||||
pOrderByExpr->nullOrder = (order == ORDER_ASC) ? NULL_ORDER_FIRST : NULL_ORDER_LAST;
|
||||
return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pOrderByExpr);
|
||||
}
|
||||
|
||||
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) {
|
||||
SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
|
||||
if (NULL == pMergeKey) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pMergeKey->pExpr = nodesCloneNode(pPrimaryKey);
|
||||
if (NULL == pMergeKey->pExpr) {
|
||||
nodesDestroyNode((SNode*)pMergeKey);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pMergeKey->order = order;
|
||||
pMergeKey->nullOrder = NULL_ORDER_FIRST;
|
||||
return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey);
|
||||
return stbSplCreateMergeKeysByExpr(pPrimaryKey, order, pMergeKeys);
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
|
@ -1357,6 +1361,28 @@ static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
|
|||
return pCol;
|
||||
}
|
||||
|
||||
static SNode* stbSplFindPkFromScan(SScanLogicNode* pScan) {
|
||||
bool find = false;
|
||||
SNode* pCol = NULL;
|
||||
FOREACH(pCol, pScan->pScanCols) {
|
||||
if (((SColumnNode*)pCol)->isPk) {
|
||||
find = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!find) {
|
||||
return NULL;
|
||||
}
|
||||
SNode* pTarget = NULL;
|
||||
FOREACH(pTarget, pScan->node.pTargets) {
|
||||
if (nodesEqualNode(pTarget, pCol)) {
|
||||
return pCol;
|
||||
}
|
||||
}
|
||||
nodesListStrictAppend(pScan->node.pTargets, nodesCloneNode(pCol));
|
||||
return pCol;
|
||||
}
|
||||
|
||||
static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan,
|
||||
SNodeList** pOutputMergeKeys) {
|
||||
SNodeList* pChildren = pScan->node.pChildren;
|
||||
|
@ -1374,8 +1400,13 @@ static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOu
|
|||
pMergeScan->filesetDelimited = true;
|
||||
pMergeScan->node.pChildren = pChildren;
|
||||
splSetParent((SLogicNode*)pMergeScan);
|
||||
code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan),
|
||||
pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
|
||||
|
||||
SNode* pTs = stbSplFindPrimaryKeyFromScan(pMergeScan);
|
||||
code = stbSplCreateMergeKeysByPrimaryKey(pTs, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
|
||||
SNode* pPk = stbSplFindPkFromScan(pMergeScan);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pPk) {
|
||||
code = stbSplCreateMergeKeysByExpr(pPk, pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys);
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
Loading…
Reference in New Issue