Merge branch 'feature/3.0_debug_wxy' of github.com:taosdata/TDengine into feature/3.0_debug_wxy
This commit is contained in:
commit
c55bef32ff
|
@ -76,8 +76,8 @@ typedef struct SScanLogicNode {
|
|||
int16_t tsColId;
|
||||
double filesFactor;
|
||||
SArray* pSmaIndexes;
|
||||
SNodeList* pPartTags;
|
||||
bool partSort;
|
||||
SNodeList* pGroupTags;
|
||||
bool groupSort;
|
||||
} SScanLogicNode;
|
||||
|
||||
typedef struct SJoinLogicNode {
|
||||
|
@ -141,6 +141,7 @@ typedef struct SMergeLogicNode {
|
|||
SNodeList* pInputs;
|
||||
int32_t numOfChannels;
|
||||
int32_t srcGroupId;
|
||||
bool groupSort;
|
||||
} SMergeLogicNode;
|
||||
|
||||
typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType;
|
||||
|
@ -284,7 +285,8 @@ typedef struct STableScanPhysiNode {
|
|||
double ratio;
|
||||
int32_t dataRequired;
|
||||
SNodeList* pDynamicScanFuncs;
|
||||
SNodeList* pPartitionTags;
|
||||
SNodeList* pGroupTags;
|
||||
bool groupSort;
|
||||
int64_t interval;
|
||||
int64_t offset;
|
||||
int64_t sliding;
|
||||
|
@ -358,6 +360,7 @@ typedef struct SMergePhysiNode {
|
|||
SNodeList* pTargets;
|
||||
int32_t numOfChannels;
|
||||
int32_t srcGroupId;
|
||||
bool groupSort;
|
||||
} SMergePhysiNode;
|
||||
|
||||
typedef struct SWinodwPhysiNode {
|
||||
|
|
|
@ -2051,7 +2051,7 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
|
|||
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pPartitionTags);
|
||||
code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -2455,7 +2455,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
if (pTableScanNode->pPartitionTags) {
|
||||
if (pTableScanNode->pGroupTags) {
|
||||
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
|
||||
}
|
||||
|
||||
|
|
|
@ -351,7 +351,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
|
|||
COPY_SCALAR_FIELD(watermark);
|
||||
COPY_SCALAR_FIELD(tsColId);
|
||||
COPY_SCALAR_FIELD(filesFactor);
|
||||
CLONE_NODE_LIST_FIELD(pPartTags);
|
||||
CLONE_NODE_LIST_FIELD(pGroupTags);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -401,6 +401,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst
|
|||
CLONE_NODE_LIST_FIELD(pInputs);
|
||||
COPY_SCALAR_FIELD(numOfChannels);
|
||||
COPY_SCALAR_FIELD(srcGroupId);
|
||||
COPY_SCALAR_FIELD(groupSort);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -501,7 +502,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
|
|||
COPY_SCALAR_FIELD(ratio);
|
||||
COPY_SCALAR_FIELD(dataRequired);
|
||||
CLONE_NODE_LIST_FIELD(pDynamicScanFuncs);
|
||||
CLONE_NODE_LIST_FIELD(pPartitionTags);
|
||||
CLONE_NODE_LIST_FIELD(pGroupTags);
|
||||
COPY_SCALAR_FIELD(interval);
|
||||
COPY_SCALAR_FIELD(offset);
|
||||
COPY_SCALAR_FIELD(sliding);
|
||||
|
|
|
@ -541,7 +541,7 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
|
|||
static const char* jkScanLogicPlanTableId = "TableId";
|
||||
static const char* jkScanLogicPlanTableType = "TableType";
|
||||
static const char* jkScanLogicPlanTagCond = "TagCond";
|
||||
static const char* jkScanLogicPlanPartTags = "PartTags";
|
||||
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
||||
|
||||
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
||||
|
@ -563,7 +563,7 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkScanLogicPlanPartTags, pNode->pPartTags);
|
||||
code = nodeListToJson(pJson, jkScanLogicPlanGroupTags, pNode->pGroupTags);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -590,7 +590,7 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
|||
code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkScanLogicPlanPartTags, &pNode->pPartTags);
|
||||
code = jsonToNodeList(pJson, jkScanLogicPlanGroupTags, &pNode->pGroupTags);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -1432,7 +1432,8 @@ static const char* jkTableScanPhysiPlanTriggerType = "triggerType";
|
|||
static const char* jkTableScanPhysiPlanWatermark = "watermark";
|
||||
static const char* jkTableScanPhysiPlanTsColId = "tsColId";
|
||||
static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor";
|
||||
static const char* jkTableScanPhysiPlanPartitionTags = "PartitionTags";
|
||||
static const char* jkTableScanPhysiPlanGroupTags = "GroupTags";
|
||||
static const char* jkTableScanPhysiPlanGroupSort = "GroupSort";
|
||||
|
||||
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
|
||||
|
@ -1487,7 +1488,10 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkTableScanPhysiPlanPartitionTags, pNode->pPartitionTags);
|
||||
code = nodeListToJson(pJson, jkTableScanPhysiPlanGroupTags, pNode->pGroupTags);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanGroupSort, pNode->groupSort);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -1546,7 +1550,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
|
|||
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkTableScanPhysiPlanPartitionTags, &pNode->pPartitionTags);
|
||||
code = jsonToNodeList(pJson, jkTableScanPhysiPlanGroupTags, &pNode->pGroupTags);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanGroupSort, &pNode->groupSort);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -1727,6 +1734,7 @@ static const char* jkMergePhysiPlanMergeKeys = "MergeKeys";
|
|||
static const char* jkMergePhysiPlanTargets = "Targets";
|
||||
static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
|
||||
static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
|
||||
static const char* jkMergePhysiPlanGroupSort = "GroupSort";
|
||||
|
||||
static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;
|
||||
|
@ -1744,6 +1752,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanSrcGroupId, pNode->srcGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanGroupSort, pNode->groupSort);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1764,6 +1775,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkMergePhysiPlanSrcGroupId, &pNode->srcGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkMergePhysiPlanGroupSort, &pNode->groupSort);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -711,7 +711,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
nodesDestroyNode(pLogicNode->pTagCond);
|
||||
nodesDestroyNode(pLogicNode->pTagIndexCond);
|
||||
taosArrayDestroy(pLogicNode->pSmaIndexes);
|
||||
nodesDestroyList(pLogicNode->pPartTags);
|
||||
nodesDestroyList(pLogicNode->pGroupTags);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_JOIN: {
|
||||
|
@ -815,7 +815,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
STableScanPhysiNode* pPhyNode = (STableScanPhysiNode*)pNode;
|
||||
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
||||
nodesDestroyList(pPhyNode->pDynamicScanFuncs);
|
||||
nodesDestroyList(pPhyNode->pPartitionTags);
|
||||
nodesDestroyList(pPhyNode->pGroupTags);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT: {
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
|
||||
#define OPTIMIZE_FLAG_MASK(n) (1 << n)
|
||||
|
||||
#define OPTIMIZE_FLAG_OSD OPTIMIZE_FLAG_MASK(0)
|
||||
#define OPTIMIZE_FLAG_SCAN_PATH OPTIMIZE_FLAG_MASK(0)
|
||||
#define OPTIMIZE_FLAG_PUSH_DOWN_CONDE OPTIMIZE_FLAG_MASK(1)
|
||||
|
||||
#define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask)
|
||||
|
@ -91,7 +91,7 @@ static bool scanPathOptHaveNormalCol(SNodeList* pList) {
|
|||
}
|
||||
|
||||
static bool scanPathOptMayBeOptimized(SLogicNode* pNode) {
|
||||
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OSD)) {
|
||||
if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH)) {
|
||||
return false;
|
||||
}
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
|
||||
|
@ -241,7 +241,7 @@ static int32_t scanPathOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
if (TSDB_CODE_SUCCESS == code && (NULL != info.pDsoFuncs || NULL != info.pSdrFuncs)) {
|
||||
info.pScan->dataRequired = scanPathOptGetDataRequired(info.pSdrFuncs);
|
||||
info.pScan->pDynamicScanFuncs = info.pDsoFuncs;
|
||||
OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_OSD);
|
||||
OPTIMIZE_FLAG_SET_MASK(info.pScan->node.optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH);
|
||||
pCxt->optimized = true;
|
||||
}
|
||||
nodesDestroyList(info.pSdrFuncs);
|
||||
|
@ -1073,7 +1073,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||
TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pPartTags);
|
||||
TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pGroupTags);
|
||||
int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
NODES_CLEAR_LIST(pNode->pChildren);
|
||||
|
@ -1083,7 +1083,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
SNode* pGroupKey = NULL;
|
||||
FOREACH(pGroupKey, ((SAggLogicNode*)pNode)->pGroupKeys) {
|
||||
code = nodesListMakeStrictAppend(
|
||||
&pScan->pPartTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0)));
|
||||
&pScan->pGroupTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0)));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
|
@ -1091,7 +1091,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
NODES_DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = partTagsOptRebuildTbanme(pScan->pPartTags);
|
||||
code = partTagsOptRebuildTbanme(pScan->pGroupTags);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1212,7 +1212,7 @@ static bool rewriteTailOptNeedGroupSort(SIndefRowsFuncLogicNode* pIndef) {
|
|||
}
|
||||
SNode* pChild = nodesListGetNode(pIndef->node.pChildren, 0);
|
||||
return QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild) ||
|
||||
(QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && NULL != ((SScanLogicNode*)pChild)->pPartTags);
|
||||
(QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && NULL != ((SScanLogicNode*)pChild)->pGroupTags);
|
||||
}
|
||||
|
||||
static int32_t rewriteTailOptCreateSort(SIndefRowsFuncLogicNode* pIndef, SLogicNode** pOutput) {
|
||||
|
|
|
@ -520,12 +520,13 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
|||
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
||||
pTableScan->dataRequired = pScanLogicNode->dataRequired;
|
||||
pTableScan->pDynamicScanFuncs = nodesCloneList(pScanLogicNode->pDynamicScanFuncs);
|
||||
pTableScan->pPartitionTags = nodesCloneList(pScanLogicNode->pPartTags);
|
||||
pTableScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
|
||||
if ((NULL != pScanLogicNode->pDynamicScanFuncs && NULL == pTableScan->pDynamicScanFuncs) ||
|
||||
(NULL != pScanLogicNode->pPartTags && NULL == pTableScan->pPartitionTags)) {
|
||||
(NULL != pScanLogicNode->pGroupTags && NULL == pTableScan->pGroupTags)) {
|
||||
nodesDestroyNode((SNode*)pTableScan);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pTableScan->groupSort = pScanLogicNode->groupSort;
|
||||
pTableScan->interval = pScanLogicNode->interval;
|
||||
pTableScan->offset = pScanLogicNode->offset;
|
||||
pTableScan->sliding = pScanLogicNode->sliding;
|
||||
|
@ -1330,6 +1331,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
|
|||
|
||||
pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
|
||||
pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
|
||||
pMerge->groupSort = pMergeLogicNode->groupSort;
|
||||
|
||||
int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
|
||||
|
||||
|
|
|
@ -362,7 +362,7 @@ static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
|
|||
}
|
||||
|
||||
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
|
||||
SNodeList* pMergeKeys, SLogicNode* pPartChild) {
|
||||
SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort) {
|
||||
SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
|
||||
if (NULL == pMerge) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -371,6 +371,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
|
|||
pMerge->srcGroupId = pCxt->groupId;
|
||||
pMerge->node.precision = pPartChild->precision;
|
||||
pMerge->pMergeKeys = pMergeKeys;
|
||||
pMerge->groupSort = groupSort;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
|
||||
|
@ -430,7 +431,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
|
|||
SNodeList* pMergeKeys = NULL;
|
||||
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow);
|
||||
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, false);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyList(pMergeKeys);
|
||||
|
@ -497,12 +498,16 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo
|
|||
return code;
|
||||
}
|
||||
|
||||
static void splSetTableScanType(SLogicNode* pNode, EScanType scanType) {
|
||||
static void stbSplSetTableMergeScan(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
((SScanLogicNode*)pNode)->scanType = scanType;
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
|
||||
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
|
||||
if (NULL != pScan->pGroupTags) {
|
||||
pScan->groupSort = true;
|
||||
}
|
||||
} else {
|
||||
if (1 == LIST_LENGTH(pNode->pChildren)) {
|
||||
splSetTableScanType((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), scanType);
|
||||
stbSplSetTableMergeScan((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -515,7 +520,7 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
|
|||
int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild);
|
||||
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -524,13 +529,10 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
|
|||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
splSetTableScanType(pChild, SCAN_TYPE_TABLE_MERGE);
|
||||
++(pCxt->groupId);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
stbSplSetTableMergeScan(pChild);
|
||||
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
|
||||
++(pCxt->groupId);
|
||||
} else {
|
||||
nodesDestroyList(pMergeKeys);
|
||||
}
|
||||
|
@ -560,7 +562,7 @@ static int32_t stbSplSplitState(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
|||
|
||||
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
return ((SScanLogicNode*)pNode)->pPartTags;
|
||||
return ((SScanLogicNode*)pNode)->pGroupTags;
|
||||
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
|
||||
} else {
|
||||
|
@ -790,12 +792,29 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
|
|||
return code;
|
||||
}
|
||||
|
||||
static void stbSplSetScanPartSort(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
|
||||
if (NULL != pScan->pGroupTags) {
|
||||
pScan->groupSort = true;
|
||||
}
|
||||
} else {
|
||||
if (1 == LIST_LENGTH(pNode->pChildren)) {
|
||||
stbSplSetScanPartSort((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
SLogicNode* pPartSort = NULL;
|
||||
SNodeList* pMergeKeys = NULL;
|
||||
bool groupSort = ((SSortLogicNode*)pInfo->pSplitNode)->groupSort;
|
||||
int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort);
|
||||
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && groupSort) {
|
||||
stbSplSetScanPartSort(pPartSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||
|
@ -830,7 +849,7 @@ static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pS
|
|||
SNodeList* pMergeKeys = NULL;
|
||||
int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan);
|
||||
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan, false);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pSubplan->pChildren,
|
||||
|
|
Loading…
Reference in New Issue