diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 98e96bb250..1c55da412b 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -302,11 +302,13 @@ typedef struct STmsSubTableInput { int32_t rowIdx; int64_t* aTs; + SSDataBlock* pInputBlock; } STmsSubTableInput; typedef struct SBlockOrderInfo SBlockOrderInfo; typedef struct STmsSubTablesMergeInfo { - SBlockOrderInfo* pOrderInfo; + SBlockOrderInfo* pTsOrderInfo; + SBlockOrderInfo* pPkOrderInfo; int32_t numSubTables; STmsSubTableInput* aInputs; diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 852e59aa80..b9118cf00d 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -212,6 +212,9 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke * @brief set the merge limit reached callback. it calls mergeLimitReached param with tableUid and param */ void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param); + +int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, + int32_t leftRowIndex, int32_t rightRowIndex, void* pOrder); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6c697a6fc3..5c02dce53c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3440,9 +3440,7 @@ _error: // table merge scan operator -// table merge scan operator - -static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) { +static int32_t subTblRowCompareTsFn(const void* pLeft, const void* pRight, void* param) { int32_t left = *(int32_t*)pLeft; int32_t right = *(int32_t*)pRight; STmsSubTablesMergeInfo* pInfo = (STmsSubTablesMergeInfo*)param; @@ -3459,12 +3457,39 @@ static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* p int64_t leftTs = pInfo->aInputs[left].aTs[leftIdx]; int64_t rightTs = pInfo->aInputs[right].aTs[rightIdx]; int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0); - if (pInfo->pOrderInfo->order == TSDB_ORDER_DESC) { + if (pInfo->pTsOrderInfo->order == TSDB_ORDER_DESC) { ret = -1 * ret; } return ret; } +static int32_t subTblRowCompareTsPkFn(const void* pLeft, const void* pRight, void* param) { + int32_t left = *(int32_t*)pLeft; + int32_t right = *(int32_t*)pRight; + STmsSubTablesMergeInfo* pInfo = (STmsSubTablesMergeInfo*)param; + + int32_t leftIdx = pInfo->aInputs[left].rowIdx; + int32_t rightIdx = pInfo->aInputs[right].rowIdx; + + if (leftIdx == -1) { + return 1; + } else if (rightIdx == -1) { + return -1; + } + + int64_t leftTs = pInfo->aInputs[left].aTs[leftIdx]; + int64_t rightTs = pInfo->aInputs[right].aTs[rightIdx]; + int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0); + if (pInfo->pTsOrderInfo->order == TSDB_ORDER_DESC) { + ret = -1 * ret; + } + if (ret == 0 && pInfo->pPkOrderInfo) { + ret = tsortComparBlockCell(pInfo->aInputs[left].pInputBlock, pInfo->aInputs[right].pInputBlock, + leftIdx, rightIdx, pInfo->pPkOrderInfo); + } + return ret; +} + int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) { memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond)); dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo)); @@ -3573,11 +3598,12 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { pInput->rowIdx = 0; pInput->pageIdx = -1; } - SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock; - SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId); + pInput->pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock; + SColumnInfoData* col = taosArrayGet(pInput->pInputBlock->pDataBlock, pSubTblsInfo->pTsOrderInfo->slotId); pInput->aTs = (int64_t*)col->pData; } - tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, subTblRowCompareFn); + __merge_compare_fn_t mergeCompareFn = (!pSubTblsInfo->pPkOrderInfo) ? subTblRowCompareTsFn : subTblRowCompareTsPkFn; + tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, mergeCompareFn); return TSDB_CODE_SUCCESS; } @@ -3587,7 +3613,12 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) { if (pSubTblsInfo == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - pSubTblsInfo->pOrderInfo = taosArrayGet(pInfo->pSortInfo, 0); + pSubTblsInfo->pTsOrderInfo = taosArrayGet(pInfo->pSortInfo, 0); + if (taosArrayGetSize(pInfo->pSortInfo) == 2) { + pSubTblsInfo->pPkOrderInfo = taosArrayGet(pInfo->pSortInfo, 1); + } else { + pSubTblsInfo->pPkOrderInfo = NULL; + } pSubTblsInfo->numSubTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; pSubTblsInfo->aInputs = taosMemoryCalloc(pSubTblsInfo->numSubTables, sizeof(STmsSubTableInput)); if (pSubTblsInfo->aInputs == NULL) { @@ -3679,7 +3710,8 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo); } if (pInput->rowIdx != -1) { - SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId); + SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pTsOrderInfo->slotId); + pInput->pInputBlock = pInputBlock; pInput->aTs = (int64_t*)col->pData; } } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 0c0c83da6b..678d757e68 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -636,7 +636,8 @@ static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparPa // TODO: improve this function performance int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, - int32_t leftRowIndex, int32_t rightRowIndex, SBlockOrderInfo* pOrder) { + int32_t leftRowIndex, int32_t rightRowIndex, void* pCompareOrder) { + SBlockOrderInfo* pOrder = pCompareOrder; SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); @@ -1534,6 +1535,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* taosArrayDestroy(aPgId); taosMemoryFree(sup.aRowIdx); taosMemoryFree(sup.aTs); + taosMemoryFree(sup.aBlks); return code; } nMergedRows += pHandle->pDataBlock->info.rows; @@ -1578,6 +1580,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* taosMemoryFree(pTree); taosMemoryFree(sup.aRowIdx); taosMemoryFree(sup.aTs); + taosMemoryFree(sup.aBlks); return code; } nMergedRows += pHandle->pDataBlock->info.rows; @@ -1597,6 +1600,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* taosMemoryFree(sup.aRowIdx); taosMemoryFree(sup.aTs); + taosMemoryFree(sup.aBlks); tMergeTreeDestroy(&pTree);