diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index bc6851475a..b2aff6fd7e 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -83,6 +83,15 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, } } +static FORCE_INLINE bool colDataIsNull_t(const SColumnInfoData* pColumnInfoData, uint32_t row, bool isVarType) { + if (!pColumnInfoData->hasNull) return false; + if (isVarType) { + return colDataIsNull_var(pColumnInfoData, row); + } else { + return pColumnInfoData->nullbitmap ? colDataIsNull_f(pColumnInfoData->nullbitmap, row) : false; + } +} + static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg) { if (!pColumnInfoData->hasNull) { diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 14d70b5812..885e9e5a30 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -223,6 +223,7 @@ typedef struct SMergeLogicNode { int32_t srcGroupId; bool groupSort; bool ignoreGroupId; + bool inputWithGroupId; } SMergeLogicNode; typedef enum EWindowType { @@ -294,7 +295,8 @@ typedef struct SPartitionLogicNode { SNode* pSubtable; bool needBlockOutputTsOrder; // if true, partition output block will have ts order maintained - int32_t tsSlotId; + int32_t pkTsColId; + uint64_t pkTsColTbId; } SPartitionLogicNode; typedef enum ESubplanType { @@ -534,6 +536,7 @@ typedef struct SMergePhysiNode { int32_t srcGroupId; bool groupSort; bool ignoreGroupId; + bool inputWithGroupId; } SMergePhysiNode; typedef struct SWindowPhysiNode { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index e57164fdd8..753d3e680c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2265,11 +2265,11 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) { SColumnDataAgg* pColAgg = NULL; const char* isNull = oldkeyBuf; - const char* p = oldkeyBuf + sizeof(int8_t) * taosArrayGetSize(pSortGroupCols); + const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size; - for (int32_t i = 0; i < taosArrayGetSize(pSortGroupCols); ++i) { - const SColumn* pCol = (SColumn*)taosArrayGet(pSortGroupCols, i); - const SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); + for (int32_t i = 0; i < pSortGroupCols->size; ++i) { + const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); + const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { @@ -2296,15 +2296,15 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) { - uint32_t colNum = taosArrayGetSize(pSortGroupCols); + uint32_t colNum = pSortGroupCols->size; SColumnDataAgg* pColAgg = NULL; char* isNull = keyBuf; char* p = keyBuf + sizeof(int8_t) * colNum; for (int32_t i = 0; i < colNum; ++i) { - const SColumn* pCol = (SColumn*)taosArrayGet(pSortGroupCols, i); - const SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); - if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) continue; + const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); + const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); + if (pCol->slotId > pBlock->pDataBlock->size) continue; if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 18a666d2c2..6918f9a52a 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -692,6 +692,7 @@ typedef struct SMultiwayMergeOperatorInfo { bool ignoreGroupId; uint64_t groupId; STupleHandle* prefetchedTuple; + bool inputWithGroupId; } SMultiwayMergeOperatorInfo; int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { @@ -742,7 +743,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* while (1) { STupleHandle* pTupleHandle = NULL; - if (pInfo->groupSort) { + if (pInfo->groupSort || pInfo->inputWithGroupId) { if (pInfo->prefetchedTuple == NULL) { pTupleHandle = tsortNextTuple(pHandle); } else { @@ -763,7 +764,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* break; } - if (pInfo->groupSort) { + if (pInfo->groupSort || pInfo->inputWithGroupId) { uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) { appendOneRowToDataBlock(p, pTupleHandle); @@ -943,6 +944,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder; + pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId; setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index f4c2735d93..e6668235ad 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -616,48 +616,62 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { int ret = pParam->cmpFn(left1, right1); return ret; } else { + bool isVarType; for (int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); + isVarType = IS_VAR_DATA_TYPE(pLeftColInfoData->info.type); - bool leftNull = false; - if (pLeftColInfoData->hasNull) { - if (pLeftBlock->pBlockAgg == NULL) { - leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex); - } else { - leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, - pLeftBlock->pBlockAgg[i]); + if (pLeftColInfoData->hasNull || pRightColInfoData->hasNull) { + bool leftNull = false; + if (pLeftColInfoData->hasNull) { + if (pLeftBlock->pBlockAgg == NULL) { + leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType); + } else { + leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, + pLeftBlock->pBlockAgg[i]); + } + } + + bool rightNull = false; + if (pRightColInfoData->hasNull) { + if (pRightBlock->pBlockAgg == NULL) { + rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType); + } else { + rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, + pRightBlock->pBlockAgg[i]); + } + } + + if (leftNull && rightNull) { + continue; // continue to next slot + } + + if (rightNull) { + return pOrder->nullFirst ? 1 : -1; + } + + if (leftNull) { + return pOrder->nullFirst ? -1 : 1; } } - bool rightNull = false; - if (pRightColInfoData->hasNull) { - if (pRightBlock->pBlockAgg == NULL) { - rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex); - } else { - rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, - pRightBlock->pBlockAgg[i]); - } + void* left1, *right1; + if (isVarType) { + left1 = colDataGetVarData(pLeftColInfoData, pLeftSource->src.rowIndex); + right1 = colDataGetVarData(pRightColInfoData, pRightSource->src.rowIndex); + } else { + left1 = colDataGetNumData(pLeftColInfoData, pLeftSource->src.rowIndex); + right1 = colDataGetNumData(pRightColInfoData, pRightSource->src.rowIndex); } - if (leftNull && rightNull) { - continue; // continue to next slot + __compar_fn_t fn = pOrder->compFn; + if (!fn) { + fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order); + pOrder->compFn = fn; } - if (rightNull) { - return pOrder->nullFirst ? 1 : -1; - } - - if (leftNull) { - return pOrder->nullFirst ? -1 : 1; - } - - void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex); - void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex); - - __compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order); - int ret = fn(left1, right1); if (ret == 0) { continue; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 817433f5be..3d48036095 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -490,6 +490,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst COPY_SCALAR_FIELD(srcGroupId); COPY_SCALAR_FIELD(groupSort); COPY_SCALAR_FIELD(ignoreGroupId); + COPY_SCALAR_FIELD(inputWithGroupId); return TSDB_CODE_SUCCESS; } @@ -543,7 +544,8 @@ static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLog CLONE_NODE_LIST_FIELD(pTags); CLONE_NODE_FIELD(pSubtable); COPY_SCALAR_FIELD(needBlockOutputTsOrder); - COPY_SCALAR_FIELD(tsSlotId); + COPY_SCALAR_FIELD(pkTsColId); + COPY_SCALAR_FIELD(pkTsColTbId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index b5dff20440..c2acf0dbdf 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2271,6 +2271,7 @@ static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels"; static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId"; static const char* jkMergePhysiPlanGroupSort = "GroupSort"; static const char* jkMergePhysiPlanIgnoreGroupID = "IgnoreGroupID"; +static const char* jkMergePhysiPlanInputWithGroupId = "InputWithGroupId"; static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj; @@ -2294,6 +2295,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanIgnoreGroupID, pNode->ignoreGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanInputWithGroupId, pNode->inputWithGroupId); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 992097e8c5..99100b2a1d 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2682,6 +2682,7 @@ enum { PHY_MERGE_CODE_SRC_GROUP_ID, PHY_MERGE_CODE_GROUP_SORT, PHY_MERGE_CODE_IGNORE_GROUP_ID, + PHY_MERGE_CODE_INPUT_WITH_GROUP_ID, }; static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2706,6 +2707,9 @@ static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_IGNORE_GROUP_ID, pNode->ignoreGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_INPUT_WITH_GROUP_ID, pNode->inputWithGroupId); + } return code; } @@ -2738,6 +2742,9 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) { case PHY_MERGE_CODE_IGNORE_GROUP_ID: code = tlvDecodeBool(pTlv, &pNode->ignoreGroupId); break; + case PHY_MERGE_CODE_INPUT_WITH_GROUP_ID: + code = tlvDecodeBool(pTlv, &pNode->inputWithGroupId); + break; default: break; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index daf5e363c6..35b376ffbd 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1265,7 +1265,8 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS pPartition->needBlockOutputTsOrder = true; SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pSelect->pWindow; SColumnNode* pTsCol = (SColumnNode*)pInterval->pCol; - pPartition->tsSlotId = pTsCol->slotId; + pPartition->pkTsColId = pTsCol->colId; + pPartition->pkTsColTbId = pTsCol->tableId; } if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 36c54e342d..c81ca0b3d1 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2838,13 +2838,14 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT); return true; } + case QUERY_NODE_LOGIC_PLAN_SORT: + if (((SSortLogicNode*)pNodeLimitPushTo)->calcGroupId) break; + // fall through case QUERY_NODE_LOGIC_PLAN_FILL: - case QUERY_NODE_LOGIC_PLAN_SORT: { cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT); SNode* pChild = NULL; FOREACH(pChild, pNodeLimitPushTo->pChildren) { pushDownLimitHow(pNodeLimitPushTo, (SLogicNode*)pChild); } return true; - } case QUERY_NODE_LOGIC_PLAN_AGG: { if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && (isPartTagAgg((SAggLogicNode*)pNodeLimitPushTo) || isPartTableAgg((SAggLogicNode*)pNodeLimitPushTo))) { @@ -3585,11 +3586,13 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) { int32_t code = TSDB_CODE_SUCCESS; SSortLogicNode* pSort = (SSortLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT); if (pSort) { + bool alreadyPartByPKTs = false; pSort->groupSort = false; - TSWAP(pSort->node.pChildren, pPartition->node.pChildren); - optResetParent((SLogicNode*)pSort); FOREACH(node, pPartition->pPartitionKeys) { SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); + if (QUERY_NODE_COLUMN == nodeType(node) && ((SColumnNode*)node)->colId == pPartition->pkTsColId && + ((SColumnNode*)node)->tableId == pPartition->pkTsColTbId) + alreadyPartByPKTs = true; if (!pOrder) { code = TSDB_CODE_OUT_OF_MEMORY; } else { @@ -3600,7 +3603,7 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) { } } - if (pPartition->needBlockOutputTsOrder) { + if (pPartition->needBlockOutputTsOrder && !alreadyPartByPKTs) { SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); if (!pOrder) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -3612,7 +3615,7 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) { FOREACH(node, pPartition->node.pTargets) { if (nodeType(node) == QUERY_NODE_COLUMN) { SColumnNode* pCol = (SColumnNode*)node; - if (pCol->slotId == pPartition->tsSlotId) { + if (pCol->colId == pPartition->pkTsColId && pCol->tableId == pPartition->pkTsColTbId) { pOrder->pExpr = nodesCloneNode((SNode*)pCol); break; } @@ -3624,10 +3627,6 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) { } } } - if (code == TSDB_CODE_SUCCESS) { - pSort->node.pTargets = nodesCloneList(((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0))->pTargets); - if (!pSort->node.pTargets) code = TSDB_CODE_OUT_OF_MEMORY; - } if (code != TSDB_CODE_SUCCESS) { nodesDestroyNode((SNode*)pSort); pSort = NULL; @@ -3651,6 +3650,9 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub // if sort create failed, we eat the error, skip the optimization code = TSDB_CODE_SUCCESS; } else { + TSWAP(pSort->node.pChildren, pNode->node.pChildren); + TSWAP(pSort->node.pTargets, pNode->node.pTargets); + optResetParent((SLogicNode*)pSort); pSort->calcGroupId = true; code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort); if (code == TSDB_CODE_SUCCESS) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index bb3fa9a10c..d55e80a23d 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1800,7 +1800,6 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList* SNodeList* pPartitionKeys = NULL; int32_t code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys); pPart->needBlockOutputTsOrder = pPartLogicNode->needBlockOutputTsOrder; - pPart->tsSlotId = pPartLogicNode->tsSlotId; SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); // push down expression to pOutputDataBlockDesc of child node @@ -1822,6 +1821,22 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList* } } + if (pPart->needBlockOutputTsOrder) { + SNode* node; + bool found = false; + FOREACH(node, pPartLogicNode->node.pTargets) { + if (nodeType(node) == QUERY_NODE_COLUMN) { + SColumnNode* pCol = (SColumnNode*)node; + if (pCol->tableId == pPartLogicNode->pkTsColTbId && pCol->colId == pPartLogicNode->pkTsColId) { + pPart->tsSlotId = pCol->slotId; + found = true; + break; + } + } + } + if (!found) code = TSDB_CODE_PLAN_INTERNAL_ERROR; + } + if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pPartLogicNode, (SPhysiNode*)pPart); } @@ -1948,6 +1963,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM pMerge->srcGroupId = pMergeLogicNode->srcGroupId; pMerge->groupSort = pMergeLogicNode->groupSort; pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId; + pMerge->inputWithGroupId = pMergeLogicNode->inputWithGroupId; int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index a77a8e72be..881cc02062 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -524,6 +524,11 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p } break; } + case QUERY_NODE_LOGIC_PLAN_SORT: { + SSortLogicNode* pSort = (SSortLogicNode*)pNode; + if (pSort->calcGroupId) pMerge->inputWithGroupId = true; + break; + } default: break; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 279836ca18..61b67639f6 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -35,6 +35,9 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py diff --git a/tests/system-test/2-query/partition_by_col.py b/tests/system-test/2-query/partition_by_col.py index a7930337fa..1a394649d6 100644 --- a/tests/system-test/2-query/partition_by_col.py +++ b/tests/system-test/2-query/partition_by_col.py @@ -14,6 +14,9 @@ from util.dnodes import * from util.common import * # from tmqCommon import * +COMPARE_DATA = 0 +COMPARE_LEN = 1 + class TDTestCase: def __init__(self): self.vgroups = 4 @@ -179,10 +182,10 @@ class TDTestCase: def explain_sql(self, sql: str): sql = "explain " + sql - tdSql.query(sql) + tdSql.query(sql, queryTimes=1) return tdSql.queryResult - def query_and_compare_res(self, sql1, sql2): + def query_and_compare_res(self, sql1, sql2, compare_what: int = 0): dur = self.query_with_time(sql1) tdLog.debug("sql1 query with time: [%f]" % dur) res1 = tdSql.queryResult @@ -191,31 +194,35 @@ class TDTestCase: res2 = tdSql.queryResult if res1 is None or res2 is None: tdLog.exit("res1 or res2 is None") - if len(res1) != len(res2): - tdLog.exit("query and copare failed cause different rows, sql1: [%s], rows: [%d], sql2: [%s], rows: [%d]" % (sql1, len(res1), sql2, len(res2))) - for i in range(0, len(res1)): - if res1[i] != res2[i]: - tdLog.exit("compare failed for row: [%d], sqls: [%s] res1: [%s], sql2 : [%s] res2: [%s]" % (i, sql1, res1[i], sql2, res2[i])) + if compare_what <= COMPARE_LEN: + if len(res1) != len(res2): + tdLog.exit("query and copare failed cause different rows, sql1: [%s], rows: [%d], sql2: [%s], rows: [%d]" % (sql1, len(res1), sql2, len(res2))) + if compare_what == COMPARE_DATA: + for i in range(0, len(res1)): + if res1[i] != res2[i]: + tdLog.exit("compare failed for row: [%d], sqls: [%s] res1: [%s], sql2 : [%s] res2: [%s]" % (i, sql1, res1[i], sql2, res2[i])) tdLog.debug("sql: [%s] and sql: [%s] have same results, rows: [%d]" % (sql1, sql2, len(res1))) - def prepare_and_query(self, sqls: [], order_by: str, select_list: str = "*"): + def prepare_and_query_and_compare(self, sqls: [], order_by: str, select_list: str = "*", compare_what: int = 0): for sql in sqls: sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list) sql = self.add_order_by(sql, order_by, select_list) self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) self.check_explain_res_has_row("Partition", self.explain_sql(sql)) - self.query_and_compare_res(sql, sql_hint) + self.query_and_compare_res(sql, sql_hint, compare_what=compare_what) def test_sort_for_partition_res(self): sqls_par_c1_agg = [ "select count(*), c1 from meters partition by c1", "select count(*), min(c2), max(c3), c1 from meters partition by c1", + "select c1 from meters partition by c1", ] sqls_par_c1 = [ "select * from meters partition by c1" ] sqls_par_c1_c2_agg = [ "select count(*), c1, c2 from meters partition by c1, c2", + "select c1, c2 from meters partition by c1, c2", "select count(*), c1, c2, min(c4), max(c5), sum(c6) from meters partition by c1, c2", ] sqls_par_c1_c2 = [ @@ -228,32 +235,32 @@ class TDTestCase: sqls_par_tag_c1 = [ "select count(*), c1, t1 from meters partition by t1, c1" ] - self.prepare_and_query(sqls_par_c1_agg, "c1") - self.prepare_and_query(sqls_par_c1, "c1, ts, c2", "c1, ts, c2") - self.prepare_and_query(sqls_par_c1_c2_agg, "c1, c2") - self.prepare_and_query(sqls_par_c1_c2, "c1, c2, ts, c3", "c1, c2, ts, c3") - self.prepare_and_query(sqls_par_tbname_c1, "a, c1") - self.prepare_and_query(sqls_par_tag_c1, "t1, c1") + self.prepare_and_query_and_compare(sqls_par_c1_agg, "c1") + self.prepare_and_query_and_compare(sqls_par_c1, "c1, ts, c2", "c1, ts, c2") + self.prepare_and_query_and_compare(sqls_par_c1_c2_agg, "c1, c2") + self.prepare_and_query_and_compare(sqls_par_c1_c2, "c1, c2, ts, c3", "c1, c2, ts, c3") + self.prepare_and_query_and_compare(sqls_par_tbname_c1, "a, c1") + self.prepare_and_query_and_compare(sqls_par_tag_c1, "t1, c1") def get_interval_template_sqls(self, col_name): sqls = [ 'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1s)' % (col_name, col_name), - 'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30s)' % (col_name, col_name), - 'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1m)' % (col_name, col_name), - 'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30m)' % (col_name, col_name), - 'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1h)' % (col_name, col_name), + #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30s)' % (col_name, col_name), + #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1m)' % (col_name, col_name), + #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30m)' % (col_name, col_name), + #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1h)' % (col_name, col_name), 'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1s)' % (col_name, col_name), - 'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30s)' % (col_name, col_name), - 'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1m)' % (col_name, col_name), - 'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30m)' % (col_name, col_name), - 'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1h)' % (col_name, col_name), + #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30s)' % (col_name, col_name), + #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1m)' % (col_name, col_name), + #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30m)' % (col_name, col_name), + #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1h)' % (col_name, col_name), 'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1s)' % (col_name, col_name, col_name), - 'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30s)' % (col_name, col_name, col_name), - 'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1m)' % (col_name, col_name, col_name), - 'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30m)' % (col_name, col_name, col_name), - 'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name), + #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30s)' % (col_name, col_name, col_name), + #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1m)' % (col_name, col_name, col_name), + #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30m)' % (col_name, col_name, col_name), + #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name), 'select _wstart as ts, count(*), tbname as a, %s from meters partition by %s, tbname interval(1s)' % (col_name, col_name), 'select _wstart as ts, count(*), t1 as a, %s from meters partition by %s, t1 interval(1s)' % (col_name, col_name), @@ -263,29 +270,48 @@ class TDTestCase: def test_sort_for_partition_interval(self): sqls, order_list = self.get_interval_template_sqls('c1') - self.prepare_and_query(sqls, order_list) - sqls, order_list = self.get_interval_template_sqls('c2') - self.prepare_and_query(sqls, order_list) + self.prepare_and_query_and_compare(sqls, order_list) + #sqls, order_list = self.get_interval_template_sqls('c2') + #self.prepare_and_query(sqls, order_list) sqls, order_list = self.get_interval_template_sqls('c3') - self.prepare_and_query(sqls, order_list) - sqls, order_list = self.get_interval_template_sqls('c4') - self.prepare_and_query(sqls, order_list) - sqls, order_list = self.get_interval_template_sqls('c5') - self.prepare_and_query(sqls, order_list) + self.prepare_and_query_and_compare(sqls, order_list) + #sqls, order_list = self.get_interval_template_sqls('c4') + #self.prepare_and_query(sqls, order_list) + #sqls, order_list = self.get_interval_template_sqls('c5') + #self.prepare_and_query(sqls, order_list) sqls, order_list = self.get_interval_template_sqls('c6') - self.prepare_and_query(sqls, order_list) - sqls, order_list = self.get_interval_template_sqls('c7') - self.prepare_and_query(sqls, order_list) + self.prepare_and_query_and_compare(sqls, order_list) + #sqls, order_list = self.get_interval_template_sqls('c7') + #self.prepare_and_query(sqls, order_list) sqls, order_list = self.get_interval_template_sqls('c8') - self.prepare_and_query(sqls, order_list) + self.prepare_and_query_and_compare(sqls, order_list) sqls, order_list = self.get_interval_template_sqls('c9') - self.prepare_and_query(sqls, order_list) + self.prepare_and_query_and_compare(sqls, order_list) + + def test_sort_for_partition_no_agg_limit(self): + sqls_template = [ + 'select * from meters partition by c1 slimit %d limit %d', + 'select * from meters partition by c2 slimit %d limit %d', + 'select * from meters partition by c8 slimit %d limit %d', + ] + sqls = [] + for sql in sqls_template: + sqls.append(sql % (1,1)) + sqls.append(sql % (1,10)) + sqls.append(sql % (10,10)) + sqls.append(sql % (100, 100)) + order_by_list = 'ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,t1,t2,t3,t4,t5,t6' + + self.prepare_and_query_and_compare(sqls, order_by_list, compare_what=COMPARE_LEN) + def run(self): self.prepareTestEnv() + #time.sleep(99999999) self.test_sort_for_partition_hint() self.test_sort_for_partition_res() self.test_sort_for_partition_interval() + self.test_sort_for_partition_no_agg_limit() def stop(self): tdSql.close()