feat: para_tables_sort table merge scan support ordering by ts and pk

This commit is contained in:
slzhou 2024-03-22 15:48:36 +08:00
parent e36e7d0427
commit 4231b87158
4 changed files with 52 additions and 11 deletions

View File

@ -302,11 +302,13 @@ typedef struct STmsSubTableInput {
int32_t rowIdx;
int64_t* aTs;
SSDataBlock* pInputBlock;
} STmsSubTableInput;
typedef struct SBlockOrderInfo SBlockOrderInfo;
typedef struct STmsSubTablesMergeInfo {
SBlockOrderInfo* pOrderInfo;
SBlockOrderInfo* pTsOrderInfo;
SBlockOrderInfo* pPkOrderInfo;
int32_t numSubTables;
STmsSubTableInput* aInputs;

View File

@ -212,6 +212,9 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke
* @brief set the merge limit reached callback. it calls mergeLimitReached param with tableUid and param
*/
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param);
int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
int32_t leftRowIndex, int32_t rightRowIndex, void* pOrder);
#ifdef __cplusplus
}
#endif

View File

@ -3440,9 +3440,7 @@ _error:
// table merge scan operator
// table merge scan operator
static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
static int32_t subTblRowCompareTsFn(const void* pLeft, const void* pRight, void* param) {
int32_t left = *(int32_t*)pLeft;
int32_t right = *(int32_t*)pRight;
STmsSubTablesMergeInfo* pInfo = (STmsSubTablesMergeInfo*)param;
@ -3459,12 +3457,39 @@ static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* p
int64_t leftTs = pInfo->aInputs[left].aTs[leftIdx];
int64_t rightTs = pInfo->aInputs[right].aTs[rightIdx];
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
if (pInfo->pOrderInfo->order == TSDB_ORDER_DESC) {
if (pInfo->pTsOrderInfo->order == TSDB_ORDER_DESC) {
ret = -1 * ret;
}
return ret;
}
static int32_t subTblRowCompareTsPkFn(const void* pLeft, const void* pRight, void* param) {
int32_t left = *(int32_t*)pLeft;
int32_t right = *(int32_t*)pRight;
STmsSubTablesMergeInfo* pInfo = (STmsSubTablesMergeInfo*)param;
int32_t leftIdx = pInfo->aInputs[left].rowIdx;
int32_t rightIdx = pInfo->aInputs[right].rowIdx;
if (leftIdx == -1) {
return 1;
} else if (rightIdx == -1) {
return -1;
}
int64_t leftTs = pInfo->aInputs[left].aTs[leftIdx];
int64_t rightTs = pInfo->aInputs[right].aTs[rightIdx];
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
if (pInfo->pTsOrderInfo->order == TSDB_ORDER_DESC) {
ret = -1 * ret;
}
if (ret == 0 && pInfo->pPkOrderInfo) {
ret = tsortComparBlockCell(pInfo->aInputs[left].pInputBlock, pInfo->aInputs[right].pInputBlock,
leftIdx, rightIdx, pInfo->pPkOrderInfo);
}
return ret;
}
int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
@ -3573,11 +3598,12 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
pInput->rowIdx = 0;
pInput->pageIdx = -1;
}
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
pInput->pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
SColumnInfoData* col = taosArrayGet(pInput->pInputBlock->pDataBlock, pSubTblsInfo->pTsOrderInfo->slotId);
pInput->aTs = (int64_t*)col->pData;
}
tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, subTblRowCompareFn);
__merge_compare_fn_t mergeCompareFn = (!pSubTblsInfo->pPkOrderInfo) ? subTblRowCompareTsFn : subTblRowCompareTsPkFn;
tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, mergeCompareFn);
return TSDB_CODE_SUCCESS;
}
@ -3587,7 +3613,12 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
if (pSubTblsInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubTblsInfo->pOrderInfo = taosArrayGet(pInfo->pSortInfo, 0);
pSubTblsInfo->pTsOrderInfo = taosArrayGet(pInfo->pSortInfo, 0);
if (taosArrayGetSize(pInfo->pSortInfo) == 2) {
pSubTblsInfo->pPkOrderInfo = taosArrayGet(pInfo->pSortInfo, 1);
} else {
pSubTblsInfo->pPkOrderInfo = NULL;
}
pSubTblsInfo->numSubTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
pSubTblsInfo->aInputs = taosMemoryCalloc(pSubTblsInfo->numSubTables, sizeof(STmsSubTableInput));
if (pSubTblsInfo->aInputs == NULL) {
@ -3679,7 +3710,8 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab
adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo);
}
if (pInput->rowIdx != -1) {
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pTsOrderInfo->slotId);
pInput->pInputBlock = pInputBlock;
pInput->aTs = (int64_t*)col->pData;
}
}

View File

@ -636,7 +636,8 @@ static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparPa
// TODO: improve this function performance
int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
int32_t leftRowIndex, int32_t rightRowIndex, SBlockOrderInfo* pOrder) {
int32_t leftRowIndex, int32_t rightRowIndex, void* pCompareOrder) {
SBlockOrderInfo* pOrder = pCompareOrder;
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
@ -1534,6 +1535,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
taosArrayDestroy(aPgId);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
return code;
}
nMergedRows += pHandle->pDataBlock->info.rows;
@ -1578,6 +1580,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
taosMemoryFree(pTree);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
return code;
}
nMergedRows += pHandle->pDataBlock->info.rows;
@ -1597,6 +1600,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
tMergeTreeDestroy(&pTree);