optimize sort perf
This commit is contained in:
parent
4a131000e4
commit
cb78c54d72
|
@ -83,6 +83,15 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData,
|
|||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool colDataIsNull_t(const SColumnInfoData* pColumnInfoData, uint32_t row, bool isVarType) {
|
||||
if (!pColumnInfoData->hasNull) return false;
|
||||
if (isVarType) {
|
||||
return colDataIsNull_var(pColumnInfoData, row);
|
||||
} else {
|
||||
return pColumnInfoData->nullbitmap ? colDataIsNull_f(pColumnInfoData->nullbitmap, row) : false;
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
|
||||
SColumnDataAgg* pColAgg) {
|
||||
if (!pColumnInfoData->hasNull) {
|
||||
|
|
|
@ -223,6 +223,7 @@ typedef struct SMergeLogicNode {
|
|||
int32_t srcGroupId;
|
||||
bool groupSort;
|
||||
bool ignoreGroupId;
|
||||
bool inputWithGroupId;
|
||||
} SMergeLogicNode;
|
||||
|
||||
typedef enum EWindowType {
|
||||
|
@ -294,7 +295,8 @@ typedef struct SPartitionLogicNode {
|
|||
SNode* pSubtable;
|
||||
|
||||
bool needBlockOutputTsOrder; // if true, partition output block will have ts order maintained
|
||||
int32_t tsSlotId;
|
||||
int32_t pkTsColId;
|
||||
uint64_t pkTsColTbId;
|
||||
} SPartitionLogicNode;
|
||||
|
||||
typedef enum ESubplanType {
|
||||
|
@ -534,6 +536,7 @@ typedef struct SMergePhysiNode {
|
|||
int32_t srcGroupId;
|
||||
bool groupSort;
|
||||
bool ignoreGroupId;
|
||||
bool inputWithGroupId;
|
||||
} SMergePhysiNode;
|
||||
|
||||
typedef struct SWindowPhysiNode {
|
||||
|
|
|
@ -2265,11 +2265,11 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t
|
|||
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) {
|
||||
SColumnDataAgg* pColAgg = NULL;
|
||||
const char* isNull = oldkeyBuf;
|
||||
const char* p = oldkeyBuf + sizeof(int8_t) * taosArrayGetSize(pSortGroupCols);
|
||||
const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pSortGroupCols); ++i) {
|
||||
const SColumn* pCol = (SColumn*)taosArrayGet(pSortGroupCols, i);
|
||||
const SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
|
||||
for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
|
||||
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
|
||||
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
|
||||
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];
|
||||
|
||||
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
|
||||
|
@ -2296,15 +2296,15 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol
|
|||
|
||||
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock,
|
||||
int32_t rowIndex) {
|
||||
uint32_t colNum = taosArrayGetSize(pSortGroupCols);
|
||||
uint32_t colNum = pSortGroupCols->size;
|
||||
SColumnDataAgg* pColAgg = NULL;
|
||||
char* isNull = keyBuf;
|
||||
char* p = keyBuf + sizeof(int8_t) * colNum;
|
||||
|
||||
for (int32_t i = 0; i < colNum; ++i) {
|
||||
const SColumn* pCol = (SColumn*)taosArrayGet(pSortGroupCols, i);
|
||||
const SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
|
||||
if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) continue;
|
||||
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
|
||||
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
|
||||
if (pCol->slotId > pBlock->pDataBlock->size) continue;
|
||||
|
||||
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];
|
||||
|
||||
|
|
|
@ -692,6 +692,7 @@ typedef struct SMultiwayMergeOperatorInfo {
|
|||
bool ignoreGroupId;
|
||||
uint64_t groupId;
|
||||
STupleHandle* prefetchedTuple;
|
||||
bool inputWithGroupId;
|
||||
} SMultiwayMergeOperatorInfo;
|
||||
|
||||
int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
|
||||
|
@ -742,7 +743,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
|
|||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = NULL;
|
||||
if (pInfo->groupSort) {
|
||||
if (pInfo->groupSort || pInfo->inputWithGroupId) {
|
||||
if (pInfo->prefetchedTuple == NULL) {
|
||||
pTupleHandle = tsortNextTuple(pHandle);
|
||||
} else {
|
||||
|
@ -763,7 +764,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
|
|||
break;
|
||||
}
|
||||
|
||||
if (pInfo->groupSort) {
|
||||
if (pInfo->groupSort || pInfo->inputWithGroupId) {
|
||||
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||
if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
|
@ -943,6 +944,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
|||
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
|
||||
pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder;
|
||||
pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder;
|
||||
pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId;
|
||||
|
||||
setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL,
|
||||
|
|
|
@ -616,48 +616,62 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
|
|||
int ret = pParam->cmpFn(left1, right1);
|
||||
return ret;
|
||||
} else {
|
||||
bool isVarType;
|
||||
for (int32_t i = 0; i < pInfo->size; ++i) {
|
||||
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
|
||||
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
|
||||
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
|
||||
isVarType = IS_VAR_DATA_TYPE(pLeftColInfoData->info.type);
|
||||
|
||||
bool leftNull = false;
|
||||
if (pLeftColInfoData->hasNull) {
|
||||
if (pLeftBlock->pBlockAgg == NULL) {
|
||||
leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
|
||||
} else {
|
||||
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
|
||||
pLeftBlock->pBlockAgg[i]);
|
||||
if (pLeftColInfoData->hasNull || pRightColInfoData->hasNull) {
|
||||
bool leftNull = false;
|
||||
if (pLeftColInfoData->hasNull) {
|
||||
if (pLeftBlock->pBlockAgg == NULL) {
|
||||
leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType);
|
||||
} else {
|
||||
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
|
||||
pLeftBlock->pBlockAgg[i]);
|
||||
}
|
||||
}
|
||||
|
||||
bool rightNull = false;
|
||||
if (pRightColInfoData->hasNull) {
|
||||
if (pRightBlock->pBlockAgg == NULL) {
|
||||
rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType);
|
||||
} else {
|
||||
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
|
||||
pRightBlock->pBlockAgg[i]);
|
||||
}
|
||||
}
|
||||
|
||||
if (leftNull && rightNull) {
|
||||
continue; // continue to next slot
|
||||
}
|
||||
|
||||
if (rightNull) {
|
||||
return pOrder->nullFirst ? 1 : -1;
|
||||
}
|
||||
|
||||
if (leftNull) {
|
||||
return pOrder->nullFirst ? -1 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
bool rightNull = false;
|
||||
if (pRightColInfoData->hasNull) {
|
||||
if (pRightBlock->pBlockAgg == NULL) {
|
||||
rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
|
||||
} else {
|
||||
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
|
||||
pRightBlock->pBlockAgg[i]);
|
||||
}
|
||||
void* left1, *right1;
|
||||
if (isVarType) {
|
||||
left1 = colDataGetVarData(pLeftColInfoData, pLeftSource->src.rowIndex);
|
||||
right1 = colDataGetVarData(pRightColInfoData, pRightSource->src.rowIndex);
|
||||
} else {
|
||||
left1 = colDataGetNumData(pLeftColInfoData, pLeftSource->src.rowIndex);
|
||||
right1 = colDataGetNumData(pRightColInfoData, pRightSource->src.rowIndex);
|
||||
}
|
||||
|
||||
if (leftNull && rightNull) {
|
||||
continue; // continue to next slot
|
||||
__compar_fn_t fn = pOrder->compFn;
|
||||
if (!fn) {
|
||||
fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
|
||||
pOrder->compFn = fn;
|
||||
}
|
||||
|
||||
if (rightNull) {
|
||||
return pOrder->nullFirst ? 1 : -1;
|
||||
}
|
||||
|
||||
if (leftNull) {
|
||||
return pOrder->nullFirst ? -1 : 1;
|
||||
}
|
||||
|
||||
void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
|
||||
void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);
|
||||
|
||||
__compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
|
||||
|
||||
int ret = fn(left1, right1);
|
||||
if (ret == 0) {
|
||||
continue;
|
||||
|
|
|
@ -490,6 +490,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst
|
|||
COPY_SCALAR_FIELD(srcGroupId);
|
||||
COPY_SCALAR_FIELD(groupSort);
|
||||
COPY_SCALAR_FIELD(ignoreGroupId);
|
||||
COPY_SCALAR_FIELD(inputWithGroupId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -543,7 +544,8 @@ static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLog
|
|||
CLONE_NODE_LIST_FIELD(pTags);
|
||||
CLONE_NODE_FIELD(pSubtable);
|
||||
COPY_SCALAR_FIELD(needBlockOutputTsOrder);
|
||||
COPY_SCALAR_FIELD(tsSlotId);
|
||||
COPY_SCALAR_FIELD(pkTsColId);
|
||||
COPY_SCALAR_FIELD(pkTsColTbId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -2271,6 +2271,7 @@ static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
|
|||
static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
|
||||
static const char* jkMergePhysiPlanGroupSort = "GroupSort";
|
||||
static const char* jkMergePhysiPlanIgnoreGroupID = "IgnoreGroupID";
|
||||
static const char* jkMergePhysiPlanInputWithGroupId = "InputWithGroupId";
|
||||
|
||||
static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;
|
||||
|
@ -2294,6 +2295,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanIgnoreGroupID, pNode->ignoreGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanInputWithGroupId, pNode->inputWithGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2682,6 +2682,7 @@ enum {
|
|||
PHY_MERGE_CODE_SRC_GROUP_ID,
|
||||
PHY_MERGE_CODE_GROUP_SORT,
|
||||
PHY_MERGE_CODE_IGNORE_GROUP_ID,
|
||||
PHY_MERGE_CODE_INPUT_WITH_GROUP_ID,
|
||||
};
|
||||
|
||||
static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
|
@ -2706,6 +2707,9 @@ static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_IGNORE_GROUP_ID, pNode->ignoreGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_INPUT_WITH_GROUP_ID, pNode->inputWithGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2738,6 +2742,9 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_MERGE_CODE_IGNORE_GROUP_ID:
|
||||
code = tlvDecodeBool(pTlv, &pNode->ignoreGroupId);
|
||||
break;
|
||||
case PHY_MERGE_CODE_INPUT_WITH_GROUP_ID:
|
||||
code = tlvDecodeBool(pTlv, &pNode->inputWithGroupId);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1265,7 +1265,8 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
|
|||
pPartition->needBlockOutputTsOrder = true;
|
||||
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pSelect->pWindow;
|
||||
SColumnNode* pTsCol = (SColumnNode*)pInterval->pCol;
|
||||
pPartition->tsSlotId = pTsCol->slotId;
|
||||
pPartition->pkTsColId = pTsCol->colId;
|
||||
pPartition->pkTsColTbId = pTsCol->tableId;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags) {
|
||||
|
|
|
@ -2838,13 +2838,14 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu
|
|||
cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT);
|
||||
return true;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_SORT:
|
||||
if (((SSortLogicNode*)pNodeLimitPushTo)->calcGroupId) break;
|
||||
// fall through
|
||||
case QUERY_NODE_LOGIC_PLAN_FILL:
|
||||
case QUERY_NODE_LOGIC_PLAN_SORT: {
|
||||
cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT);
|
||||
SNode* pChild = NULL;
|
||||
FOREACH(pChild, pNodeLimitPushTo->pChildren) { pushDownLimitHow(pNodeLimitPushTo, (SLogicNode*)pChild); }
|
||||
return true;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG: {
|
||||
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT &&
|
||||
(isPartTagAgg((SAggLogicNode*)pNodeLimitPushTo) || isPartTableAgg((SAggLogicNode*)pNodeLimitPushTo))) {
|
||||
|
@ -3585,11 +3586,13 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSortLogicNode* pSort = (SSortLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT);
|
||||
if (pSort) {
|
||||
bool alreadyPartByPKTs = false;
|
||||
pSort->groupSort = false;
|
||||
TSWAP(pSort->node.pChildren, pPartition->node.pChildren);
|
||||
optResetParent((SLogicNode*)pSort);
|
||||
FOREACH(node, pPartition->pPartitionKeys) {
|
||||
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
|
||||
if (QUERY_NODE_COLUMN == nodeType(node) && ((SColumnNode*)node)->colId == pPartition->pkTsColId &&
|
||||
((SColumnNode*)node)->tableId == pPartition->pkTsColTbId)
|
||||
alreadyPartByPKTs = true;
|
||||
if (!pOrder) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
|
@ -3600,7 +3603,7 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pPartition->needBlockOutputTsOrder) {
|
||||
if (pPartition->needBlockOutputTsOrder && !alreadyPartByPKTs) {
|
||||
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
|
||||
if (!pOrder) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -3612,7 +3615,7 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
|
|||
FOREACH(node, pPartition->node.pTargets) {
|
||||
if (nodeType(node) == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pCol = (SColumnNode*)node;
|
||||
if (pCol->slotId == pPartition->tsSlotId) {
|
||||
if (pCol->colId == pPartition->pkTsColId && pCol->tableId == pPartition->pkTsColTbId) {
|
||||
pOrder->pExpr = nodesCloneNode((SNode*)pCol);
|
||||
break;
|
||||
}
|
||||
|
@ -3624,10 +3627,6 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
|
|||
}
|
||||
}
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
pSort->node.pTargets = nodesCloneList(((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0))->pTargets);
|
||||
if (!pSort->node.pTargets) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
nodesDestroyNode((SNode*)pSort);
|
||||
pSort = NULL;
|
||||
|
@ -3651,6 +3650,9 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
|||
// if sort create failed, we eat the error, skip the optimization
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
TSWAP(pSort->node.pChildren, pNode->node.pChildren);
|
||||
TSWAP(pSort->node.pTargets, pNode->node.pTargets);
|
||||
optResetParent((SLogicNode*)pSort);
|
||||
pSort->calcGroupId = true;
|
||||
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -1800,7 +1800,6 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList*
|
|||
SNodeList* pPartitionKeys = NULL;
|
||||
int32_t code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
|
||||
pPart->needBlockOutputTsOrder = pPartLogicNode->needBlockOutputTsOrder;
|
||||
pPart->tsSlotId = pPartLogicNode->tsSlotId;
|
||||
|
||||
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
|
||||
// push down expression to pOutputDataBlockDesc of child node
|
||||
|
@ -1822,6 +1821,22 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList*
|
|||
}
|
||||
}
|
||||
|
||||
if (pPart->needBlockOutputTsOrder) {
|
||||
SNode* node;
|
||||
bool found = false;
|
||||
FOREACH(node, pPartLogicNode->node.pTargets) {
|
||||
if (nodeType(node) == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pCol = (SColumnNode*)node;
|
||||
if (pCol->tableId == pPartLogicNode->pkTsColTbId && pCol->colId == pPartLogicNode->pkTsColId) {
|
||||
pPart->tsSlotId = pCol->slotId;
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!found) code = TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setConditionsSlotId(pCxt, (const SLogicNode*)pPartLogicNode, (SPhysiNode*)pPart);
|
||||
}
|
||||
|
@ -1948,6 +1963,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
|
|||
pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
|
||||
pMerge->groupSort = pMergeLogicNode->groupSort;
|
||||
pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId;
|
||||
pMerge->inputWithGroupId = pMergeLogicNode->inputWithGroupId;
|
||||
|
||||
int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
|
||||
|
||||
|
|
|
@ -524,6 +524,11 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p
|
|||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_SORT: {
|
||||
SSortLogicNode* pSort = (SSortLogicNode*)pNode;
|
||||
if (pSort->calcGroupId) pMerge->inputWithGroupId = true;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -35,6 +35,9 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 4
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
|
||||
|
|
|
@ -14,6 +14,9 @@ from util.dnodes import *
|
|||
from util.common import *
|
||||
# from tmqCommon import *
|
||||
|
||||
COMPARE_DATA = 0
|
||||
COMPARE_LEN = 1
|
||||
|
||||
class TDTestCase:
|
||||
def __init__(self):
|
||||
self.vgroups = 4
|
||||
|
@ -179,10 +182,10 @@ class TDTestCase:
|
|||
|
||||
def explain_sql(self, sql: str):
|
||||
sql = "explain " + sql
|
||||
tdSql.query(sql)
|
||||
tdSql.query(sql, queryTimes=1)
|
||||
return tdSql.queryResult
|
||||
|
||||
def query_and_compare_res(self, sql1, sql2):
|
||||
def query_and_compare_res(self, sql1, sql2, compare_what: int = 0):
|
||||
dur = self.query_with_time(sql1)
|
||||
tdLog.debug("sql1 query with time: [%f]" % dur)
|
||||
res1 = tdSql.queryResult
|
||||
|
@ -191,31 +194,35 @@ class TDTestCase:
|
|||
res2 = tdSql.queryResult
|
||||
if res1 is None or res2 is None:
|
||||
tdLog.exit("res1 or res2 is None")
|
||||
if len(res1) != len(res2):
|
||||
tdLog.exit("query and copare failed cause different rows, sql1: [%s], rows: [%d], sql2: [%s], rows: [%d]" % (sql1, len(res1), sql2, len(res2)))
|
||||
for i in range(0, len(res1)):
|
||||
if res1[i] != res2[i]:
|
||||
tdLog.exit("compare failed for row: [%d], sqls: [%s] res1: [%s], sql2 : [%s] res2: [%s]" % (i, sql1, res1[i], sql2, res2[i]))
|
||||
if compare_what <= COMPARE_LEN:
|
||||
if len(res1) != len(res2):
|
||||
tdLog.exit("query and copare failed cause different rows, sql1: [%s], rows: [%d], sql2: [%s], rows: [%d]" % (sql1, len(res1), sql2, len(res2)))
|
||||
if compare_what == COMPARE_DATA:
|
||||
for i in range(0, len(res1)):
|
||||
if res1[i] != res2[i]:
|
||||
tdLog.exit("compare failed for row: [%d], sqls: [%s] res1: [%s], sql2 : [%s] res2: [%s]" % (i, sql1, res1[i], sql2, res2[i]))
|
||||
tdLog.debug("sql: [%s] and sql: [%s] have same results, rows: [%d]" % (sql1, sql2, len(res1)))
|
||||
|
||||
def prepare_and_query(self, sqls: [], order_by: str, select_list: str = "*"):
|
||||
def prepare_and_query_and_compare(self, sqls: [], order_by: str, select_list: str = "*", compare_what: int = 0):
|
||||
for sql in sqls:
|
||||
sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list)
|
||||
sql = self.add_order_by(sql, order_by, select_list)
|
||||
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
|
||||
self.check_explain_res_has_row("Partition", self.explain_sql(sql))
|
||||
self.query_and_compare_res(sql, sql_hint)
|
||||
self.query_and_compare_res(sql, sql_hint, compare_what=compare_what)
|
||||
|
||||
def test_sort_for_partition_res(self):
|
||||
sqls_par_c1_agg = [
|
||||
"select count(*), c1 from meters partition by c1",
|
||||
"select count(*), min(c2), max(c3), c1 from meters partition by c1",
|
||||
"select c1 from meters partition by c1",
|
||||
]
|
||||
sqls_par_c1 = [
|
||||
"select * from meters partition by c1"
|
||||
]
|
||||
sqls_par_c1_c2_agg = [
|
||||
"select count(*), c1, c2 from meters partition by c1, c2",
|
||||
"select c1, c2 from meters partition by c1, c2",
|
||||
"select count(*), c1, c2, min(c4), max(c5), sum(c6) from meters partition by c1, c2",
|
||||
]
|
||||
sqls_par_c1_c2 = [
|
||||
|
@ -228,32 +235,32 @@ class TDTestCase:
|
|||
sqls_par_tag_c1 = [
|
||||
"select count(*), c1, t1 from meters partition by t1, c1"
|
||||
]
|
||||
self.prepare_and_query(sqls_par_c1_agg, "c1")
|
||||
self.prepare_and_query(sqls_par_c1, "c1, ts, c2", "c1, ts, c2")
|
||||
self.prepare_and_query(sqls_par_c1_c2_agg, "c1, c2")
|
||||
self.prepare_and_query(sqls_par_c1_c2, "c1, c2, ts, c3", "c1, c2, ts, c3")
|
||||
self.prepare_and_query(sqls_par_tbname_c1, "a, c1")
|
||||
self.prepare_and_query(sqls_par_tag_c1, "t1, c1")
|
||||
self.prepare_and_query_and_compare(sqls_par_c1_agg, "c1")
|
||||
self.prepare_and_query_and_compare(sqls_par_c1, "c1, ts, c2", "c1, ts, c2")
|
||||
self.prepare_and_query_and_compare(sqls_par_c1_c2_agg, "c1, c2")
|
||||
self.prepare_and_query_and_compare(sqls_par_c1_c2, "c1, c2, ts, c3", "c1, c2, ts, c3")
|
||||
self.prepare_and_query_and_compare(sqls_par_tbname_c1, "a, c1")
|
||||
self.prepare_and_query_and_compare(sqls_par_tag_c1, "t1, c1")
|
||||
|
||||
def get_interval_template_sqls(self, col_name):
|
||||
sqls = [
|
||||
'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1s)' % (col_name, col_name),
|
||||
'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30s)' % (col_name, col_name),
|
||||
'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1m)' % (col_name, col_name),
|
||||
'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30m)' % (col_name, col_name),
|
||||
'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1h)' % (col_name, col_name),
|
||||
#'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30s)' % (col_name, col_name),
|
||||
#'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1m)' % (col_name, col_name),
|
||||
#'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30m)' % (col_name, col_name),
|
||||
#'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1h)' % (col_name, col_name),
|
||||
|
||||
'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1s)' % (col_name, col_name),
|
||||
'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30s)' % (col_name, col_name),
|
||||
'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1m)' % (col_name, col_name),
|
||||
'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30m)' % (col_name, col_name),
|
||||
'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1h)' % (col_name, col_name),
|
||||
#'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30s)' % (col_name, col_name),
|
||||
#'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1m)' % (col_name, col_name),
|
||||
#'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30m)' % (col_name, col_name),
|
||||
#'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1h)' % (col_name, col_name),
|
||||
|
||||
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1s)' % (col_name, col_name, col_name),
|
||||
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30s)' % (col_name, col_name, col_name),
|
||||
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1m)' % (col_name, col_name, col_name),
|
||||
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30m)' % (col_name, col_name, col_name),
|
||||
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name),
|
||||
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30s)' % (col_name, col_name, col_name),
|
||||
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1m)' % (col_name, col_name, col_name),
|
||||
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30m)' % (col_name, col_name, col_name),
|
||||
#'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name),
|
||||
|
||||
'select _wstart as ts, count(*), tbname as a, %s from meters partition by %s, tbname interval(1s)' % (col_name, col_name),
|
||||
'select _wstart as ts, count(*), t1 as a, %s from meters partition by %s, t1 interval(1s)' % (col_name, col_name),
|
||||
|
@ -263,29 +270,48 @@ class TDTestCase:
|
|||
|
||||
def test_sort_for_partition_interval(self):
|
||||
sqls, order_list = self.get_interval_template_sqls('c1')
|
||||
self.prepare_and_query(sqls, order_list)
|
||||
sqls, order_list = self.get_interval_template_sqls('c2')
|
||||
self.prepare_and_query(sqls, order_list)
|
||||
self.prepare_and_query_and_compare(sqls, order_list)
|
||||
#sqls, order_list = self.get_interval_template_sqls('c2')
|
||||
#self.prepare_and_query(sqls, order_list)
|
||||
sqls, order_list = self.get_interval_template_sqls('c3')
|
||||
self.prepare_and_query(sqls, order_list)
|
||||
sqls, order_list = self.get_interval_template_sqls('c4')
|
||||
self.prepare_and_query(sqls, order_list)
|
||||
sqls, order_list = self.get_interval_template_sqls('c5')
|
||||
self.prepare_and_query(sqls, order_list)
|
||||
self.prepare_and_query_and_compare(sqls, order_list)
|
||||
#sqls, order_list = self.get_interval_template_sqls('c4')
|
||||
#self.prepare_and_query(sqls, order_list)
|
||||
#sqls, order_list = self.get_interval_template_sqls('c5')
|
||||
#self.prepare_and_query(sqls, order_list)
|
||||
sqls, order_list = self.get_interval_template_sqls('c6')
|
||||
self.prepare_and_query(sqls, order_list)
|
||||
sqls, order_list = self.get_interval_template_sqls('c7')
|
||||
self.prepare_and_query(sqls, order_list)
|
||||
self.prepare_and_query_and_compare(sqls, order_list)
|
||||
#sqls, order_list = self.get_interval_template_sqls('c7')
|
||||
#self.prepare_and_query(sqls, order_list)
|
||||
sqls, order_list = self.get_interval_template_sqls('c8')
|
||||
self.prepare_and_query(sqls, order_list)
|
||||
self.prepare_and_query_and_compare(sqls, order_list)
|
||||
sqls, order_list = self.get_interval_template_sqls('c9')
|
||||
self.prepare_and_query(sqls, order_list)
|
||||
self.prepare_and_query_and_compare(sqls, order_list)
|
||||
|
||||
def test_sort_for_partition_no_agg_limit(self):
|
||||
sqls_template = [
|
||||
'select * from meters partition by c1 slimit %d limit %d',
|
||||
'select * from meters partition by c2 slimit %d limit %d',
|
||||
'select * from meters partition by c8 slimit %d limit %d',
|
||||
]
|
||||
sqls = []
|
||||
for sql in sqls_template:
|
||||
sqls.append(sql % (1,1))
|
||||
sqls.append(sql % (1,10))
|
||||
sqls.append(sql % (10,10))
|
||||
sqls.append(sql % (100, 100))
|
||||
order_by_list = 'ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,t1,t2,t3,t4,t5,t6'
|
||||
|
||||
self.prepare_and_query_and_compare(sqls, order_by_list, compare_what=COMPARE_LEN)
|
||||
|
||||
|
||||
def run(self):
|
||||
self.prepareTestEnv()
|
||||
#time.sleep(99999999)
|
||||
self.test_sort_for_partition_hint()
|
||||
self.test_sort_for_partition_res()
|
||||
self.test_sort_for_partition_interval()
|
||||
self.test_sort_for_partition_no_agg_limit()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
Loading…
Reference in New Issue