From 2c4e0fee0724924dddd4af398c1285b7d5b37741 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 29 Aug 2023 15:46:03 +0800 Subject: [PATCH] feat: optimize partition node, replace with sort node --- include/common/ttokendef.h | 1 + include/libs/nodes/plannodes.h | 3 +- include/libs/nodes/querynodes.h | 1 + source/libs/executor/inc/executil.h | 20 ++ source/libs/executor/inc/tsort.h | 2 + source/libs/executor/src/aggregateoperator.c | 1 + source/libs/executor/src/executil.c | 129 ++++++++++ source/libs/executor/src/groupoperator.c | 14 -- source/libs/executor/src/sortoperator.c | 128 +++++++++- source/libs/executor/src/tsort.c | 6 + source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 8 +- source/libs/nodes/src/nodesMsgFuncs.c | 8 +- source/libs/parser/src/parAstCreater.c | 11 + source/libs/parser/src/parTokenizer.c | 1 + source/libs/planner/inc/planInt.h | 11 +- source/libs/planner/src/planLogicCreater.c | 1 - source/libs/planner/src/planOptimizer.c | 63 +++++ source/libs/planner/src/planPhysiCreater.c | 1 + source/libs/planner/src/planSpliter.c | 7 +- source/libs/planner/src/planUtil.c | 11 + tests/parallel_test/cases.task | 1 + tests/system-test/2-query/partition_by_col.py | 227 ++++++++++++++++++ tests/system-test/win-test-file | 1 + 24 files changed, 621 insertions(+), 36 deletions(-) create mode 100644 tests/system-test/2-query/partition_by_col.py diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index 748854c51b..4ccfeb32da 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -365,6 +365,7 @@ #define TK_NK_BIN 605 // bin format data 0b111 #define TK_BATCH_SCAN 606 #define TK_NO_BATCH_SCAN 607 +#define TK_SORT_FOR_GROUP 608 #define TK_NK_NIL 65535 diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 9d2247f479..79308a8c93 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -279,8 +279,8 @@ typedef 
struct SSortLogicNode { SLogicNode node; SNodeList* pSortKeys; bool groupSort; - int64_t maxRows; bool skipPKSortOpt; + bool calcGroupId; } SSortLogicNode; typedef struct SPartitionLogicNode { @@ -603,6 +603,7 @@ typedef struct SSortPhysiNode { SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode SNodeList* pTargets; + bool calcGroupId; } SSortPhysiNode; typedef SSortPhysiNode SGroupSortPhysiNode; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index d7d45c57ad..bb6713dc83 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -119,6 +119,7 @@ typedef struct SLeftValueNode { typedef enum EHintOption { HINT_NO_BATCH_SCAN = 1, HINT_BATCH_SCAN, + HINT_SORT_FOR_GROUP, } EHintOption; typedef struct SHintNode { diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 5e2ca462f7..863a85ef7a 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -196,4 +196,24 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, SStorageAPI* pStorageAPI); + +/** + * @brief extract col data according to sort/group cols + * @param pSortGroupCols sort keys or group keys, array of SColumnNode + * @param [out] pColVals col vals extracted, array of SGroupKeys + */ +void extractCols(SArray* pSortGroupCols, SArray* pColVals, SSDataBlock* pBlock, int32_t rowIndex); +/** + * @breif build keys buffer with col values + * @retval key length + * @param [out] buf buffer to store result key + */ +int32_t buildKeys(char* buf, SArray* pColVals); + +uint64_t calcGroupId(char *pData, int32_t len); + +SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys); + +int32_t 
extractSortGroupKeysInfo(SArray** pColVals, int32_t* keyLen, char** keyBuf, const SArray* pColList); + #endif // TDENGINE_EXECUTIL_H diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 3180173ca7..fe3ceb4b6f 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -194,6 +194,8 @@ void tsortSetClosed(SSortHandle* pHandle); void tsortSetSingleTableMerge(SSortHandle* pHandle); void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param); +int32_t tsortBuildKeys(SArray* pSortGroupCols, SArray* pColVals, STupleHandle* pTuple, char* keyBuf); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index f6a8c6689f..5e649af47e 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -303,6 +303,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION || + downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT || (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN && ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) { return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 52631cd0db..97fea0e711 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2261,3 +2261,132 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t ts[3] = pWin->skey; // window start key ts[4] = pWin->ekey + delta; // window end key } + +void extractCols(SArray* pSortGroupCols, SArray* pColVals, SSDataBlock* pBlock, int32_t rowIndex) { + SColumnDataAgg* pColAgg = NULL; + + size_t numOfGroupCols = taosArrayGetSize(pSortGroupCols); + + for (int32_t i = 0; i < numOfGroupCols; 
++i) { + SColumn* pCol = (SColumn*) taosArrayGet(pSortGroupCols, i); + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); + + // valid range check. todo: return error code. + if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) { + continue; + } + + if (pBlock->pBlockAgg != NULL) { + pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? + } + + SGroupKeys* pkey = taosArrayGet(pColVals, i); + if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { + pkey->isNull = true; + } else { + pkey->isNull = false; + char* val = colDataGetData(pColInfoData, rowIndex); + if (pkey->type == TSDB_DATA_TYPE_JSON) { + if (tTagIsJson(val)) { + terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; + return; + } + int32_t dataLen = getJsonValueLen(val); + memcpy(pkey->pData, val, dataLen); + } else if (IS_VAR_DATA_TYPE(pkey->type)) { + memcpy(pkey->pData, val, varDataTLen(val)); + ASSERT(varDataTLen(val) <= pkey->bytes); + } else { + memcpy(pkey->pData, val, pkey->bytes); + } + } + } +} + +int32_t buildKeys(char* buf, SArray* pColVals) { + size_t numOfGroupCols = taosArrayGetSize(pColVals); + + char* isNull = (char*)buf; + char* pStart = (char*)buf + sizeof(int8_t) * numOfGroupCols; + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SGroupKeys* pkey = taosArrayGet(pColVals, i); + if (pkey->isNull) { + isNull[i] = 1; + continue; + } + + isNull[i] = 0; + if (pkey->type == TSDB_DATA_TYPE_JSON) { + int32_t dataLen = getJsonValueLen(pkey->pData); + memcpy(pStart, (pkey->pData), dataLen); + pStart += dataLen; + } else if (IS_VAR_DATA_TYPE(pkey->type)) { + varDataCopy(pStart, pkey->pData); + pStart += varDataTLen(pkey->pData); + ASSERT(varDataTLen(pkey->pData) <= pkey->bytes); + } else { + memcpy(pStart, pkey->pData, pkey->bytes); + pStart += pkey->bytes; + } + } + + return (int32_t)(pStart - (char*)buf); +} + +uint64_t calcGroupId(char* pData, int32_t len) { + T_MD5_CTX context; + tMD5Init(&context); + tMD5Update(&context, 
(uint8_t*)pData, len); + tMD5Final(&context); + + // NOTE: only extract the initial 8 bytes of the final MD5 digest + uint64_t id = 0; + memcpy(&id, context.digest, sizeof(uint64_t)); + if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t)); + return id; +} + +SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) { + SNode* node; + SNodeList* ret = NULL; + FOREACH(node, pSortKeys) { + SOrderByExprNode* pSortKey = (SOrderByExprNode*)node; + nodesListMakeAppend(&ret, pSortKey->pExpr); + } + return ret; +} + +int32_t extractSortGroupKeysInfo(SArray** pColVals, int32_t* keyLen, char** keyBuf, const SArray* pColList) { + *pColVals = taosArrayInit(4, sizeof(SGroupKeys)); + if ((*pColVals) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + *keyLen = 0; + int32_t numOfGroupCols = taosArrayGetSize(pColList); + for (int32_t i = 0; i < numOfGroupCols; ++i) { + SColumn* pCol = (SColumn*)taosArrayGet(pColList, i); + (*keyLen) += pCol->bytes; // actual data + null_flag + + SGroupKeys key = {0}; + key.bytes = pCol->bytes; + key.type = pCol->type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, pCol->bytes); + if (key.pData == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush((*pColVals), &key); + } + + int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; + (*keyLen) += nullFlagSize; + + (*keyBuf) = taosMemoryCalloc(1, (*keyLen)); + if ((*keyBuf) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index fb2204eae8..3778a2625c 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -635,20 +635,6 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf return pPage; } -uint64_t calcGroupId(char* pData, int32_t len) { - T_MD5_CTX context; - tMD5Init(&context); - tMD5Update(&context, (uint8_t*)pData, len); - tMD5Final(&context); - - // NOTE: 
only extract the initial 8 bytes of the final MD5 digest - uint64_t id = 0; - memcpy(&id, context.digest, sizeof(uint64_t)); - if (0 == id) - memcpy(&id, context.digest + 8, sizeof(uint64_t)); - return id; -} - int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t)); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 9bd0991435..b71f95b22f 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -19,18 +19,29 @@ #include "querytask.h" #include "tdatablock.h" +typedef struct SSortOpGroupIdCalc { + STupleHandle* pSavedTuple; + SArray* pSortColsArr; + SArray* pSortColVals; + char* keyBuf; + char* lastKeyBuf; + int32_t lastKeysLen; + uint64_t lastGroupId; +} SSortOpGroupIdCalc; + typedef struct SSortOperatorInfo { - SOptrBasicInfo binfo; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - SColMatchInfo matchInfo; - int32_t bufPageSize; - int64_t startTs; // sort start time - uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - SLimitInfo limitInfo; - uint64_t maxTupleLength; - int64_t maxRows; + SOptrBasicInfo binfo; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + SColMatchInfo matchInfo; + int32_t bufPageSize; + int64_t startTs; // sort start time + uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. 
+ SLimitInfo limitInfo; + uint64_t maxTupleLength; + int64_t maxRows; + SSortOpGroupIdCalc* pGroupIdCalc; } SSortOperatorInfo; static SSDataBlock* doSort(SOperatorInfo* pOperator); @@ -40,6 +51,8 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain static void destroySortOperatorInfo(void* param); static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys); +static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc); + // todo add limit/offset impl SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); @@ -78,6 +91,35 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); + + if (pSortNode->calcGroupId) { + SSortOpGroupIdCalc* pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc)); + if (!pGroupIdCalc) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + SNodeList* pSortColsNodeArr = makeColsNodeArrFromSortKeys(pSortNode->pSortKeys); + if (!pSortColsNodeArr) code = TSDB_CODE_OUT_OF_MEMORY; + if (TSDB_CODE_SUCCESS == code) { + pGroupIdCalc->pSortColsArr = makeColumnArrayFromList(pSortColsNodeArr); + if (!pGroupIdCalc->pSortColsArr) code = TSDB_CODE_OUT_OF_MEMORY; + nodesClearList(pSortColsNodeArr); + } + int32_t keyLen; + if (TSDB_CODE_SUCCESS == code) + code = extractSortGroupKeysInfo(&pGroupIdCalc->pSortColVals, &keyLen, &pGroupIdCalc->keyBuf, + pGroupIdCalc->pSortColsArr); + if (TSDB_CODE_SUCCESS == code) { + pGroupIdCalc->lastKeysLen = 0; + pGroupIdCalc->lastKeyBuf = taosMemoryCalloc(1, keyLen); + if (!pGroupIdCalc->lastKeyBuf) code = TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS == code) { + pInfo->pGroupIdCalc = pGroupIdCalc; + } + } + if (code != TSDB_CODE_SUCCESS) goto _error; + 
pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder; initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); @@ -129,6 +171,47 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { pBlock->info.rows += 1; } +/** + * @brief get next tuple with group id attached, all tuples fetched from tsortNextTuple are sorted by group keys + * @param pBlock the output block, the group id will be saved in it + * @retval NULL if next group tuple arrived, the pre fetched tuple will be saved in pInfo.pSavedTuple + */ +static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) { + STupleHandle *ret = pInfo->pGroupIdCalc->pSavedTuple; + pInfo->pGroupIdCalc->pSavedTuple = NULL; + if (!ret) { + ret = tsortNextTuple(pHandle); + } + + if (ret) { + int32_t len = tsortBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->pSortColVals, ret, + pInfo->pGroupIdCalc->keyBuf); + bool newGroup = len != pInfo->pGroupIdCalc->lastKeysLen + ? 
true + : memcmp(pInfo->pGroupIdCalc->lastKeyBuf, pInfo->pGroupIdCalc->keyBuf, len) != 0; + bool emptyBlock = pBlock->info.rows == 0; + if (newGroup && !emptyBlock) { + // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return NULL + pInfo->pGroupIdCalc->pSavedTuple = ret; + ret = NULL; + } else { + if (newGroup) { + ASSERT(emptyBlock); + pInfo->pGroupIdCalc->lastKeysLen = len; + pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId = calcGroupId(pInfo->pGroupIdCalc->keyBuf, len); + TSWAP(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeyBuf); + } else if (emptyBlock) { + // new block but not new group, assign last group id to it + pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId; + } else { + // not new group and not empty block and ret NOT NULL, just return the tuple + } + } + } + + return ret; +} + SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SSortOperatorInfo* pInfo) { blockDataCleanup(pDataBlock); @@ -140,8 +223,13 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i blockDataEnsureCapacity(p, capacity); + STupleHandle* pTupleHandle; while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); + if (pInfo->pGroupIdCalc) { + pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p); + } else { + pTupleHandle = tsortNextTuple(pHandle); + } if (pTupleHandle == NULL) { break; } @@ -168,6 +256,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i pDataBlock->info.dataLoad = 1; pDataBlock->info.rows = p->info.rows; pDataBlock->info.scanFlag = p->info.scanFlag; + pDataBlock->info.id.groupId = p->info.id.groupId; } blockDataDestroy(p); @@ -281,6 +370,7 @@ void destroySortOperatorInfo(void* param) { tsortDestroySortHandle(pInfo->pSortHandle); taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->matchInfo.pList); + 
destroySortOpGroupIdCalc(pInfo->pGroupIdCalc); taosMemoryFreeClear(param); } @@ -309,6 +399,20 @@ static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNod return TSDB_CODE_SUCCESS; } +static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) { + if (pCalc) { + for (int i = 0; i < taosArrayGetSize(pCalc->pSortColVals); i++) { + SGroupKeys key = *(SGroupKeys*)taosArrayGet(pCalc->pSortColVals, i); + taosMemoryFree(key.pData); + } + taosArrayDestroy(pCalc->pSortColVals); + taosArrayDestroy(pCalc->pSortColsArr); + taosMemoryFree(pCalc->keyBuf); + taosMemoryFree(pCalc->lastKeyBuf); + taosMemoryFree(pCalc); + } +} + //===================================================================================== // Group Sort Operator typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 6c4a780dfb..d88ba80466 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -25,6 +25,7 @@ #include "tsort.h" #include "tutil.h" #include "tsimplehash.h" +#include "executil.h" struct STupleHandle { SSDataBlock* pBlock; @@ -1553,3 +1554,8 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) { return info; } + +int32_t tsortBuildKeys(SArray* pSortGroupCols, SArray* pColVals, STupleHandle* pTuple, char* keyBuf) { + extractCols(pSortGroupCols, pColVals, pTuple->pBlock, pTuple->rowIndex); + return buildKeys(keyBuf, pColVals); +} diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index c62190b68a..ede9fd36f0 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -531,6 +531,7 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_LIST_FIELD(pSortKeys); COPY_SCALAR_FIELD(groupSort); + 
COPY_SCALAR_FIELD(calcGroupId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c72b03817b..38f7b90ad7 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2327,7 +2327,7 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) { static const char* jkSortPhysiPlanExprs = "Exprs"; static const char* jkSortPhysiPlanSortKeys = "SortKeys"; static const char* jkSortPhysiPlanTargets = "Targets"; -static const char* jkSortPhysiPlanMaxRows = "MaxRows"; +static const char* jkSortPhysiPlanCalcGroupIds = "CalcGroupIds"; static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) { const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj; @@ -2342,6 +2342,9 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkSortPhysiPlanCalcGroupIds, pNode->calcGroupId); + } return code; } @@ -2359,6 +2362,9 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkSortPhysiPlanCalcGroupIds, &pNode->calcGroupId); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index bc037f05ec..13a860c95b 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2746,7 +2746,7 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS }; +enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, 
PHY_SORT_CODE_CALC_GROUPID }; static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj; @@ -2761,6 +2761,9 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_SORT_CODE_CALC_GROUPID, pNode->calcGroupId); + } return code; } @@ -2784,6 +2787,9 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; + case PHY_SORT_CODE_CALC_GROUPID: + code = tlvDecodeBool(pTlv, &pNode->calcGroupId); + break; default: break; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 1de7c412b1..a96bfe76fa 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -359,6 +359,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt } break; } + case HINT_SORT_FOR_GROUP: + if (paramNum > 0) return true; + break; default: return true; } @@ -421,6 +424,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) { } opt = HINT_NO_BATCH_SCAN; break; + case TK_SORT_FOR_GROUP: + lastComma = false; + if (0 != opt || inParamList) { + quit = true; + break; + } + opt = HINT_SORT_FOR_GROUP; + break; case TK_NK_LP: lastComma = false; if (0 == opt || inParamList) { diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index d74a77b134..520044dd33 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -206,6 +206,7 @@ static SKeyword keywordTable[] = { {"SMALLINT", TK_SMALLINT}, {"SNODE", TK_SNODE}, {"SNODES", TK_SNODES}, + {"SORT_FOR_GROUP", TK_SORT_FOR_GROUP}, {"SOFFSET", 
TK_SOFFSET}, {"SPLIT", TK_SPLIT}, {"STABLE", TK_STABLE}, diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index d8fb0def5f..b06c666c2d 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -44,12 +44,13 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan); int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList); -bool getBatchScanOptionFromHint(SNodeList* pList); +bool getBatchScanOptionFromHint(SNodeList* pList); +bool getSortForGroupOptHint(SNodeList* pList); SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr); -int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes); -bool isPartTableAgg(SAggLogicNode* pAgg); -bool isPartTagAgg(SAggLogicNode* pAgg); -bool isPartTableWinodw(SWindowLogicNode* pWindow); +int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes); +bool isPartTableAgg(SAggLogicNode* pAgg); +bool isPartTagAgg(SAggLogicNode* pAgg); +bool isPartTableWinodw(SWindowLogicNode* pWindow); #define CLONE_LIMIT 1 #define CLONE_SLIMIT 1 << 1 diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index d460c8074d..cec53e0e25 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -990,7 +990,6 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele if (NULL == pSelect->pWindow) { return TSDB_CODE_SUCCESS; } - switch (nodeType(pSelect->pWindow)) { case QUERY_NODE_STATE_WINDOW: return createWindowLogicNodeByState(pCxt, (SStateWindowNode*)pSelect->pWindow, pSelect, pLogicNode); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 7b2cd71677..3e85329603 100644 --- a/source/libs/planner/src/planOptimizer.c +++ 
b/source/libs/planner/src/planOptimizer.c @@ -2042,6 +2042,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* } int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild); + TSWAP(pProjectNode->node.pHint, pChild->pHint); if (TSDB_CODE_SUCCESS == code) { NODES_CLEAR_LIST(pProjectNode->node.pChildren); nodesDestroyNode((SNode*)pProjectNode); @@ -3587,6 +3588,67 @@ static int32_t stableJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicS return stbJoinOptRewriteStableJoin(pCxt, pNode, pLogicSubplan); } +static bool partColOptShouldBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { + SPartitionLogicNode* pPartition = (SPartitionLogicNode*)pNode; + if (pPartition->node.pParent && nodeType(pPartition->node.pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) return false; + if (planOptNodeListHasCol(pPartition->pPartitionKeys)) return true; + } + return false; +} + +static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) { + SNode* node; + int32_t code = TSDB_CODE_SUCCESS; + SSortLogicNode* pSort = (SSortLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT); + if (pSort) { + pSort->groupSort = false; + TSWAP(pSort->node.pChildren, pPartition->node.pChildren); + FOREACH(node, pPartition->pPartitionKeys) { + SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); + if (!pOrder) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + nodesListMakeAppend(&pSort->pSortKeys, (SNode*)pOrder); + pOrder->order = ORDER_ASC; + pOrder->pExpr = nodesCloneNode(node); + if (!pOrder->pExpr) code = TSDB_CODE_OUT_OF_MEMORY; + } + } + } + if (code == TSDB_CODE_SUCCESS) { + pSort->node.pTargets = nodesCloneList(((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0))->pTargets); + if (!pSort->node.pTargets) code = TSDB_CODE_OUT_OF_MEMORY; + } + if (code != TSDB_CODE_SUCCESS) { + nodesDestroyNode((SNode*)pSort); + pSort = NULL; + } + return pSort; +} + 
+static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + int32_t code = TSDB_CODE_SUCCESS; + SPartitionLogicNode* pNode = (SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partColOptShouldBeOptimized); + if (NULL == pNode) return TSDB_CODE_SUCCESS; + + SLogicNode* pRootNode = getLogicNodeRootNode((SLogicNode*)pNode); + if (!pRootNode->pHint || !getSortForGroupOptHint(pRootNode->pHint)) { + return code; + } + + // replace with sort node + SSortLogicNode* pSort = partColOptCreateSort(pNode); + if (!pSort) code = TSDB_CODE_OUT_OF_MEMORY; + if (code == TSDB_CODE_SUCCESS) { + pSort->calcGroupId = true; + code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort); + } + if (code == TSDB_CODE_SUCCESS) { + pCxt->optimized = true; + } + return code; +} // clang-format off static const SOptimizeRule optimizeRuleSet[] = { @@ -3606,6 +3668,7 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize}, {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}, {.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize}, + {.pName = "PartitionCols", .optimizeFunc = partitionColsOpt}, }; // clang-format on diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 5628f5cf0f..651c2b4db1 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1749,6 +1749,7 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren SNodeList* pPrecalcExprs = NULL; SNodeList* pSortKeys = NULL; int32_t code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys); + pSort->calcGroupId = pSortLogicNode->calcGroupId; SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); // push down expression to pOutputDataBlockDesc of child node diff --git 
a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 8b7f20f5cf..a77a8e72be 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -244,7 +244,12 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { } pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0); } - return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)); + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)) { + return true; + } else if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) { + return stbSplHasMultiTbScan(streamQuery, (SLogicNode*)pChild); + } + return false; } static bool stbSplIsMultiTbScanChild(bool streamQuery, SLogicNode* pNode) { diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index dcdc402c8b..5296ccf710 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -430,6 +430,17 @@ bool getBatchScanOptionFromHint(SNodeList* pList) { return batchScan; } +bool getSortForGroupOptHint(SNodeList* pList) { + SNode* pNode; + FOREACH(pNode, pList) { + SHintNode* pHint = (SHintNode*)pNode; + if (pHint->option == HINT_SORT_FOR_GROUP) { + return true; + } + } + return false; +} + int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SLogicNode* pCurr = (SLogicNode*)pNode; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 24f6ad33b6..279836ca18 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -34,6 +34,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4 +,,y,system-test,./pytest.sh python3 
./test.py -f 2-query/partition_by_col.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py diff --git a/tests/system-test/2-query/partition_by_col.py b/tests/system-test/2-query/partition_by_col.py new file mode 100644 index 0000000000..66f014a4f5 --- /dev/null +++ b/tests/system-test/2-query/partition_by_col.py @@ -0,0 +1,227 @@ +import taos +import sys +import time +import socket +import os +import threading +import math +from datetime import datetime + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +# from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.vgroups = 4 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + self.duraion = '1h' + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + + tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration)) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, paraDict): + colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"]) + tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"]) + sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString) + tdLog.debug("%s"%(sqlString)) + tsql.execute(sqlString) + return + + def create_ctable(self,tsql=None, 
dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0): + for i in range(ctbNum): + sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \ + (dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx) + tsql.execute(sqlString) + + tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + for i in range(ctbNum): + rowsBatched = 0 + sql += " %s%d values "%(ctbPrefix,i) + for j in range(rowsPerTbl): + if (i < ctbNum/2): + sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10) + else: + sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10) + rowsBatched += 1 + if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + rowsBatched = 0 + if j < rowsPerTbl - 1: + sql = "insert into %s%d values " %(ctbPrefix,i) + else: + sql = "insert into " + if sql != pre_insert: + tsql.execute(sql) + tdLog.debug("insert data ............ 
[OK]") + return + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'test', + 'dropFlag': 1, + 'vgroups': 2, + 'stbName': 'meters', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}], + 'ctbPrefix': 't', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 10000, + 'batchNum': 3000, + 'startTs': 1537146000000, + 'tsStep': 600000} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdLog.info("create database") + self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion) + + tdLog.info("create stb") + self.create_stable(tsql=tdSql, paraDict=paraDict) + + tdLog.info("create child tables") + self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \ + stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\ + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"]) + self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\ + ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\ + rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\ + startTs=paraDict["startTs"],tsStep=paraDict["tsStep"]) + return + + def check_explain_res_has_row(self, plan_str_expect: str, rows): + plan_found = False + for row in rows: + if str(row).find(plan_str_expect) >= 0: + 
tdLog.debug("plan: [%s] found in: [%s]" % (plan_str_expect, str(row))) + plan_found = True + break + if not plan_found: + tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(rows))) + + + def test_sort_for_partition_hint(self): + sql = 'explain select count(*), c1 from meters partition by c1' + sql_hint = 'explain select /*+ sort_for_group() */count(*), c1 from meters partition by c1' + tdSql.query(sql) + self.check_explain_res_has_row("Partition on", tdSql.queryResult) + tdSql.query(sql_hint) + self.check_explain_res_has_row("Sort", tdSql.queryResult) + + sql = 'explain select count(*), c1, tbname from meters partition by tbname, c1' + sql_hint = 'explain select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1' + tdSql.query(sql) + self.check_explain_res_has_row("Partition on", tdSql.queryResult) + tdSql.query(sql_hint) + self.check_explain_res_has_row("Sort", tdSql.queryResult) + + sql_interval = 'explain select count(*), c1 from meters partition by c1 interval(1s)' + sql_interval_hint = 'explain select /*+ sort_for_group() */ count(*), c1 from meters partition by c1 interval(1s)' + tdSql.query(sql_interval) + self.check_explain_res_has_row("Partition on", tdSql.queryResult) + tdSql.query(sql_interval_hint) + self.check_explain_res_has_row("Partition on", tdSql.queryResult) + + def add_order_by(self, sql: str, order_by: str, select_list: str = "*") -> str: + return "select %s from (%s)t order by %s" % (select_list, sql, order_by) + + def add_hint(self, sql: str) -> str: + return "select /*+ sort_for_group() */ %s" % sql[6:] + + def query_with_time(self, sql): + start = datetime.now() + tdSql.query(sql) + return (datetime.now().timestamp() - start.timestamp()) * 1000 + + def query_and_compare_res(self, sql1, sql2): + dur = self.query_with_time(sql1) + tdLog.debug("sql1 query with time: [%f]" % dur) + res1 = tdSql.queryResult + dur = self.query_with_time(sql2) + tdLog.debug("sql2 query with time: [%f]" % dur) + res2 
= tdSql.queryResult + if res1 is None or res2 is None: + tdLog.exit("res1 or res2 is None") + if len(res1) != len(res2): + tdLog.exit("query and compare failed cause different rows, sql1: [%s], rows: [%d], sql2: [%s], rows: [%d]" % (sql1, len(res1), sql2, len(res2))) + for i in range(0, len(res1)): + if res1[i] != res2[i]: + tdLog.exit("compare failed for row: [%d], sqls: [%s] res1: [%s], sql2 : [%s] res2: [%s]" % (i, sql1, res1[i], sql2, res2[i])) + tdLog.debug("sql: [%s] and sql: [%s] have same results, rows: [%d]" % (sql1, sql2, len(res1))) + + def prepare_and_query(self, sqls: [], order_by: str, select_list: str = "*"): + for sql in sqls: + sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list) + sql = self.add_order_by(sql, order_by, select_list) + self.query_and_compare_res(sql, sql_hint) + pass + + def test_sort_for_partition_res(self): + sqls_par_c1_agg = [ + "select count(*), c1 from meters partition by c1", + "select count(*), min(c2), max(c3), c1 from meters partition by c1", + ] + sqls_par_c1 = [ + "select * from meters partition by c1" + ] + sqls_par_c1_c2_agg = [ + "select count(*), c1, c2 from meters partition by c1, c2", + "select count(*), c1, c2, min(c4), max(c5), sum(c6) from meters partition by c1, c2", + ] + sqls_par_c1_c2 = [ + "select * from meters partition by c1, c2" + ] + + sqls_par_tbname_c1 = [ + "select count(*), c1 , tbname as tb from meters partition by tbname, c1" + ] + sqls_par_tag_c1 = [ + "select count(*), c1, t1 from meters partition by t1, c1" + ] + self.prepare_and_query(sqls_par_c1_agg, "c1") + self.prepare_and_query(sqls_par_c1, "c1, ts, c2", "c1, ts, c2") + self.prepare_and_query(sqls_par_c1_c2_agg, "c1, c2") + self.prepare_and_query(sqls_par_c1_c2, "c1, c2, ts, c3", "c1, c2, ts, c3") + self.prepare_and_query(sqls_par_tbname_c1, "tb, c1") + self.prepare_and_query(sqls_par_tag_c1, "t1, c1") + + def run(self): + self.prepareTestEnv() + self.test_sort_for_partition_hint() + self.test_sort_for_partition_res() + + 
def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index adea684ef0..443c27fd7e 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -18,6 +18,7 @@ python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4 python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4 +python3 ./test.py -f 2-query/partition_by_col.py -Q 4 python3 ./test.py -f 7-tmq/tmqShow.py python3 ./test.py -f 7-tmq/tmqDropStb.py python3 ./test.py -f 7-tmq/subscribeStb0.py