diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 244894b59b..b2aff6fd7e 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -28,6 +28,7 @@ typedef struct SBlockOrderInfo { bool nullFirst; int32_t order; int32_t slotId; + void* compFn; SColumnInfoData* pColData; } SBlockOrderInfo; @@ -82,6 +83,15 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, } } +static FORCE_INLINE bool colDataIsNull_t(const SColumnInfoData* pColumnInfoData, uint32_t row, bool isVarType) { + if (!pColumnInfoData->hasNull) return false; + if (isVarType) { + return colDataIsNull_var(pColumnInfoData, row); + } else { + return pColumnInfoData->nullbitmap ? colDataIsNull_f(pColumnInfoData->nullbitmap, row) : false; + } +} + static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg) { if (!pColumnInfoData->hasNull) { @@ -210,6 +220,10 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock); size_t blockDataGetSerialMetaSize(uint32_t numOfCols); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); +/** + * @brief find how many rows already in order start from first row + */ +int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index 944880511b..f1c22f750f 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -366,6 +366,7 @@ #define TK_NK_BIN 605 // bin format data 0b111 #define TK_BATCH_SCAN 606 #define TK_NO_BATCH_SCAN 607 +#define TK_SORT_FOR_GROUP 608 #define TK_NK_NIL 65535 diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 9d2247f479..885e9e5a30 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -136,6 +136,8 @@ typedef struct SAggLogicNode { bool hasTimeLineFunc; bool onlyHasKeepOrderFunc; bool hasGroupKeyOptimized; + bool isGroupTb; + bool isPartTb; // true if partition keys has tbname } SAggLogicNode; typedef struct SProjectLogicNode { @@ -221,6 +223,7 @@ typedef struct SMergeLogicNode { int32_t srcGroupId; bool groupSort; bool ignoreGroupId; + bool inputWithGroupId; } SMergeLogicNode; typedef enum EWindowType { @@ -263,6 +266,7 @@ typedef struct SWindowLogicNode { int8_t igExpired; int8_t igCheckUpdate; EWindowAlgorithm windowAlgo; + bool isPartTb; } SWindowLogicNode; typedef struct SFillLogicNode { @@ -279,8 +283,9 @@ typedef struct SSortLogicNode { SLogicNode node; SNodeList* pSortKeys; bool groupSort; - int64_t maxRows; bool skipPKSortOpt; + bool calcGroupId; + bool excludePkCol; // exclude PK ts col when calc group id } SSortLogicNode; typedef struct SPartitionLogicNode { @@ -288,6 +293,10 @@ typedef struct SPartitionLogicNode { SNodeList* pPartitionKeys; SNodeList* pTags; SNode* pSubtable; + + bool needBlockOutputTsOrder; // if true, partition output block will have ts order maintained + int32_t pkTsColId; + uint64_t pkTsColTbId; } SPartitionLogicNode; typedef enum ESubplanType { @@ -527,6 +536,7 @@ typedef struct SMergePhysiNode { int32_t srcGroupId; bool groupSort; bool ignoreGroupId; + bool inputWithGroupId; } SMergePhysiNode; typedef struct SWindowPhysiNode { @@ -603,6 +613,8 @@ typedef struct SSortPhysiNode { SNodeList* pExprs; // these are expression list of 
order_by_clause and parameter expression of aggregate function SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode SNodeList* pTargets; + bool calcGroupId; + bool excludePkCol; } SSortPhysiNode; typedef SSortPhysiNode SGroupSortPhysiNode; @@ -612,6 +624,9 @@ typedef struct SPartitionPhysiNode { SNodeList* pExprs; // these are expression list of partition_by_clause SNodeList* pPartitionKeys; SNodeList* pTargets; + + bool needBlockOutputTsOrder; + int32_t tsSlotId; } SPartitionPhysiNode; typedef struct SStreamPartitionPhysiNode { diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index d7d45c57ad..bb6713dc83 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -119,6 +119,7 @@ typedef struct SLeftValueNode { typedef enum EHintOption { HINT_NO_BATCH_SCAN = 1, HINT_BATCH_SCAN, + HINT_SORT_FOR_GROUP, } EHintOption; typedef struct SHintNode { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 53646b84b3..330eb4ae30 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -933,7 +933,13 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { return 0; } } - __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order); + + __compar_fn_t fn; + if (pOrder->compFn) { + fn = pOrder->compFn; + } else { + fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order); + } int ret = fn(left1, right1); if (ret == 0) { @@ -1099,6 +1105,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) { struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i); pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); + pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order); } terrno = 0; @@ -2509,3 +2516,20 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock); } + +int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) { + if (!pDataBlock || !pOrderInfo) return 0; + for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { + SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i); + pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId); + pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order); + } + SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock}; + int32_t rowIdx = 0, nextRowIdx = 1; + for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) { + if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) { + break; + } + } + return nextRowIdx; +} diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 5e2ca462f7..740ff7b0dc 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -196,4 +196,22 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, SStorageAPI* pStorageAPI); + +/** + * @brief build a tuple into keyBuf + * @param [out] keyBuf the output buf + * @param [in] pSortGroupCols the cols to build + * @param [in] pBlock block the tuple in + */ 
+int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex); + +int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pDataBlock, + int32_t rowIndex); + +uint64_t calcGroupId(char *pData, int32_t len); + +SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys); + +int32_t extractKeysLen(const SArray* keys); + #endif // TDENGINE_EXECUTIL_H diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 3180173ca7..365acf2bff 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -194,6 +194,16 @@ void tsortSetClosed(SSortHandle* pHandle); void tsortSetSingleTableMerge(SSortHandle* pHandle); void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param); +/** + * @brief comp the tuple with keyBuf, if not equal, new keys will be built in keyBuf, newLen will be stored in keyLen + * @param [in] pSortCols cols to comp and build + * @param [in, out] pass in the old keys, if comp not equal, new keys will be built in it. + * @param [in, out] keyLen the old keysLen, if comp not equal, new keysLen will be stored in it. + * @param [in] the tuple to comp with + * @retval 0 if comp equal, 1 if not + */ +int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index f6a8c6689f..5e649af47e 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -303,6 +303,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION || + downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT || (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN && ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) { return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 52631cd0db..753d3e680c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2261,3 +2261,104 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t ts[3] = pWin->skey; // window start key ts[4] = pWin->ekey + delta; // window end key } + +int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) { + SColumnDataAgg* pColAgg = NULL; + const char* isNull = oldkeyBuf; + const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size; + + for (int32_t i = 0; i < pSortGroupCols->size; ++i) { + const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); + const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); + if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; + + if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { + if (isNull[i] != 1) return 1; + } else { + if (isNull[i] != 0) return 1; + const char* val = colDataGetData(pColInfoData, rowIndex); + if (pCol->type == TSDB_DATA_TYPE_JSON) { + int32_t len = getJsonValueLen(val); + if (memcmp(p, val, len) != 0) return 1; + p += len; + } else if (IS_VAR_DATA_TYPE(pCol->type)) { + if (memcmp(p, val, varDataTLen(val)) != 0) return 1; + p += 
varDataTLen(val); + } else { + if (0 != memcmp(p, val, pCol->bytes)) return 1; + p += pCol->bytes; + } + } + } + if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1; + return 0; +} + +int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, + int32_t rowIndex) { + uint32_t colNum = pSortGroupCols->size; + SColumnDataAgg* pColAgg = NULL; + char* isNull = keyBuf; + char* p = keyBuf + sizeof(int8_t) * colNum; + + for (int32_t i = 0; i < colNum; ++i) { + const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); + const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); + if (pCol->slotId > pBlock->pDataBlock->size) continue; + + if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; + + if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { + isNull[i] = 1; + } else { + isNull[i] = 0; + const char* val = colDataGetData(pColInfoData, rowIndex); + if (pCol->type == TSDB_DATA_TYPE_JSON) { + int32_t len = getJsonValueLen(val); + memcpy(p, val, len); + p += len; + } else if (IS_VAR_DATA_TYPE(pCol->type)) { + varDataCopy(p, val); + p += varDataTLen(val); + } else { + memcpy(p, val, pCol->bytes); + p += pCol->bytes; + } + } + } + return (int32_t)(p - keyBuf); +} + +uint64_t calcGroupId(char* pData, int32_t len) { + T_MD5_CTX context; + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pData, len); + tMD5Final(&context); + + // NOTE: only extract the initial 8 bytes of the final MD5 digest + uint64_t id = 0; + memcpy(&id, context.digest, sizeof(uint64_t)); + if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t)); + return id; +} + +SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) { + SNode* node; + SNodeList* ret = NULL; + FOREACH(node, pSortKeys) { + SOrderByExprNode* pSortKey = (SOrderByExprNode*)node; + nodesListMakeAppend(&ret, pSortKey->pExpr); + } + return ret; +} + +int32_t extractKeysLen(const SArray* keys) { + int32_t len = 0; + int32_t keyNum = taosArrayGetSize(keys); + for (int32_t i = 0; i < keyNum; ++i) { + SColumn* pCol = (SColumn*)taosArrayGet(keys, i); + len += pCol->bytes; + } + len += sizeof(int8_t) * keyNum; //null flag + return len; +} diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index fb2204eae8..1060dd4f0e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -56,6 +56,10 @@ typedef struct SPartitionOperatorInfo { int32_t groupIndex; // group index int32_t pageIndex; // page index of current group SExprSupp scalarSup; + + int32_t remainRows; + int32_t orderedRows; + SArray* pOrderInfoArr; } SPartitionOperatorInfo; static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len); @@ -635,20 +639,6 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf return pPage; } -uint64_t calcGroupId(char* pData, int32_t len) { - T_MD5_CTX context; - tMD5Init(&context); - tMD5Update(&context, (uint8_t*)pData, len); - tMD5Final(&context); - - // NOTE: only extract the initial 8 bytes of the final MD5 digest - uint64_t id = 0; - memcpy(&id, context.digest, sizeof(uint64_t)); - if (0 == id) - memcpy(&id, context.digest + 8, sizeof(uint64_t)); - return id; -} - int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t)); @@ -699,37 +689,49 @@ static 
SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SPartitionOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SDataGroupInfo* pGroupInfo = - (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; - if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { - // try next group data - ++pInfo->groupIndex; - if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { - setOperatorCompleted(pOperator); - clearPartitionOperator(pInfo); - return NULL; + if (pInfo->remainRows == 0) { + blockDataCleanup(pInfo->binfo.pRes); + SDataGroupInfo* pGroupInfo = + (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; + if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { + // try next group data + ++pInfo->groupIndex; + if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { + setOperatorCompleted(pOperator); + clearPartitionOperator(pInfo); + return NULL; + } + + pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex); + pInfo->pageIndex = 0; } - pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex); - pInfo->pageIndex = 0; + int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex); + void* page = getBufPage(pInfo->pBuf, *pageId); + if (page == NULL) { + qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, terrno); + } + + blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity); + blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); + + pInfo->pageIndex += 1; + releaseBufPage(pInfo->pBuf, page); + pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId; + pInfo->binfo.pRes->info.dataLoad = 1; + pInfo->orderedRows = 0; } - int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex); - void* page = getBufPage(pInfo->pBuf, *pageId); - if (page == NULL) { - qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, terrno); + if (pInfo->pOrderInfoArr) { + pInfo->binfo.pRes->info.rows += pInfo->remainRows; + blockDataTrimFirstRows(pInfo->binfo.pRes, pInfo->orderedRows); + pInfo->orderedRows = blockDataGetSortedRows(pInfo->binfo.pRes, pInfo->pOrderInfoArr); + pInfo->remainRows = pInfo->binfo.pRes->info.rows - pInfo->orderedRows; + pInfo->binfo.pRes->info.rows = pInfo->orderedRows; } - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity); - blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); - - pInfo->pageIndex += 1; - releaseBufPage(pInfo->pBuf, page); - - pInfo->binfo.pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->binfo.pRes, 0); - pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId; pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows; return pInfo->binfo.pRes; @@ -746,7 +748,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - blockDataCleanup(pRes); return buildPartitionResult(pOperator); } @@ -829,6 +830,7 @@ static void destroyPartitionOperatorInfo(void* param) { cleanupExprSupp(&pInfo->scalarSup); destroyDiskbasedBuf(pInfo->pBuf); + taosArrayDestroy(pInfo->pOrderInfoArr); taosMemoryFreeClear(param); } @@ -846,6 +848,17 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition SExprInfo* pExprInfo = 
createExprInfo(pPartNode->pTargets, NULL, &numOfCols); pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); + if (pPartNode->needBlockOutputTsOrder) { + SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId}; + pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo)); + if (!pInfo->pOrderInfoArr) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = terrno; + goto _error; + } + taosArrayPush(pInfo->pOrderInfoArr, &order); + } + if (pPartNode->pExprs != NULL) { int32_t num = 0; SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index a59a7bb1ea..ccef6640be 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -19,18 +19,28 @@ #include "querytask.h" #include "tdatablock.h" +typedef struct SSortOpGroupIdCalc { + STupleHandle* pSavedTuple; + SArray* pSortColsArr; + char* keyBuf; + int32_t lastKeysLen; // default to be 0 + uint64_t lastGroupId; + bool excludePKCol; +} SSortOpGroupIdCalc; + typedef struct SSortOperatorInfo { - SOptrBasicInfo binfo; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - SColMatchInfo matchInfo; - int32_t bufPageSize; - int64_t startTs; // sort start time - uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - SLimitInfo limitInfo; - uint64_t maxTupleLength; - int64_t maxRows; + SOptrBasicInfo binfo; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + SColMatchInfo matchInfo; + int32_t bufPageSize; + int64_t startTs; // sort start time + uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. 
+ SLimitInfo limitInfo; + uint64_t maxTupleLength; + int64_t maxRows; + SSortOpGroupIdCalc* pGroupIdCalc; } SSortOperatorInfo; static SSDataBlock* doSort(SOperatorInfo* pOperator); @@ -40,6 +50,8 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain static void destroySortOperatorInfo(void* param); static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys); +static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc); + // todo add limit/offset impl SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); @@ -78,6 +90,34 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); + + if (pSortNode->calcGroupId) { + int32_t keyLen; + SSortOpGroupIdCalc* pGroupIdCalc = pInfo->pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc)); + if (!pGroupIdCalc) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + SNodeList* pSortColsNodeArr = makeColsNodeArrFromSortKeys(pSortNode->pSortKeys); + if (!pSortColsNodeArr) code = TSDB_CODE_OUT_OF_MEMORY; + if (TSDB_CODE_SUCCESS == code) { + pGroupIdCalc->pSortColsArr = makeColumnArrayFromList(pSortColsNodeArr); + if (!pGroupIdCalc->pSortColsArr) code = TSDB_CODE_OUT_OF_MEMORY; + nodesClearList(pSortColsNodeArr); + } + if (TSDB_CODE_SUCCESS == code) { + // PK ts col should always at last, see partColOptCreateSort + if (pSortNode->excludePkCol) taosArrayPop(pGroupIdCalc->pSortColsArr); + keyLen = extractKeysLen(pGroupIdCalc->pSortColsArr); + } + if (TSDB_CODE_SUCCESS == code) { + pGroupIdCalc->lastKeysLen = 0; + pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen); + if (!pGroupIdCalc->keyBuf) code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (code != TSDB_CODE_SUCCESS) goto _error; + pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder; initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); @@ -129,6 +169,52 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { pBlock->info.rows += 1; } +/** + * @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys + * @param [in, out] pBlock the output block, the group id will be saved in it + * @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple + * @retval NULL if no more tuples + */ +static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) { + STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple; + if (!retTuple) { + retTuple = tsortNextTuple(pHandle); + } + + if (retTuple) { + int32_t newGroup; + if (pInfo->pGroupIdCalc->pSavedTuple) { + newGroup = true; + pInfo->pGroupIdCalc->pSavedTuple = NULL; + } else { + newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf, + &pInfo->pGroupIdCalc->lastKeysLen, retTuple); + } + bool emptyBlock = pBlock->info.rows == 0; + if (newGroup) { + if (!emptyBlock) { + // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return + // NULL. 
Note that the keyBuf and lastKeysLen has been updated to new value + pInfo->pGroupIdCalc->pSavedTuple = retTuple; + retTuple = NULL; + } else { + // new group with empty block + pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId = + calcGroupId(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeysLen); + } + } else { + if (emptyBlock) { + // new block but not new group, assign last group id to it + pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId; + } else { + // not new group and not empty block and ret NOT NULL, just return the tuple + } + } + } + + return retTuple; +} + SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SSortOperatorInfo* pInfo) { blockDataCleanup(pDataBlock); @@ -140,8 +226,13 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i blockDataEnsureCapacity(p, capacity); + STupleHandle* pTupleHandle; while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); + if (pInfo->pGroupIdCalc) { + pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p); + } else { + pTupleHandle = tsortNextTuple(pHandle); + } if (pTupleHandle == NULL) { break; } @@ -168,6 +259,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i pDataBlock->info.dataLoad = 1; pDataBlock->info.rows = p->info.rows; pDataBlock->info.scanFlag = p->info.scanFlag; + pDataBlock->info.id.groupId = p->info.id.groupId; } blockDataDestroy(p); @@ -281,6 +373,7 @@ void destroySortOperatorInfo(void* param) { tsortDestroySortHandle(pInfo->pSortHandle); taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->matchInfo.pList); + destroySortOpGroupIdCalc(pInfo->pGroupIdCalc); taosMemoryFreeClear(param); } @@ -309,6 +402,14 @@ static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNod return TSDB_CODE_SUCCESS; } +static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) { + if (pCalc) { + taosArrayDestroy(pCalc->pSortColsArr); + taosMemoryFree(pCalc->keyBuf); + taosMemoryFree(pCalc); + } +} + //===================================================================================== // Group Sort Operator typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus; @@ -591,6 +692,7 @@ typedef struct SMultiwayMergeOperatorInfo { bool ignoreGroupId; uint64_t groupId; STupleHandle* prefetchedTuple; + bool inputWithGroupId; } SMultiwayMergeOperatorInfo; int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { @@ -641,7 +743,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* while (1) { STupleHandle* pTupleHandle = NULL; - if (pInfo->groupSort) { + if (pInfo->groupSort || pInfo->inputWithGroupId) { if (pInfo->prefetchedTuple == NULL) { pTupleHandle = tsortNextTuple(pHandle); } else { @@ -662,7 +764,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* break; } - if (pInfo->groupSort) { + if (pInfo->groupSort || pInfo->inputWithGroupId) { uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) { appendOneRowToDataBlock(p, pTupleHandle); @@ -842,6 +944,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. 
pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder; + pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId; setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 287c824540..1ebc7ad3c6 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -25,6 +25,7 @@ #include "tsort.h" #include "tutil.h" #include "tsimplehash.h" +#include "executil.h" struct STupleHandle { SSDataBlock* pBlock; @@ -615,48 +616,62 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) { int ret = pParam->cmpFn(left1, right1); return ret; } else { + bool isVarType; for (int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); + isVarType = IS_VAR_DATA_TYPE(pLeftColInfoData->info.type); - bool leftNull = false; - if (pLeftColInfoData->hasNull) { - if (pLeftBlock->pBlockAgg == NULL) { - leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex); - } else { - leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, - pLeftBlock->pBlockAgg[i]); + if (pLeftColInfoData->hasNull || pRightColInfoData->hasNull) { + bool leftNull = false; + if (pLeftColInfoData->hasNull) { + if (pLeftBlock->pBlockAgg == NULL) { + leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType); + } else { + leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, + pLeftBlock->pBlockAgg[i]); + } + } + + bool rightNull = false; + if (pRightColInfoData->hasNull) { + if (pRightBlock->pBlockAgg == NULL) { + rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType); + } else { + rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, + pRightBlock->pBlockAgg[i]); + } + } + + if (leftNull && rightNull) { + continue; // continue to next slot + } + + if (rightNull) { + return pOrder->nullFirst ? 1 : -1; + } + + if (leftNull) { + return pOrder->nullFirst ? -1 : 1; } } - bool rightNull = false; - if (pRightColInfoData->hasNull) { - if (pRightBlock->pBlockAgg == NULL) { - rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex); - } else { - rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, - pRightBlock->pBlockAgg[i]); - } + void* left1, *right1; + if (isVarType) { + left1 = colDataGetVarData(pLeftColInfoData, pLeftSource->src.rowIndex); + right1 = colDataGetVarData(pRightColInfoData, pRightSource->src.rowIndex); + } else { + left1 = colDataGetNumData(pLeftColInfoData, pLeftSource->src.rowIndex); + right1 = colDataGetNumData(pRightColInfoData, pRightSource->src.rowIndex); } - if (leftNull && rightNull) { - continue; // continue to next slot + __compar_fn_t fn = pOrder->compFn; + if (!fn) { + fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order); + pOrder->compFn = fn; } - if (rightNull) { - return pOrder->nullFirst ? 1 : -1; - } - - if (leftNull) { - return pOrder->nullFirst ? 
-1 : 1; - } - - void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex); - void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex); - - __compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order); - int ret = fn(left1, right1); if (ret == 0) { continue; @@ -1567,3 +1582,15 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) { return info; } + +int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, + const STupleHandle* pTuple) { + int32_t ret; + if (0 == compKeys(pSortCols, keyBuf, *keyLen, pTuple->pBlock, pTuple->rowIndex)) { + ret = 0; + } else { + *keyLen = buildKeys(keyBuf, pSortCols, pTuple->pBlock, pTuple->rowIndex); + ret = 1; + } + return ret; +} diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index c62190b68a..3d48036095 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -444,6 +444,7 @@ static int32_t logicAggCopy(const SAggLogicNode* pSrc, SAggLogicNode* pDst) { CLONE_NODE_LIST_FIELD(pGroupKeys); CLONE_NODE_LIST_FIELD(pAggFuncs); COPY_SCALAR_FIELD(hasGroupKeyOptimized); + COPY_SCALAR_FIELD(isPartTb); return TSDB_CODE_SUCCESS; } @@ -489,6 +490,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst COPY_SCALAR_FIELD(srcGroupId); COPY_SCALAR_FIELD(groupSort); COPY_SCALAR_FIELD(ignoreGroupId); + COPY_SCALAR_FIELD(inputWithGroupId); return TSDB_CODE_SUCCESS; } @@ -531,6 +533,8 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_LIST_FIELD(pSortKeys); COPY_SCALAR_FIELD(groupSort); + COPY_SCALAR_FIELD(calcGroupId); + COPY_SCALAR_FIELD(excludePkCol); return TSDB_CODE_SUCCESS; } @@ -539,6 +543,9 @@ static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLog CLONE_NODE_LIST_FIELD(pPartitionKeys); CLONE_NODE_LIST_FIELD(pTags); CLONE_NODE_FIELD(pSubtable); + COPY_SCALAR_FIELD(needBlockOutputTsOrder); + COPY_SCALAR_FIELD(pkTsColId); + COPY_SCALAR_FIELD(pkTsColTbId); return TSDB_CODE_SUCCESS; } @@ -678,6 +685,8 @@ static int32_t physiPartitionCopy(const SPartitionPhysiNode* pSrc, SPartitionPhy CLONE_NODE_LIST_FIELD(pExprs); CLONE_NODE_LIST_FIELD(pPartitionKeys); CLONE_NODE_LIST_FIELD(pTargets); + COPY_SCALAR_FIELD(needBlockOutputTsOrder); + COPY_SCALAR_FIELD(tsSlotId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c72b03817b..c2acf0dbdf 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2271,6 +2271,7 @@ static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels"; static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId"; static const char* jkMergePhysiPlanGroupSort = "GroupSort"; static const char* jkMergePhysiPlanIgnoreGroupID = "IgnoreGroupID"; +static const char* jkMergePhysiPlanInputWithGroupId = "InputWithGroupId"; static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj; @@ -2294,6 +2295,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanIgnoreGroupID, pNode->ignoreGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanInputWithGroupId, pNode->inputWithGroupId); + } return code; } @@ -2327,7 
+2331,8 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) { static const char* jkSortPhysiPlanExprs = "Exprs"; static const char* jkSortPhysiPlanSortKeys = "SortKeys"; static const char* jkSortPhysiPlanTargets = "Targets"; -static const char* jkSortPhysiPlanMaxRows = "MaxRows"; +static const char* jkSortPhysiPlanCalcGroupIds = "CalcGroupIds"; +static const char* jkSortPhysiPlanExcludePKCol = "ExcludePKCol"; static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) { const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj; @@ -2342,6 +2347,12 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkSortPhysiPlanCalcGroupIds, pNode->calcGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkSortPhysiPlanExcludePKCol, pNode->excludePkCol); + } return code; } @@ -2359,6 +2370,12 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkSortPhysiPlanCalcGroupIds, &pNode->calcGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code= tjsonGetBoolValue(pJson, jkSortPhysiPlanExcludePKCol, &pNode->excludePkCol); + } return code; } @@ -2644,6 +2661,8 @@ static int32_t jsonToPhysiEventWindowNode(const SJson* pJson, void* pObj) { static const char* jkPartitionPhysiPlanExprs = "Exprs"; static const char* jkPartitionPhysiPlanPartitionKeys = "PartitionKeys"; static const char* jkPartitionPhysiPlanTargets = "Targets"; +static const char* jkPartitionPhysiPlanNeedBlockOutputTsOrder = "NeedBlockOutputTsOrder"; +static const char* jkPartitionPhysiPlanTsSlotId = "tsSlotId"; static int32_t physiPartitionNodeToJson(const void* pObj, SJson* pJson) { const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj; @@ -2658,6 +2677,12 @@ static int32_t physiPartitionNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkPartitionPhysiPlanTargets, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + tjsonAddBoolToObject(pJson, jkPartitionPhysiPlanNeedBlockOutputTsOrder, pNode->needBlockOutputTsOrder); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonAddIntegerToObject(pJson, jkPartitionPhysiPlanTsSlotId, pNode->tsSlotId); + } return code; } @@ -2675,6 +2700,12 @@ static int32_t jsonToPhysiPartitionNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkPartitionPhysiPlanTargets, &pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkPartitionPhysiPlanNeedBlockOutputTsOrder, &pNode->needBlockOutputTsOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkPartitionPhysiPlanTsSlotId, &pNode->tsSlotId); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index bc037f05ec..99100b2a1d 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2682,6 +2682,7 @@ enum { PHY_MERGE_CODE_SRC_GROUP_ID, PHY_MERGE_CODE_GROUP_SORT, PHY_MERGE_CODE_IGNORE_GROUP_ID, + PHY_MERGE_CODE_INPUT_WITH_GROUP_ID, }; static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2706,6 +2707,9 @@ static int32_t 
physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_IGNORE_GROUP_ID, pNode->ignoreGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_INPUT_WITH_GROUP_ID, pNode->inputWithGroupId); + } return code; } @@ -2738,6 +2742,9 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) { case PHY_MERGE_CODE_IGNORE_GROUP_ID: code = tlvDecodeBool(pTlv, &pNode->ignoreGroupId); break; + case PHY_MERGE_CODE_INPUT_WITH_GROUP_ID: + code = tlvDecodeBool(pTlv, &pNode->inputWithGroupId); + break; default: break; } @@ -2746,7 +2753,14 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS }; +enum { + PHY_SORT_CODE_BASE_NODE = 1, + PHY_SORT_CODE_EXPR, + PHY_SORT_CODE_SORT_KEYS, + PHY_SORT_CODE_TARGETS, + PHY_SORT_CODE_CALC_GROUPID, + PHY_SORT_CODE_EXCLUDE_PK_COL +}; static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj; @@ -2761,6 +2775,12 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_SORT_CODE_CALC_GROUPID, pNode->calcGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_SORT_CODE_EXCLUDE_PK_COL, pNode->excludePkCol); + } return code; } @@ -2784,6 +2804,11 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; + case PHY_SORT_CODE_CALC_GROUPID: + code = tlvDecodeBool(pTlv, &pNode->calcGroupId); + break; + case PHY_SORT_CODE_EXCLUDE_PK_COL: + code = tlvDecodeBool(pTlv, &pNode->excludePkCol); default: break; } @@ -3142,7 +3167,14 @@ static int32_t msgToPhysiEventWindowNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { PHY_PARTITION_CODE_BASE_NODE = 1, PHY_PARTITION_CODE_EXPR, PHY_PARTITION_CODE_KEYS, PHY_PARTITION_CODE_TARGETS }; +enum { + PHY_PARTITION_CODE_BASE_NODE = 1, + PHY_PARTITION_CODE_EXPR, + PHY_PARTITION_CODE_KEYS, + PHY_PARTITION_CODE_TARGETS, + PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER, + PHY_PARTITION_CODE_TS_SLOTID +}; static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj; @@ -3157,6 +3189,12 @@ static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_PARTITION_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER, pNode->needBlockOutputTsOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_PARTITION_CODE_TS_SLOTID, pNode->tsSlotId); + } return code; } @@ -3180,6 +3218,12 @@ static int32_t msgToPhysiPartitionNode(STlvDecoder* pDecoder, void* pObj) { case PHY_PARTITION_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; + case PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER: + code = tlvDecodeBool(pTlv, &pNode->needBlockOutputTsOrder); + break; + case PHY_PARTITION_CODE_TS_SLOTID: + code = tlvDecodeI32(pTlv, &pNode->tsSlotId); 
+ break; default: break; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 2a3235b4f5..6845e16e9c 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -359,6 +359,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt } break; } + case HINT_SORT_FOR_GROUP: + if (paramNum > 0) return true; + break; default: return true; } @@ -421,6 +424,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) { } opt = HINT_NO_BATCH_SCAN; break; + case TK_SORT_FOR_GROUP: + lastComma = false; + if (0 != opt || inParamList) { + quit = true; + break; + } + opt = HINT_SORT_FOR_GROUP; + break; case TK_NK_LP: lastComma = false; if (0 == opt || inParamList) { diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index 5e230a9116..cbb3b1952b 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -207,6 +207,7 @@ static SKeyword keywordTable[] = { {"SMALLINT", TK_SMALLINT}, {"SNODE", TK_SNODE}, {"SNODES", TK_SNODES}, + {"SORT_FOR_GROUP", TK_SORT_FOR_GROUP}, {"SOFFSET", TK_SOFFSET}, {"SPLIT", TK_SPLIT}, {"STABLE", TK_STABLE}, diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index d8fb0def5f..83a4e9ced8 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -44,12 +44,15 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan); int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList); -bool getBatchScanOptionFromHint(SNodeList* pList); +bool getBatchScanOptionFromHint(SNodeList* pList); +bool getSortForGroupOptHint(SNodeList* pList); SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr); -int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes); -bool isPartTableAgg(SAggLogicNode* pAgg); -bool isPartTagAgg(SAggLogicNode* pAgg); -bool isPartTableWinodw(SWindowLogicNode* pWindow); +int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes); +bool isPartTableAgg(SAggLogicNode* pAgg); +bool isPartTagAgg(SAggLogicNode* pAgg); +bool isPartTableWinodw(SWindowLogicNode* pWindow); +bool keysHasCol(SNodeList* pKeys); +bool keysHasTbname(SNodeList* pKeys); #define CLONE_LIMIT 1 #define CLONE_SLIMIT 1 << 1 diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 96d253494d..45b2a73822 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -741,6 +741,9 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, } nodesDestroyList(pOutputGroupKeys); + pAgg->isGroupTb = pAgg->pGroupKeys ? keysHasTbname(pAgg->pGroupKeys) : 0; + pAgg->isPartTb = pSelect->pPartitionByList ? keysHasTbname(pSelect->pPartitionByList) : 0; + if (TSDB_CODE_SUCCESS == code) { *pLogicNode = (SLogicNode*)pAgg; } else { @@ -962,6 +965,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva nodesDestroyNode((SNode*)pWindow); return TSDB_CODE_OUT_OF_MEMORY; } + pWindow->isPartTb = pSelect->pPartitionByList ? 
keysHasTbname(pSelect->pPartitionByList) : 0; return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode); } @@ -993,7 +997,6 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele if (NULL == pSelect->pWindow) { return TSDB_CODE_SUCCESS; } - switch (nodeType(pSelect->pWindow)) { case QUERY_NODE_STATE_WINDOW: return createWindowLogicNodeByState(pCxt, (SStateWindowNode*)pSelect->pWindow, pSelect, pLogicNode); @@ -1260,6 +1263,15 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS } } + if (keysHasCol(pPartition->pPartitionKeys) && pSelect->pWindow && + nodeType(pSelect->pWindow) == QUERY_NODE_INTERVAL_WINDOW) { + pPartition->needBlockOutputTsOrder = true; + SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pSelect->pWindow; + SColumnNode* pTsCol = (SColumnNode*)pInterval->pCol; + pPartition->pkTsColId = pTsCol->colId; + pPartition->pkTsColTbId = pTsCol->tableId; + } + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags) { pPartition->pTags = nodesCloneList(pSelect->pTags); if (NULL == pPartition->pTags) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 7b2cd71677..c81ca0b3d1 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1661,22 +1661,6 @@ static int32_t smaIndexOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub return smaIndexOptimizeImpl(pCxt, pLogicSubplan, pScan); } -static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) { - if (QUERY_NODE_COLUMN == nodeType(pNode)) { - if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) { - *(bool*)pContext = true; - return DEAL_RES_END; - } - } - return DEAL_RES_CONTINUE; -} - -static bool planOptNodeListHasCol(SNodeList* pKeys) { - bool hasCol = false; - nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol); - return hasCol; -} - static EDealRes partTagsOptHasTbname(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { if (COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) { @@ -1755,7 +1739,7 @@ static bool partTagsOptMayBeOptimized(SLogicNode* pNode) { return false; } - return !planOptNodeListHasCol(partTagsGetPartKeys(pNode)) && partTagsOptAreSupportedFuncs(partTagsGetFuncs(pNode)); + return !keysHasCol(partTagsGetPartKeys(pNode)) && partTagsOptAreSupportedFuncs(partTagsGetFuncs(pNode)); } static int32_t partTagsOptRebuildTbanme(SNodeList* pPartKeys) { @@ -2042,6 +2026,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* } int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild); + if (pProjectNode->node.pHint && !pChild->pHint) TSWAP(pProjectNode->node.pHint, pChild->pHint); if (TSDB_CODE_SUCCESS == code) { NODES_CLEAR_LIST(pProjectNode->node.pChildren); nodesDestroyNode((SNode*)pProjectNode); @@ -2734,7 +2719,7 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) { } SAggLogicNode* pAgg = (SAggLogicNode*)(pNode->pParent); - if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || planOptNodeListHasCol(pAgg->pGroupKeys) || + if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || keysHasCol(pAgg->pGroupKeys) || !planOptNodeListHasTbname(pAgg->pGroupKeys)) { return false; } @@ -2853,13 +2838,14 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT); return true; } + case 
QUERY_NODE_LOGIC_PLAN_SORT: + if (((SSortLogicNode*)pNodeLimitPushTo)->calcGroupId) break; + // fall through case QUERY_NODE_LOGIC_PLAN_FILL: - case QUERY_NODE_LOGIC_PLAN_SORT: { cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT); SNode* pChild = NULL; FOREACH(pChild, pNodeLimitPushTo->pChildren) { pushDownLimitHow(pNodeLimitPushTo, (SLogicNode*)pChild); } return true; - } case QUERY_NODE_LOGIC_PLAN_AGG: { if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && (isPartTagAgg((SAggLogicNode*)pNodeLimitPushTo) || isPartTableAgg((SAggLogicNode*)pNodeLimitPushTo))) { @@ -3587,6 +3573,96 @@ static int32_t stableJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicS return stbJoinOptRewriteStableJoin(pCxt, pNode, pLogicSubplan); } +static bool partColOptShouldBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { + SPartitionLogicNode* pPartition = (SPartitionLogicNode*)pNode; + if (keysHasCol(pPartition->pPartitionKeys)) return true; + } + return false; +} + +static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) { + SNode* node; + int32_t code = TSDB_CODE_SUCCESS; + SSortLogicNode* pSort = (SSortLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT); + if (pSort) { + bool alreadyPartByPKTs = false; + pSort->groupSort = false; + FOREACH(node, pPartition->pPartitionKeys) { + SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); + if (QUERY_NODE_COLUMN == nodeType(node) && ((SColumnNode*)node)->colId == pPartition->pkTsColId && + ((SColumnNode*)node)->tableId == pPartition->pkTsColTbId) + alreadyPartByPKTs = true; + if (!pOrder) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + nodesListMakeAppend(&pSort->pSortKeys, (SNode*)pOrder); + pOrder->order = ORDER_ASC; + pOrder->pExpr = nodesCloneNode(node); + if (!pOrder->pExpr) code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (pPartition->needBlockOutputTsOrder && !alreadyPartByPKTs) { + SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); + if (!pOrder) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + pSort->excludePkCol = true; + nodesListMakeAppend(&pSort->pSortKeys, (SNode*)pOrder); + pOrder->order = ORDER_ASC; + pOrder->pExpr = 0; + FOREACH(node, pPartition->node.pTargets) { + if (nodeType(node) == QUERY_NODE_COLUMN) { + SColumnNode* pCol = (SColumnNode*)node; + if (pCol->colId == pPartition->pkTsColId && pCol->tableId == pPartition->pkTsColTbId) { + pOrder->pExpr = nodesCloneNode((SNode*)pCol); + break; + } + } + } + if (!pOrder->pExpr) { + code = TSDB_CODE_PAR_INTERNAL_ERROR; + } + } + } + } + if (code != TSDB_CODE_SUCCESS) { + nodesDestroyNode((SNode*)pSort); + pSort = NULL; + } + return pSort; +} + +static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + int32_t code = TSDB_CODE_SUCCESS; + SPartitionLogicNode* pNode = (SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partColOptShouldBeOptimized); + if (NULL == pNode) return TSDB_CODE_SUCCESS; + + SLogicNode* pRootNode = getLogicNodeRootNode((SLogicNode*)pNode); + if (!pRootNode->pHint || !getSortForGroupOptHint(pRootNode->pHint)) { + return code; + } + + // replace with sort node + SSortLogicNode* pSort = partColOptCreateSort(pNode); + if (!pSort) { + // if sort create failed, we eat the error, skip the optimization + code = TSDB_CODE_SUCCESS; + } else { + TSWAP(pSort->node.pChildren, pNode->node.pChildren); + TSWAP(pSort->node.pTargets, pNode->node.pTargets); + optResetParent((SLogicNode*)pSort); + 
pSort->calcGroupId = true; + code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort); + if (code == TSDB_CODE_SUCCESS) { + pCxt->optimized = true; + } else { + nodesDestroyNode((SNode*)pSort); + } + } + return code; +} // clang-format off static const SOptimizeRule optimizeRuleSet[] = { @@ -3606,6 +3682,7 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize}, {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}, {.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize}, + {.pName = "PartitionCols", .optimizeFunc = partitionColsOpt}, }; // clang-format on diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 5628f5cf0f..d55e80a23d 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1749,6 +1749,8 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren SNodeList* pPrecalcExprs = NULL; SNodeList* pSortKeys = NULL; int32_t code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys); + pSort->calcGroupId = pSortLogicNode->calcGroupId; + pSort->excludePkCol = pSortLogicNode->excludePkCol; SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); // push down expression to pOutputDataBlockDesc of child node @@ -1797,6 +1799,7 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList* SNodeList* pPrecalcExprs = NULL; SNodeList* pPartitionKeys = NULL; int32_t code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys); + pPart->needBlockOutputTsOrder = pPartLogicNode->needBlockOutputTsOrder; SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); // push down expression to pOutputDataBlockDesc of child node @@ -1818,6 +1821,22 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList* } } + if (pPart->needBlockOutputTsOrder) { + SNode* node; + bool found = false; + FOREACH(node, pPartLogicNode->node.pTargets) { + if (nodeType(node) == QUERY_NODE_COLUMN) { + SColumnNode* pCol = (SColumnNode*)node; + if (pCol->tableId == pPartLogicNode->pkTsColTbId && pCol->colId == pPartLogicNode->pkTsColId) { + pPart->tsSlotId = pCol->slotId; + found = true; + break; + } + } + } + if (!found) code = TSDB_CODE_PLAN_INTERNAL_ERROR; + } + if (TSDB_CODE_SUCCESS == code) { code = setConditionsSlotId(pCxt, (const SLogicNode*)pPartLogicNode, (SPhysiNode*)pPart); } @@ -1944,6 +1963,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM pMerge->srcGroupId = pMergeLogicNode->srcGroupId; pMerge->groupSort = pMergeLogicNode->groupSort; pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId; + pMerge->inputWithGroupId = pMergeLogicNode->inputWithGroupId; int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 8b7f20f5cf..881cc02062 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -244,7 +244,12 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { } pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0); } - return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)); 
+ if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)) { + return true; + } else if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) { + return stbSplHasMultiTbScan(streamQuery, (SLogicNode*)pChild); + } + return false; } static bool stbSplIsMultiTbScanChild(bool streamQuery, SLogicNode* pNode) { @@ -519,6 +524,11 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p } break; } + case QUERY_NODE_LOGIC_PLAN_SORT: { + SSortLogicNode* pSort = (SSortLogicNode*)pNode; + if (pSort->calcGroupId) pMerge->inputWithGroupId = true; + break; + } default: break; } diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index dcdc402c8b..1c7c937b7f 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -358,12 +358,12 @@ static bool stbNotSystemScan(SLogicNode* pNode) { } } -static bool stbHasPartTbname(SNodeList* pPartKeys) { - if (NULL == pPartKeys) { +bool keysHasTbname(SNodeList* pKeys) { + if (NULL == pKeys) { return false; } SNode* pPartKey = NULL; - FOREACH(pPartKey, pPartKeys) { + FOREACH(pPartKey, pKeys) { if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) { pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0); } @@ -390,10 +390,10 @@ bool isPartTableAgg(SAggLogicNode* pAgg) { return false; } if (NULL != pAgg->pGroupKeys) { - return stbHasPartTbname(pAgg->pGroupKeys) && + return (pAgg->isGroupTb || keysHasTbname(pAgg->pGroupKeys)) && stbNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)); } - return stbHasPartTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))); + return pAgg->isPartTb || keysHasTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))); } static bool stbHasPartTag(SNodeList* pPartKeys) { @@ -430,6 +430,17 @@ bool getBatchScanOptionFromHint(SNodeList* pList) { return batchScan; } +bool getSortForGroupOptHint(SNodeList* pList) { + SNode* pNode; + FOREACH(pNode, pList) { + SHintNode* pHint = (SHintNode*)pNode; + if (pHint->option == HINT_SORT_FOR_GROUP) { + return true; + } + } + return false; +} + int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SLogicNode* pCurr = (SLogicNode*)pNode; @@ -467,7 +478,7 @@ bool isPartTagAgg(SAggLogicNode* pAgg) { } bool isPartTableWinodw(SWindowLogicNode* pWindow) { - return stbHasPartTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0))); + return pWindow->isPartTb || keysHasTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0))); } bool cloneLimit(SLogicNode* pParent, SLogicNode* pChild, uint8_t cloneWhat) { @@ -490,3 +501,19 @@ bool cloneLimit(SLogicNode* pParent, SLogicNode* pChild, uint8_t cloneWhat) { } return cloned; } + +static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) { + *(bool*)pContext = true; + return DEAL_RES_END; + } + } + return DEAL_RES_CONTINUE; +} + +bool keysHasCol(SNodeList* pKeys) { + bool hasCol = false; + nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol); + return hasCol; +} diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index ea2c9d468e..777de52762 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -49,6 +49,10 @@ 
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py diff --git a/tests/system-test/2-query/partition_by_col.py b/tests/system-test/2-query/partition_by_col.py new file mode 100644 index 0000000000..1a394649d6 --- /dev/null +++ b/tests/system-test/2-query/partition_by_col.py @@ -0,0 +1,323 @@ +import taos +import sys +import time +import socket +import os +import threading +import math +from datetime import datetime + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +# from tmqCommon import * + +COMPARE_DATA = 0 +COMPARE_LEN = 1 + +class TDTestCase: + def __init__(self): + self.vgroups = 4 + self.ctbNum = 10 + self.rowsPerTbl = 10000 + self.duraion = '1h' + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + + tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration)) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, paraDict): + colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"]) + tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"]) + sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString) + tdLog.debug("%s"%(sqlString)) + tsql.execute(sqlString) + return + + def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0): + for i in range(ctbNum): + sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \ + (dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx) + tsql.execute(sqlString) + + tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + for i in range(ctbNum): + rowsBatched = 0 + sql += " %s%d values "%(ctbPrefix,i) + for j in range(rowsPerTbl): + if (i < ctbNum/2): + sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10) + else: + sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, 
j%10, j%10, j%10, j%10) + rowsBatched += 1 + if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + rowsBatched = 0 + if j < rowsPerTbl - 1: + sql = "insert into %s%d values " %(ctbPrefix,i) + else: + sql = "insert into " + if sql != pre_insert: + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'test', + 'dropFlag': 1, + 'vgroups': 2, + 'stbName': 'meters', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1}, + {'type': 'BIGINT', 'count':1}, + {'type': 'FLOAT', 'count':1}, + {'type': 'DOUBLE', 'count':1}, + {'type': 'smallint', 'count':1}, + {'type': 'tinyint', 'count':1}, + {'type': 'bool', 'count':1}, + {'type': 'binary', 'len':10, 'count':1}, + {'type': 'nchar', 'len':10, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}], + 'ctbPrefix': 't', + 'ctbStartIdx': 0, + 'ctbNum': 100, + 'rowsPerTbl': 10000, + 'batchNum': 3000, + 'startTs': 1537146000000, + 'tsStep': 600000} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdLog.info("create database") + self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion) + + tdLog.info("create stb") + self.create_stable(tsql=tdSql, paraDict=paraDict) + + tdLog.info("create child tables") + self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \ + stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\ + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"]) + self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\ + ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\ + rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\ + startTs=paraDict["startTs"],tsStep=paraDict["tsStep"]) + return + + def check_explain_res_has_row(self, plan_str_expect: str, rows): + plan_found = False + for row in rows: + if str(row).find(plan_str_expect) >= 0: + tdLog.debug("plan: [%s] found in: [%s]" % (plan_str_expect, str(row))) + plan_found = True + break + if not plan_found: + tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(rows))) + + + def test_sort_for_partition_hint(self): + sql = 'select count(*), c1 from meters partition by c1' + sql_hint = 'select /*+ sort_for_group() */count(*), c1 from meters partition by c1' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + + sql = 'select count(*), c1, tbname from meters partition by tbname, c1' + sql_hint = 'select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + + sql = 'select count(*), c1, tbname from meters partition by tbname, c1 interval(1s)' + sql_hint = 'select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1 interval(1s)' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + + sql = 'select 
count(*), c1, t1 from meters partition by t1, c1' + sql_hint = 'select /*+ sort_for_group() */ count(*), c1, t1 from meters partition by t1, c1' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + + sql = 'select count(*), c1, t1 from meters partition by t1, c1 interval(1s)' + sql_hint = 'select /*+ sort_for_group() */ count(*), c1, t1 from meters partition by t1, c1 interval(1s)' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + + sql = 'select count(*), c1 from meters partition by c1 interval(1s)' + sql_hint = 'select /*+ sort_for_group() */ count(*), c1 from meters partition by c1 interval(1s)' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + + def add_order_by(self, sql: str, order_by: str, select_list: str = "*") -> str: + return "select %s from (%s)t order by %s" % (select_list, sql, order_by) + + def add_hint(self, sql: str) -> str: + return "select /*+ sort_for_group() */ %s" % sql[6:] + + def query_with_time(self, sql): + start = datetime.now() + tdSql.query(sql, queryTimes=1) + return (datetime.now().timestamp() - start.timestamp()) * 1000 + + def explain_sql(self, sql: str): + sql = "explain " + sql + tdSql.query(sql, queryTimes=1) + return tdSql.queryResult + + def query_and_compare_res(self, sql1, sql2, compare_what: int = 0): + dur = self.query_with_time(sql1) + tdLog.debug("sql1 query with time: [%f]" % dur) + res1 = tdSql.queryResult + dur = self.query_with_time(sql2) + tdLog.debug("sql2 query with time: [%f]" % dur) + res2 = tdSql.queryResult + if res1 is None or res2 is None: + tdLog.exit("res1 or res2 is None") + if compare_what <= COMPARE_LEN: + if len(res1) != len(res2): + tdLog.exit("query and compare failed due to different row counts, sql1: [%s], rows: [%d], sql2: [%s], rows: [%d]" % (sql1, len(res1), sql2, len(res2))) + if compare_what == COMPARE_DATA: + for i in range(0, len(res1)): + if res1[i] != res2[i]: + tdLog.exit("compare failed for row: [%d], sql1: [%s] res1: [%s], sql2: [%s] res2: [%s]" % (i, sql1, res1[i], sql2, res2[i])) + tdLog.debug("sql: [%s] and sql: [%s] have same results, rows: [%d]" % (sql1, sql2, len(res1))) + + def prepare_and_query_and_compare(self, sqls: list, order_by: str, select_list: str = "*", compare_what: int = 0): + for sql in sqls: + sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list) + sql = self.add_order_by(sql, order_by, select_list) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + self.check_explain_res_has_row("Partition", self.explain_sql(sql)) + self.query_and_compare_res(sql, sql_hint, compare_what=compare_what) + + def test_sort_for_partition_res(self): + sqls_par_c1_agg = [ + "select count(*), c1 from meters partition by c1", + "select count(*), min(c2), max(c3), c1 from meters partition by c1", + "select c1 from meters partition by c1", + ] + sqls_par_c1 = [ + "select * from meters partition by c1" + ] + sqls_par_c1_c2_agg = [ + "select count(*), c1, c2 from meters partition by c1, c2", + "select c1, c2 from meters partition by c1, c2", + "select count(*), c1, c2, min(c4), max(c5), sum(c6) from meters partition by c1, c2", + ] + sqls_par_c1_c2 = [ + "select * from meters partition by c1, c2" + ] + + sqls_par_tbname_c1 = [ + "select count(*), c1 , tbname as a from meters partition by tbname, c1" + ] + sqls_par_tag_c1 
= [ + "select count(*), c1, t1 from meters partition by t1, c1" + ] + self.prepare_and_query_and_compare(sqls_par_c1_agg, "c1") + self.prepare_and_query_and_compare(sqls_par_c1, "c1, ts, c2", "c1, ts, c2") + self.prepare_and_query_and_compare(sqls_par_c1_c2_agg, "c1, c2") + self.prepare_and_query_and_compare(sqls_par_c1_c2, "c1, c2, ts, c3", "c1, c2, ts, c3") + self.prepare_and_query_and_compare(sqls_par_tbname_c1, "a, c1") + self.prepare_and_query_and_compare(sqls_par_tag_c1, "t1, c1") + + def get_interval_template_sqls(self, col_name): + sqls = [ + 'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1s)' % (col_name, col_name), + #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30s)' % (col_name, col_name), + #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1m)' % (col_name, col_name), + #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30m)' % (col_name, col_name), + #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1h)' % (col_name, col_name), + + 'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1s)' % (col_name, col_name), + #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30s)' % (col_name, col_name), + #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1m)' % (col_name, col_name), + #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30m)' % (col_name, col_name), + #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1h)' % (col_name, col_name), + + 'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1s)' % (col_name, col_name, col_name), + #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30s)' % (col_name, col_name, col_name), + #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1m)' % (col_name, col_name, col_name), + #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30m)' % (col_name, col_name, col_name), + #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name), + + 'select _wstart as ts, count(*), tbname as a, %s from meters partition by %s, tbname interval(1s)' % (col_name, col_name), + 'select _wstart as ts, count(*), t1 as a, %s from meters partition by %s, t1 interval(1s)' % (col_name, col_name), + ] + order_list = 'a, %s, ts' % (col_name) + return (sqls, order_list) + + def test_sort_for_partition_interval(self): + sqls, order_list = self.get_interval_template_sqls('c1') + self.prepare_and_query_and_compare(sqls, order_list) + #sqls, order_list = self.get_interval_template_sqls('c2') + #self.prepare_and_query(sqls, order_list) + sqls, order_list = self.get_interval_template_sqls('c3') + self.prepare_and_query_and_compare(sqls, order_list) + #sqls, order_list = self.get_interval_template_sqls('c4') + #self.prepare_and_query(sqls, order_list) + #sqls, order_list = self.get_interval_template_sqls('c5') + #self.prepare_and_query(sqls, order_list) + sqls, order_list = self.get_interval_template_sqls('c6') + self.prepare_and_query_and_compare(sqls, order_list) + #sqls, order_list = self.get_interval_template_sqls('c7') + #self.prepare_and_query(sqls, order_list) + sqls, order_list = 
self.get_interval_template_sqls('c8') + self.prepare_and_query_and_compare(sqls, order_list) + sqls, order_list = self.get_interval_template_sqls('c9') + self.prepare_and_query_and_compare(sqls, order_list) + + def test_sort_for_partition_no_agg_limit(self): + sqls_template = [ + 'select * from meters partition by c1 slimit %d limit %d', + 'select * from meters partition by c2 slimit %d limit %d', + 'select * from meters partition by c8 slimit %d limit %d', + ] + sqls = [] + for sql in sqls_template: + sqls.append(sql % (1,1)) + sqls.append(sql % (1,10)) + sqls.append(sql % (10,10)) + sqls.append(sql % (100, 100)) + order_by_list = 'ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,t1,t2,t3,t4,t5,t6' + + self.prepare_and_query_and_compare(sqls, order_by_list, compare_what=COMPARE_LEN) + + + def run(self): + self.prepareTestEnv() + #time.sleep(99999999) + self.test_sort_for_partition_hint() + self.test_sort_for_partition_res() + self.test_sort_for_partition_interval() + self.test_sort_for_partition_no_agg_limit() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index adea684ef0..443c27fd7e 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -18,6 +18,7 @@ python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4 python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4 +python3 ./test.py -f 2-query/partition_by_col.py -Q 4 python3 ./test.py -f 7-tmq/tmqShow.py python3 ./test.py -f 7-tmq/tmqDropStb.py python3 ./test.py -f 7-tmq/subscribeStb0.py
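For reviewers who want to try the new sort_for_group() hint outside the test harness, the short sketch below mirrors what test_sort_for_partition_hint in partition_by_col.py verifies: without the hint, the plan for a "partition by" on a normal column contains a "Partition on" node; with the hint, the PartitionCols rule rewrites it into a "Sort" node that computes group ids (SSortLogicNode.calcGroupId). This is an illustrative sketch, not part of the patch: the connection defaults and the test database / meters table are assumptions (they match what prepareTestEnv creates), and it uses the generic cursor API of the taos Python connector.

```python
# Illustrative sketch only (not part of this patch): compare EXPLAIN output for
# the same "partition by column" query with and without the sort_for_group() hint.
# Assumes a running TDengine instance and the `test` database / `meters` super
# table created by tests/system-test/2-query/partition_by_col.py.
import taos

def plan_text(cur, sql: str) -> str:
    """Run EXPLAIN on sql and return the plan as a single string."""
    cur.execute("explain " + sql)
    # each fetched row holds one line of the plan in its first column
    return "\n".join(str(row[0]) for row in cur.fetchall())

conn = taos.connect(database="test")  # host/user/password left at defaults (assumption)
cur = conn.cursor()

base   = "select count(*), c1 from meters partition by c1"
hinted = "select /*+ sort_for_group() */ count(*), c1 from meters partition by c1"

# Without the hint the planner keeps the Partition node; with the hint the
# PartitionCols optimizer rule replaces it with a group-id-calculating Sort.
assert "Partition on" in plan_text(cur, base)
assert "Sort" in plan_text(cur, hinted)

cur.close()
conn.close()
```

The result-equivalence side of the change (hinted and unhinted queries returning the same rows) is what query_and_compare_res in the same test covers.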