diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 244894b59b..bc6851475a 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -28,6 +28,7 @@ typedef struct SBlockOrderInfo { bool nullFirst; int32_t order; int32_t slotId; + void* compFn; SColumnInfoData* pColData; } SBlockOrderInfo; @@ -210,6 +211,10 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock); size_t blockDataGetSerialMetaSize(uint32_t numOfCols); int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); +/** + * @brief find how many rows already in order start from first row + */ +int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 79308a8c93..14d70b5812 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -136,6 +136,8 @@ typedef struct SAggLogicNode { bool hasTimeLineFunc; bool onlyHasKeepOrderFunc; bool hasGroupKeyOptimized; + bool isGroupTb; + bool isPartTb; // true if partition keys has tbname } SAggLogicNode; typedef struct SProjectLogicNode { @@ -263,6 +265,7 @@ typedef struct SWindowLogicNode { int8_t igExpired; int8_t igCheckUpdate; EWindowAlgorithm windowAlgo; + bool isPartTb; } SWindowLogicNode; typedef struct SFillLogicNode { @@ -281,6 +284,7 @@ typedef struct SSortLogicNode { bool groupSort; bool skipPKSortOpt; bool calcGroupId; + bool excludePkCol; // exclude PK ts col when calc group id } SSortLogicNode; typedef struct SPartitionLogicNode { @@ -288,6 +292,9 @@ typedef struct SPartitionLogicNode { SNodeList* pPartitionKeys; SNodeList* pTags; SNode* pSubtable; + + bool needBlockOutputTsOrder; // if true, partition output block will have ts order maintained + int32_t tsSlotId; } SPartitionLogicNode; typedef enum 
ESubplanType { @@ -604,6 +611,7 @@ typedef struct SSortPhysiNode { SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode SNodeList* pTargets; bool calcGroupId; + bool excludePkCol; } SSortPhysiNode; typedef SSortPhysiNode SGroupSortPhysiNode; @@ -613,6 +621,9 @@ typedef struct SPartitionPhysiNode { SNodeList* pExprs; // these are expression list of partition_by_clause SNodeList* pPartitionKeys; SNodeList* pTargets; + + bool needBlockOutputTsOrder; + int32_t tsSlotId; } SPartitionPhysiNode; typedef struct SStreamPartitionPhysiNode { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f7bb6f85e2..b3fe1b7698 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -933,7 +933,13 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { return 0; } } - __compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order); + + __compar_fn_t fn; + if (pOrder->compFn) { + fn = pOrder->compFn; + } else { + fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order); + } int ret = fn(left1, right1); if (ret == 0) { @@ -1099,6 +1105,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) { struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i); pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); + pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order); } terrno = 0; @@ -2515,3 +2522,20 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock); } + +int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) { + if (!pDataBlock || !pOrderInfo || pDataBlock->info.rows <= 0) return 0; /* an empty block has no sorted prefix; without this guard the scan below would report 1 row */ + for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { + 
SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i); + pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId); + pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order); + } + SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock}; + int32_t rowIdx = 0, nextRowIdx = 1; + for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) { + if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) { + break; + } + } + return nextRowIdx; +} diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 863a85ef7a..740ff7b0dc 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -198,22 +198,20 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S SStorageAPI* pStorageAPI); /** - * @brief extract col data according to sort/group cols - * @param pSortGroupCols sort keys or group keys, array of SColumnNode - * @param [out] pColVals col vals extracted, array of SGroupKeys + * @brief build a tuple into keyBuf + * @param [out] keyBuf the output buf + * @param [in] pSortGroupCols the cols to build + * @param [in] pBlock block the tuple in */ -void extractCols(SArray* pSortGroupCols, SArray* pColVals, SSDataBlock* pBlock, int32_t rowIndex); -/** - * @breif build keys buffer with col values - * @retval key length - * @param [out] buf buffer to store result key - */ -int32_t buildKeys(char* buf, SArray* pColVals); +int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex); + +int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pDataBlock, + int32_t rowIndex); uint64_t calcGroupId(char *pData, int32_t len); SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys); -int32_t extractSortGroupKeysInfo(SArray** pColVals, int32_t* keyLen, char** keyBuf, const SArray* 
pColList); +int32_t extractKeysLen(const SArray* keys); #endif // TDENGINE_EXECUTIL_H diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index fe3ceb4b6f..365acf2bff 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -194,7 +194,15 @@ void tsortSetClosed(SSortHandle* pHandle); void tsortSetSingleTableMerge(SSortHandle* pHandle); void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param); -int32_t tsortBuildKeys(SArray* pSortGroupCols, SArray* pColVals, STupleHandle* pTuple, char* keyBuf); +/** + * @brief compare the tuple with keyBuf; if not equal, new keys are built in keyBuf and the new length is stored in keyLen + * @param [in] pSortCols cols to compare and build + * @param [in, out] keyBuf pass in the old keys; if the comparison is not equal, new keys will be built in it. + * @param [in, out] keyLen the old keys length; if the comparison is not equal, the new keys length will be stored in it. + * @param [in] pTuple the tuple to compare with + * @retval 0 if the comparison is equal, 1 if not + */ +int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 97fea0e711..e57164fdd8 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2262,75 +2262,71 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t ts[4] = pWin->ekey + delta; // window end key } -void extractCols(SArray* pSortGroupCols, SArray* pColVals, SSDataBlock* pBlock, int32_t rowIndex) { +int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) { SColumnDataAgg* pColAgg = NULL; + const char* isNull = oldkeyBuf; + const char* p = oldkeyBuf + sizeof(int8_t) * taosArrayGetSize(pSortGroupCols); - size_t numOfGroupCols = taosArrayGetSize(pSortGroupCols); + for (int32_t i = 
0; i < taosArrayGetSize(pSortGroupCols); ++i) { + const SColumn* pCol = (SColumn*)taosArrayGet(pSortGroupCols, i); + const SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); + if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = (SColumn*) taosArrayGet(pSortGroupCols, i); - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); - - // valid range check. todo: return error code. - if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) { - continue; - } - - if (pBlock->pBlockAgg != NULL) { - pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched? - } - - SGroupKeys* pkey = taosArrayGet(pColVals, i); if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { - pkey->isNull = true; + if (isNull[i] != 1) return 1; } else { - pkey->isNull = false; - char* val = colDataGetData(pColInfoData, rowIndex); - if (pkey->type == TSDB_DATA_TYPE_JSON) { - if (tTagIsJson(val)) { - terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; - return; - } - int32_t dataLen = getJsonValueLen(val); - memcpy(pkey->pData, val, dataLen); - } else if (IS_VAR_DATA_TYPE(pkey->type)) { - memcpy(pkey->pData, val, varDataTLen(val)); - ASSERT(varDataTLen(val) <= pkey->bytes); + if (isNull[i] != 0) return 1; + const char* val = colDataGetData(pColInfoData, rowIndex); + if (pCol->type == TSDB_DATA_TYPE_JSON) { + int32_t len = getJsonValueLen(val); + if (memcmp(p, val, len) != 0) return 1; + p += len; + } else if (IS_VAR_DATA_TYPE(pCol->type)) { + if (memcmp(p, val, varDataTLen(val)) != 0) return 1; + p += varDataTLen(val); } else { - memcpy(pkey->pData, val, pkey->bytes); + if (0 != memcmp(p, val, pCol->bytes)) return 1; + p += pCol->bytes; } } } + if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1; + return 0; } -int32_t buildKeys(char* buf, SArray* pColVals) { - size_t numOfGroupCols = taosArrayGetSize(pColVals); +int32_t buildKeys(char* keyBuf, 
const SArray* pSortGroupCols, const SSDataBlock* pBlock, + int32_t rowIndex) { + uint32_t colNum = taosArrayGetSize(pSortGroupCols); + SColumnDataAgg* pColAgg = NULL; + char* isNull = keyBuf; + char* p = keyBuf + sizeof(int8_t) * colNum; - char* isNull = (char*)buf; - char* pStart = (char*)buf + sizeof(int8_t) * numOfGroupCols; - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SGroupKeys* pkey = taosArrayGet(pColVals, i); - if (pkey->isNull) { + for (int32_t i = 0; i < colNum; ++i) { + const SColumn* pCol = (SColumn*)taosArrayGet(pSortGroupCols, i); + if (pCol->slotId >= taosArrayGetSize(pBlock->pDataBlock)) continue; /* validate slotId before indexing the block's column array; slotId == size is out of range too */ + const SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId); + + if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; + + if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) { isNull[i] = 1; - continue; - } - - isNull[i] = 0; - if (pkey->type == TSDB_DATA_TYPE_JSON) { - int32_t dataLen = getJsonValueLen(pkey->pData); - memcpy(pStart, (pkey->pData), dataLen); - pStart += dataLen; - } else if (IS_VAR_DATA_TYPE(pkey->type)) { - varDataCopy(pStart, pkey->pData); - pStart += varDataTLen(pkey->pData); - ASSERT(varDataTLen(pkey->pData) <= pkey->bytes); } else { - memcpy(pStart, pkey->pData, pkey->bytes); - pStart += pkey->bytes; + isNull[i] = 0; + const char* val = colDataGetData(pColInfoData, rowIndex); + if (pCol->type == TSDB_DATA_TYPE_JSON) { + int32_t len = getJsonValueLen(val); + memcpy(p, val, len); + p += len; + } else if (IS_VAR_DATA_TYPE(pCol->type)) { + varDataCopy(p, val); + p += varDataTLen(val); + } else { + memcpy(p, val, pCol->bytes); + p += pCol->bytes; + } } } - - return (int32_t)(pStart - (char*)buf); + return (int32_t)(p - keyBuf); } uint64_t calcGroupId(char* pData, int32_t len) { @@ -2356,37 +2352,13 @@ SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) { return ret; } -int32_t extractSortGroupKeysInfo(SArray** pColVals, int32_t* keyLen, char** keyBuf, const SArray* 
pColList) { - *pColVals = taosArrayInit(4, sizeof(SGroupKeys)); - if ((*pColVals) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; +int32_t extractKeysLen(const SArray* keys) { + int32_t len = 0; + int32_t keyNum = taosArrayGetSize(keys); + for (int32_t i = 0; i < keyNum; ++i) { + SColumn* pCol = (SColumn*)taosArrayGet(keys, i); + len += pCol->bytes; } - - *keyLen = 0; - int32_t numOfGroupCols = taosArrayGetSize(pColList); - for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = (SColumn*)taosArrayGet(pColList, i); - (*keyLen) += pCol->bytes; // actual data + null_flag - - SGroupKeys key = {0}; - key.bytes = pCol->bytes; - key.type = pCol->type; - key.isNull = false; - key.pData = taosMemoryCalloc(1, pCol->bytes); - if (key.pData == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - taosArrayPush((*pColVals), &key); - } - - int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols; - (*keyLen) += nullFlagSize; - - (*keyBuf) = taosMemoryCalloc(1, (*keyLen)); - if ((*keyBuf) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - return TSDB_CODE_SUCCESS; + len += sizeof(int8_t) * keyNum; //null flag + return len; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3778a2625c..1060dd4f0e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -56,6 +56,10 @@ typedef struct SPartitionOperatorInfo { int32_t groupIndex; // group index int32_t pageIndex; // page index of current group SExprSupp scalarSup; + + int32_t remainRows; + int32_t orderedRows; + SArray* pOrderInfoArr; } SPartitionOperatorInfo; static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len); @@ -685,37 +689,49 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { SPartitionOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SDataGroupInfo* pGroupInfo = - (pInfo->groupIndex != -1) ? 
taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; - if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { - // try next group data - ++pInfo->groupIndex; - if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { - setOperatorCompleted(pOperator); - clearPartitionOperator(pInfo); - return NULL; + if (pInfo->remainRows == 0) { + blockDataCleanup(pInfo->binfo.pRes); + SDataGroupInfo* pGroupInfo = + (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; + if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { + // try next group data + ++pInfo->groupIndex; + if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { + setOperatorCompleted(pOperator); + clearPartitionOperator(pInfo); + return NULL; + } + + pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex); + pInfo->pageIndex = 0; } - pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex); - pInfo->pageIndex = 0; + int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex); + void* page = getBufPage(pInfo->pBuf, *pageId); + if (page == NULL) { + qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, terrno); + } + + blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity); + blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); + + pInfo->pageIndex += 1; + releaseBufPage(pInfo->pBuf, page); + pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId; + pInfo->binfo.pRes->info.dataLoad = 1; + pInfo->orderedRows = 0; } - int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex); - void* page = getBufPage(pInfo->pBuf, *pageId); - if (page == NULL) { - qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); - T_LONG_JMP(pTaskInfo->env, terrno); + if (pInfo->pOrderInfoArr) { + pInfo->binfo.pRes->info.rows 
+= pInfo->remainRows; + blockDataTrimFirstRows(pInfo->binfo.pRes, pInfo->orderedRows); + pInfo->orderedRows = blockDataGetSortedRows(pInfo->binfo.pRes, pInfo->pOrderInfoArr); + pInfo->remainRows = pInfo->binfo.pRes->info.rows - pInfo->orderedRows; + pInfo->binfo.pRes->info.rows = pInfo->orderedRows; } - blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity); - blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); - - pInfo->pageIndex += 1; - releaseBufPage(pInfo->pBuf, page); - - pInfo->binfo.pRes->info.dataLoad = 1; blockDataUpdateTsWindow(pInfo->binfo.pRes, 0); - pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId; pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows; return pInfo->binfo.pRes; @@ -732,7 +748,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - blockDataCleanup(pRes); return buildPartitionResult(pOperator); } @@ -815,6 +830,7 @@ static void destroyPartitionOperatorInfo(void* param) { cleanupExprSupp(&pInfo->scalarSup); destroyDiskbasedBuf(pInfo->pBuf); + taosArrayDestroy(pInfo->pOrderInfoArr); taosMemoryFreeClear(param); } @@ -832,6 +848,17 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols); pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); + if (pPartNode->needBlockOutputTsOrder) { + SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId}; + pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo)); + if (!pInfo->pOrderInfoArr) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = terrno; + goto _error; + } + taosArrayPush(pInfo->pOrderInfoArr, &order); + } + if (pPartNode->pExprs != NULL) { int32_t num = 0; SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num); diff --git 
a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index b71f95b22f..18a666d2c2 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -22,11 +22,10 @@ typedef struct SSortOpGroupIdCalc { STupleHandle* pSavedTuple; SArray* pSortColsArr; - SArray* pSortColVals; char* keyBuf; - char* lastKeyBuf; - int32_t lastKeysLen; + int32_t lastKeysLen; // default to be 0 uint64_t lastGroupId; + bool excludePKCol; } SSortOpGroupIdCalc; typedef struct SSortOperatorInfo { @@ -93,7 +92,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); if (pSortNode->calcGroupId) { - SSortOpGroupIdCalc* pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc)); + int32_t keyLen; + SSortOpGroupIdCalc* pGroupIdCalc = pInfo->pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc)); if (!pGroupIdCalc) { code = TSDB_CODE_OUT_OF_MEMORY; goto _error; @@ -105,17 +105,15 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* if (!pGroupIdCalc->pSortColsArr) code = TSDB_CODE_OUT_OF_MEMORY; nodesClearList(pSortColsNodeArr); } - int32_t keyLen; - if (TSDB_CODE_SUCCESS == code) - code = extractSortGroupKeysInfo(&pGroupIdCalc->pSortColVals, &keyLen, &pGroupIdCalc->keyBuf, - pGroupIdCalc->pSortColsArr); if (TSDB_CODE_SUCCESS == code) { - pGroupIdCalc->lastKeysLen = 0; - pGroupIdCalc->lastKeyBuf = taosMemoryCalloc(1, keyLen); - if (!pGroupIdCalc->lastKeyBuf) code = TSDB_CODE_OUT_OF_MEMORY; + // PK ts col should always at last, see partColOptCreateSort + if (pSortNode->excludePkCol) taosArrayPop(pGroupIdCalc->pSortColsArr); + keyLen = extractKeysLen(pGroupIdCalc->pSortColsArr); } if (TSDB_CODE_SUCCESS == code) { - pInfo->pGroupIdCalc = pGroupIdCalc; + pGroupIdCalc->lastKeysLen = 0; + pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen); + if (!pGroupIdCalc->keyBuf) code = TSDB_CODE_OUT_OF_MEMORY; } } if (code 
!= TSDB_CODE_SUCCESS) goto _error; @@ -172,35 +170,40 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { } /** - * @brief get next tuple with group id attached, all tuples fetched from tsortNextTuple are sorted by group keys - * @param pBlock the output block, the group id will be saved in it - * @retval NULL if next group tuple arrived, the pre fetched tuple will be saved in pInfo.pSavedTuple + * @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys + * @param [in, out] pBlock the output block, the group id will be saved in it + * @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple + * @retval NULL if no more tuples */ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) { - STupleHandle *ret = pInfo->pGroupIdCalc->pSavedTuple; - pInfo->pGroupIdCalc->pSavedTuple = NULL; - if (!ret) { - ret = tsortNextTuple(pHandle); + STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple; + if (!retTuple) { + retTuple = tsortNextTuple(pHandle); } - if (ret) { - int32_t len = tsortBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->pSortColVals, ret, - pInfo->pGroupIdCalc->keyBuf); - bool newGroup = len != pInfo->pGroupIdCalc->lastKeysLen - ? 
true - : memcmp(pInfo->pGroupIdCalc->lastKeyBuf, pInfo->pGroupIdCalc->keyBuf, len) != 0; - bool emptyBlock = pBlock->info.rows == 0; - if (newGroup && !emptyBlock) { - // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return NULL - pInfo->pGroupIdCalc->pSavedTuple = ret; - ret = NULL; + if (retTuple) { + int32_t newGroup; + if (pInfo->pGroupIdCalc->pSavedTuple) { + newGroup = true; + pInfo->pGroupIdCalc->pSavedTuple = NULL; } else { - if (newGroup) { - ASSERT(emptyBlock); - pInfo->pGroupIdCalc->lastKeysLen = len; - pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId = calcGroupId(pInfo->pGroupIdCalc->keyBuf, len); - TSWAP(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeyBuf); - } else if (emptyBlock) { + newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf, + &pInfo->pGroupIdCalc->lastKeysLen, retTuple); + } + bool emptyBlock = pBlock->info.rows == 0; + if (newGroup) { + if (!emptyBlock) { + // new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return + // NULL. 
Note that the keyBuf and lastKeysLen has been updated to new value + pInfo->pGroupIdCalc->pSavedTuple = retTuple; + retTuple = NULL; + } else { + // new group with empty block + pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId = + calcGroupId(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeysLen); + } + } else { + if (emptyBlock) { // new block but not new group, assign last group id to it pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId; } else { @@ -209,7 +212,7 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf } } - return ret; + return retTuple; } SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, @@ -401,14 +404,8 @@ static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNod static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) { if (pCalc) { - for (int i = 0; i < taosArrayGetSize(pCalc->pSortColVals); i++) { - SGroupKeys key = *(SGroupKeys*)taosArrayGet(pCalc->pSortColVals, i); - taosMemoryFree(key.pData); - } - taosArrayDestroy(pCalc->pSortColVals); taosArrayDestroy(pCalc->pSortColsArr); taosMemoryFree(pCalc->keyBuf); - taosMemoryFree(pCalc->lastKeyBuf); taosMemoryFree(pCalc); } } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index d88ba80466..f4c2735d93 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -1555,7 +1555,14 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) { return info; } -int32_t tsortBuildKeys(SArray* pSortGroupCols, SArray* pColVals, STupleHandle* pTuple, char* keyBuf) { - extractCols(pSortGroupCols, pColVals, pTuple->pBlock, pTuple->rowIndex); - return buildKeys(keyBuf, pColVals); +int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, + const STupleHandle* pTuple) { + int32_t ret; + if (0 == compKeys(pSortCols, keyBuf, *keyLen, pTuple->pBlock, pTuple->rowIndex)) { + 
ret = 0; + } else { + *keyLen = buildKeys(keyBuf, pSortCols, pTuple->pBlock, pTuple->rowIndex); + ret = 1; + } + return ret; } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index ede9fd36f0..817433f5be 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -444,6 +444,8 @@ static int32_t logicAggCopy(const SAggLogicNode* pSrc, SAggLogicNode* pDst) { CLONE_NODE_LIST_FIELD(pGroupKeys); CLONE_NODE_LIST_FIELD(pAggFuncs); COPY_SCALAR_FIELD(hasGroupKeyOptimized); + COPY_SCALAR_FIELD(isGroupTb); /* copy both new SAggLogicNode flags so a cloned node keeps isGroupTb as well */ + COPY_SCALAR_FIELD(isPartTb); return TSDB_CODE_SUCCESS; } @@ -532,6 +534,7 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) { CLONE_NODE_LIST_FIELD(pSortKeys); COPY_SCALAR_FIELD(groupSort); COPY_SCALAR_FIELD(calcGroupId); + COPY_SCALAR_FIELD(excludePkCol); return TSDB_CODE_SUCCESS; } @@ -540,6 +543,8 @@ static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLog CLONE_NODE_LIST_FIELD(pPartitionKeys); CLONE_NODE_LIST_FIELD(pTags); CLONE_NODE_FIELD(pSubtable); + COPY_SCALAR_FIELD(needBlockOutputTsOrder); + COPY_SCALAR_FIELD(tsSlotId); return TSDB_CODE_SUCCESS; } @@ -679,6 +684,8 @@ static int32_t physiPartitionCopy(const SPartitionPhysiNode* pSrc, SPartitionPhy CLONE_NODE_LIST_FIELD(pExprs); CLONE_NODE_LIST_FIELD(pPartitionKeys); CLONE_NODE_LIST_FIELD(pTargets); + COPY_SCALAR_FIELD(needBlockOutputTsOrder); + COPY_SCALAR_FIELD(tsSlotId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 38f7b90ad7..b5dff20440 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2328,6 +2328,7 @@ static const char* jkSortPhysiPlanExprs = "Exprs"; static const char* jkSortPhysiPlanSortKeys = "SortKeys"; static const char* jkSortPhysiPlanTargets = "Targets"; static const char* jkSortPhysiPlanCalcGroupIds = "CalcGroupIds"; +static const char* 
jkSortPhysiPlanExcludePKCol = "ExcludePKCol"; static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) { const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj; @@ -2345,6 +2346,9 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkSortPhysiPlanCalcGroupIds, pNode->calcGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkSortPhysiPlanExcludePKCol, pNode->excludePkCol); + } return code; } @@ -2365,6 +2369,9 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkSortPhysiPlanCalcGroupIds, &pNode->calcGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkSortPhysiPlanExcludePKCol, &pNode->excludePkCol); + } return code; } @@ -2650,6 +2657,8 @@ static int32_t jsonToPhysiEventWindowNode(const SJson* pJson, void* pObj) { static const char* jkPartitionPhysiPlanExprs = "Exprs"; static const char* jkPartitionPhysiPlanPartitionKeys = "PartitionKeys"; static const char* jkPartitionPhysiPlanTargets = "Targets"; +static const char* jkPartitionPhysiPlanNeedBlockOutputTsOrder = "NeedBlockOutputTsOrder"; +static const char* jkPartitionPhysiPlanTsSlotId = "tsSlotId"; static int32_t physiPartitionNodeToJson(const void* pObj, SJson* pJson) { const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj; @@ -2664,6 +2673,12 @@ static int32_t physiPartitionNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkPartitionPhysiPlanTargets, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkPartitionPhysiPlanNeedBlockOutputTsOrder, pNode->needBlockOutputTsOrder); /* propagate failure instead of dropping it */ + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkPartitionPhysiPlanTsSlotId, pNode->tsSlotId); /* propagate failure instead of dropping it */ + } return code; } @@ -2681,6 +2696,12 @@ static int32_t 
jsonToPhysiPartitionNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkPartitionPhysiPlanTargets, &pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkPartitionPhysiPlanNeedBlockOutputTsOrder, &pNode->needBlockOutputTsOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkPartitionPhysiPlanTsSlotId, &pNode->tsSlotId); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 13a860c95b..992097e8c5 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2746,7 +2746,14 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_CALC_GROUPID }; +enum { + PHY_SORT_CODE_BASE_NODE = 1, + PHY_SORT_CODE_EXPR, + PHY_SORT_CODE_SORT_KEYS, + PHY_SORT_CODE_TARGETS, + PHY_SORT_CODE_CALC_GROUPID, + PHY_SORT_CODE_EXCLUDE_PK_COL +}; static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj; @@ -2764,6 +2771,9 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_SORT_CODE_CALC_GROUPID, pNode->calcGroupId); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_SORT_CODE_EXCLUDE_PK_COL, pNode->excludePkCol); + } return code; } @@ -2790,6 +2800,9 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_CODE_CALC_GROUPID: code = tlvDecodeBool(pTlv, &pNode->calcGroupId); break; + case PHY_SORT_CODE_EXCLUDE_PK_COL: + code = tlvDecodeBool(pTlv, &pNode->excludePkCol); + break; /* explicit break, matching the other cases, so this case does not fall through into default */ default: break; } @@ -3148,7 +3161,14 @@ static int32_t msgToPhysiEventWindowNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { 
PHY_PARTITION_CODE_BASE_NODE = 1, PHY_PARTITION_CODE_EXPR, PHY_PARTITION_CODE_KEYS, PHY_PARTITION_CODE_TARGETS }; +enum { + PHY_PARTITION_CODE_BASE_NODE = 1, + PHY_PARTITION_CODE_EXPR, + PHY_PARTITION_CODE_KEYS, + PHY_PARTITION_CODE_TARGETS, + PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER, + PHY_PARTITION_CODE_TS_SLOTID +}; static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj; @@ -3163,6 +3183,12 @@ static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_PARTITION_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER, pNode->needBlockOutputTsOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, PHY_PARTITION_CODE_TS_SLOTID, pNode->tsSlotId); + } return code; } @@ -3186,6 +3212,12 @@ static int32_t msgToPhysiPartitionNode(STlvDecoder* pDecoder, void* pObj) { case PHY_PARTITION_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; + case PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER: + code = tlvDecodeBool(pTlv, &pNode->needBlockOutputTsOrder); + break; + case PHY_PARTITION_CODE_TS_SLOTID: + code = tlvDecodeI32(pTlv, &pNode->tsSlotId); + break; default: break; } diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index b06c666c2d..83a4e9ced8 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -51,6 +51,8 @@ int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes); bool isPartTableAgg(SAggLogicNode* pAgg); bool isPartTagAgg(SAggLogicNode* pAgg); bool isPartTableWinodw(SWindowLogicNode* pWindow); +bool keysHasCol(SNodeList* pKeys); +bool keysHasTbname(SNodeList* pKeys); #define CLONE_LIMIT 1 #define CLONE_SLIMIT 1 << 1 diff --git 
a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index cec53e0e25..daf5e363c6 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -738,6 +738,9 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, } nodesDestroyList(pOutputGroupKeys); + pAgg->isGroupTb = pAgg->pGroupKeys ? keysHasTbname(pAgg->pGroupKeys) : 0; + pAgg->isPartTb = pSelect->pPartitionByList ? keysHasTbname(pSelect->pPartitionByList) : 0; + if (TSDB_CODE_SUCCESS == code) { *pLogicNode = (SLogicNode*)pAgg; } else { @@ -959,6 +962,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva nodesDestroyNode((SNode*)pWindow); return TSDB_CODE_OUT_OF_MEMORY; } + pWindow->isPartTb = pSelect->pPartitionByList ? keysHasTbname(pSelect->pPartitionByList) : 0; return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode); } @@ -1256,6 +1260,14 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS } } + if (keysHasCol(pPartition->pPartitionKeys) && pSelect->pWindow && + nodeType(pSelect->pWindow) == QUERY_NODE_INTERVAL_WINDOW) { + pPartition->needBlockOutputTsOrder = true; + SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pSelect->pWindow; + SColumnNode* pTsCol = (SColumnNode*)pInterval->pCol; + pPartition->tsSlotId = pTsCol->slotId; + } + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags) { pPartition->pTags = nodesCloneList(pSelect->pTags); if (NULL == pPartition->pTags) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 3e85329603..36c54e342d 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1661,22 +1661,6 @@ static int32_t smaIndexOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub return smaIndexOptimizeImpl(pCxt, pLogicSubplan, pScan); } -static EDealRes partTagsOptHasColImpl(SNode* pNode, void* 
pContext) { - if (QUERY_NODE_COLUMN == nodeType(pNode)) { - if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) { - *(bool*)pContext = true; - return DEAL_RES_END; - } - } - return DEAL_RES_CONTINUE; -} - -static bool planOptNodeListHasCol(SNodeList* pKeys) { - bool hasCol = false; - nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol); - return hasCol; -} - static EDealRes partTagsOptHasTbname(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { if (COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) { @@ -1755,7 +1739,7 @@ static bool partTagsOptMayBeOptimized(SLogicNode* pNode) { return false; } - return !planOptNodeListHasCol(partTagsGetPartKeys(pNode)) && partTagsOptAreSupportedFuncs(partTagsGetFuncs(pNode)); + return !keysHasCol(partTagsGetPartKeys(pNode)) && partTagsOptAreSupportedFuncs(partTagsGetFuncs(pNode)); } static int32_t partTagsOptRebuildTbanme(SNodeList* pPartKeys) { @@ -2042,7 +2026,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* } int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild); - TSWAP(pProjectNode->node.pHint, pChild->pHint); + if (pProjectNode->node.pHint && !pChild->pHint) TSWAP(pProjectNode->node.pHint, pChild->pHint); if (TSDB_CODE_SUCCESS == code) { NODES_CLEAR_LIST(pProjectNode->node.pChildren); nodesDestroyNode((SNode*)pProjectNode); @@ -2735,7 +2719,7 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) { } SAggLogicNode* pAgg = (SAggLogicNode*)(pNode->pParent); - if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || planOptNodeListHasCol(pAgg->pGroupKeys) || + if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || keysHasCol(pAgg->pGroupKeys) || !planOptNodeListHasTbname(pAgg->pGroupKeys)) { return false; } @@ -3591,8 +3575,7 @@ static int32_t stableJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicS static bool partColOptShouldBeOptimized(SLogicNode* 
pNode) { if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) { SPartitionLogicNode* pPartition = (SPartitionLogicNode*)pNode; - if (pPartition->node.pParent && nodeType(pPartition->node.pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) return false; - if (planOptNodeListHasCol(pPartition->pPartitionKeys)) return true; + if (keysHasCol(pPartition->pPartitionKeys)) return true; } return false; } @@ -3604,6 +3587,7 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) { if (pSort) { pSort->groupSort = false; TSWAP(pSort->node.pChildren, pPartition->node.pChildren); + optResetParent((SLogicNode*)pSort); FOREACH(node, pPartition->pPartitionKeys) { SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); if (!pOrder) { @@ -3615,6 +3599,30 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) { if (!pOrder->pExpr) code = TSDB_CODE_OUT_OF_MEMORY; } } + + if (pPartition->needBlockOutputTsOrder) { + SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); + if (!pOrder) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + pSort->excludePkCol = true; + nodesListMakeAppend(&pSort->pSortKeys, (SNode*)pOrder); + pOrder->order = ORDER_ASC; + pOrder->pExpr = 0; + FOREACH(node, pPartition->node.pTargets) { + if (nodeType(node) == QUERY_NODE_COLUMN) { + SColumnNode* pCol = (SColumnNode*)node; + if (pCol->slotId == pPartition->tsSlotId) { + pOrder->pExpr = nodesCloneNode((SNode*)pCol); + break; + } + } + } + if (!pOrder->pExpr) { + code = TSDB_CODE_PAR_INTERNAL_ERROR; + } + } + } } if (code == TSDB_CODE_SUCCESS) { pSort->node.pTargets = nodesCloneList(((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0))->pTargets); @@ -3639,13 +3647,17 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub // replace with sort node SSortLogicNode* pSort = partColOptCreateSort(pNode); - if (!pSort) code = TSDB_CODE_OUT_OF_MEMORY; - if (code == TSDB_CODE_SUCCESS) { + 
if (!pSort) { + // if sort create failed, we eat the error, skip the optimization + code = TSDB_CODE_SUCCESS; + } else { pSort->calcGroupId = true; code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort); - } - if (code == TSDB_CODE_SUCCESS) { - pCxt->optimized = true; + if (code == TSDB_CODE_SUCCESS) { + pCxt->optimized = true; + } else { + nodesDestroyNode((SNode*)pSort); + } } return code; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 651c2b4db1..bb3fa9a10c 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1750,6 +1750,7 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren SNodeList* pSortKeys = NULL; int32_t code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys); pSort->calcGroupId = pSortLogicNode->calcGroupId; + pSort->excludePkCol = pSortLogicNode->excludePkCol; SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); // push down expression to pOutputDataBlockDesc of child node @@ -1798,6 +1799,8 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList* SNodeList* pPrecalcExprs = NULL; SNodeList* pPartitionKeys = NULL; int32_t code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys); + pPart->needBlockOutputTsOrder = pPartLogicNode->needBlockOutputTsOrder; + pPart->tsSlotId = pPartLogicNode->tsSlotId; SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); // push down expression to pOutputDataBlockDesc of child node diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 5296ccf710..1c7c937b7f 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -358,12 +358,12 @@ static bool stbNotSystemScan(SLogicNode* pNode) { } } 
-static bool stbHasPartTbname(SNodeList* pPartKeys) { - if (NULL == pPartKeys) { +bool keysHasTbname(SNodeList* pKeys) { + if (NULL == pKeys) { return false; } SNode* pPartKey = NULL; - FOREACH(pPartKey, pPartKeys) { + FOREACH(pPartKey, pKeys) { if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) { pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0); } @@ -390,10 +390,10 @@ bool isPartTableAgg(SAggLogicNode* pAgg) { return false; } if (NULL != pAgg->pGroupKeys) { - return stbHasPartTbname(pAgg->pGroupKeys) && + return (pAgg->isGroupTb || keysHasTbname(pAgg->pGroupKeys)) && stbNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)); } - return stbHasPartTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))); + return pAgg->isPartTb || keysHasTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))); } static bool stbHasPartTag(SNodeList* pPartKeys) { @@ -478,7 +478,7 @@ bool isPartTagAgg(SAggLogicNode* pAgg) { } bool isPartTableWinodw(SWindowLogicNode* pWindow) { - return stbHasPartTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0))); + return pWindow->isPartTb || keysHasTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0))); } bool cloneLimit(SLogicNode* pParent, SLogicNode* pChild, uint8_t cloneWhat) { @@ -501,3 +501,19 @@ bool cloneLimit(SLogicNode* pParent, SLogicNode* pChild, uint8_t cloneWhat) { } return cloned; } + +static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) { + *(bool*)pContext = true; + return DEAL_RES_END; + } + } + return DEAL_RES_CONTINUE; +} + +bool keysHasCol(SNodeList* pKeys) { + bool hasCol = false; + nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol); + return hasCol; +} diff --git 
a/tests/system-test/2-query/partition_by_col.py b/tests/system-test/2-query/partition_by_col.py index 66f014a4f5..a7930337fa 100644 --- a/tests/system-test/2-query/partition_by_col.py +++ b/tests/system-test/2-query/partition_by_col.py @@ -86,7 +86,15 @@ class TDTestCase: 'stbName': 'meters', 'colPrefix': 'c', 'tagPrefix': 't', - 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}], + 'colSchema': [{'type': 'INT', 'count':1}, + {'type': 'BIGINT', 'count':1}, + {'type': 'FLOAT', 'count':1}, + {'type': 'DOUBLE', 'count':1}, + {'type': 'smallint', 'count':1}, + {'type': 'tinyint', 'count':1}, + {'type': 'bool', 'count':1}, + {'type': 'binary', 'len':10, 'count':1}, + {'type': 'nchar', 'len':10, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}], 'ctbPrefix': 't', 'ctbStartIdx': 0, @@ -128,26 +136,35 @@ class TDTestCase: def test_sort_for_partition_hint(self): - sql = 'explain select count(*), c1 from meters partition by c1' - sql_hint = 'explain select /*+ sort_for_group() */count(*), c1 from meters partition by c1' - tdSql.query(sql) - self.check_explain_res_has_row("Partition on", tdSql.queryResult) - tdSql.query(sql_hint) - self.check_explain_res_has_row("Sort", tdSql.queryResult) + sql = 'select count(*), c1 from meters partition by c1' + sql_hint = 'select /*+ sort_for_group() */count(*), c1 from meters partition by c1' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) - sql = 'explain select count(*), c1, tbname from meters partition by tbname, c1' - 
sql_hint = 'explain select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1' - tdSql.query(sql) - self.check_explain_res_has_row("Partition on", tdSql.queryResult) - tdSql.query(sql_hint) - self.check_explain_res_has_row("Sort", tdSql.queryResult) + sql = 'select count(*), c1, tbname from meters partition by tbname, c1' + sql_hint = 'select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) - sql_interval = 'explain select count(*), c1 from meters partition by c1 interval(1s)' - sql_interval_hint = 'explain select /*+ sort_for_group() */ count(*), c1 from meters partition by c1 interval(1s)' - tdSql.query(sql_interval) - self.check_explain_res_has_row("Partition on", tdSql.queryResult) - tdSql.query(sql_interval_hint) - self.check_explain_res_has_row("Partition on", tdSql.queryResult) + sql = 'select count(*), c1, tbname from meters partition by tbname, c1 interval(1s)' + sql_hint = 'select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1 interval(1s)' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + + sql = 'select count(*), c1, t1 from meters partition by t1, c1' + sql_hint = 'select /*+ sort_for_group() */ count(*), c1, t1 from meters partition by t1, c1' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + + sql = 'select count(*), c1, t1 from meters partition by t1, c1 interval(1s)' + sql_hint = 'select /*+ sort_for_group() */ count(*), c1, t1 from meters partition by t1, c1 interval(1s)' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + + sql = 'select 
count(*), c1 from meters partition by c1 interval(1s)' + sql_hint = 'select /*+ sort_for_group() */ count(*), c1 from meters partition by c1 interval(1s)' + self.check_explain_res_has_row("Partition on", self.explain_sql(sql)) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) def add_order_by(self, sql: str, order_by: str, select_list: str = "*") -> str: return "select %s from (%s)t order by %s" % (select_list, sql, order_by) @@ -157,9 +174,14 @@ class TDTestCase: def query_with_time(self, sql): start = datetime.now() - tdSql.query(sql) + tdSql.query(sql, queryTimes=1) return (datetime.now().timestamp() - start.timestamp()) * 1000 + def explain_sql(self, sql: str): + sql = "explain " + sql + tdSql.query(sql) + return tdSql.queryResult + def query_and_compare_res(self, sql1, sql2): dur = self.query_with_time(sql1) tdLog.debug("sql1 query with time: [%f]" % dur) @@ -180,8 +202,9 @@ class TDTestCase: for sql in sqls: sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list) sql = self.add_order_by(sql, order_by, select_list) + self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint)) + self.check_explain_res_has_row("Partition", self.explain_sql(sql)) self.query_and_compare_res(sql, sql_hint) - pass def test_sort_for_partition_res(self): sqls_par_c1_agg = [ @@ -200,7 +223,7 @@ class TDTestCase: ] sqls_par_tbname_c1 = [ - "select count(*), c1 , tbname as tb from meters partition by tbname, c1" + "select count(*), c1 , tbname as a from meters partition by tbname, c1" ] sqls_par_tag_c1 = [ "select count(*), c1, t1 from meters partition by t1, c1" @@ -209,13 +232,60 @@ class TDTestCase: self.prepare_and_query(sqls_par_c1, "c1, ts, c2", "c1, ts, c2") self.prepare_and_query(sqls_par_c1_c2_agg, "c1, c2") self.prepare_and_query(sqls_par_c1_c2, "c1, c2, ts, c3", "c1, c2, ts, c3") - self.prepare_and_query(sqls_par_tbname_c1, "tb, c1") + self.prepare_and_query(sqls_par_tbname_c1, "a, c1") self.prepare_and_query(sqls_par_tag_c1, "t1, c1") 
+ def get_interval_template_sqls(self, col_name): + sqls = [ + 'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1s)' % (col_name, col_name), + 'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30s)' % (col_name, col_name), + 'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1m)' % (col_name, col_name), + 'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30m)' % (col_name, col_name), + 'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1h)' % (col_name, col_name), + + 'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1s)' % (col_name, col_name), + 'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30s)' % (col_name, col_name), + 'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1m)' % (col_name, col_name), + 'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30m)' % (col_name, col_name), + 'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1h)' % (col_name, col_name), + + 'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1s)' % (col_name, col_name, col_name), + 'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30s)' % (col_name, col_name, col_name), + 'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1m)' % (col_name, col_name, col_name), + 'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30m)' % (col_name, col_name, col_name), + 'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name), + + 'select _wstart as ts, count(*), tbname as a, %s from meters partition by %s, tbname interval(1s)' 
% (col_name, col_name), + 'select _wstart as ts, count(*), t1 as a, %s from meters partition by %s, t1 interval(1s)' % (col_name, col_name), + ] + order_list = 'a, %s, ts' % (col_name) + return (sqls, order_list) + + def test_sort_for_partition_interval(self): + sqls, order_list = self.get_interval_template_sqls('c1') + self.prepare_and_query(sqls, order_list) + sqls, order_list = self.get_interval_template_sqls('c2') + self.prepare_and_query(sqls, order_list) + sqls, order_list = self.get_interval_template_sqls('c3') + self.prepare_and_query(sqls, order_list) + sqls, order_list = self.get_interval_template_sqls('c4') + self.prepare_and_query(sqls, order_list) + sqls, order_list = self.get_interval_template_sqls('c5') + self.prepare_and_query(sqls, order_list) + sqls, order_list = self.get_interval_template_sqls('c6') + self.prepare_and_query(sqls, order_list) + sqls, order_list = self.get_interval_template_sqls('c7') + self.prepare_and_query(sqls, order_list) + sqls, order_list = self.get_interval_template_sqls('c8') + self.prepare_and_query(sqls, order_list) + sqls, order_list = self.get_interval_template_sqls('c9') + self.prepare_and_query(sqls, order_list) + def run(self): self.prepareTestEnv() self.test_sort_for_partition_hint() self.test_sort_for_partition_res() + self.test_sort_for_partition_interval() def stop(self): tdSql.close()