fix: row id sort when no limit and row size is more than 256

This commit is contained in:
slzhou 2023-12-21 08:05:09 +08:00
parent 9ebca3eaff
commit 084245e6d2
1 changed files with 47 additions and 36 deletions

View File

@ -3568,8 +3568,9 @@ int32_t startRowIdSort(STableMergeScanInfo *pInfo) {
STmsSortRowIdInfo* pSort = &pInfo->sortRowIdInfo;
int32_t pageSize = getProperSortPageSize(blockDataGetRowSize(pInfo->pResBlock),
taosArrayGetSize(pInfo->pResBlock->pDataBlock));
int32_t memSize = pageSize * 1024;
int32_t code = createDiskbasedBuf(&pSort->pExtSrcRowsBuf, pageSize, memSize, "tms-ext-src-block", tsTempDir);
pageSize *= 2;
int32_t memSize = MIN(pageSize * 2048, 256 * 1024 * 1024);
int32_t code = createDiskbasedBuf(&pSort->pExtSrcRowsBuf, pageSize * 2, memSize, "tms-ext-src-block", tsTempDir);
dBufSetPrintInfo(pSort->pExtSrcRowsBuf);
return code;
}
@ -3863,6 +3864,34 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
return TSDB_CODE_SUCCESS;
}
static void initRowIdSortputBlock(STableMergeScanInfo* pInfo) {
SSDataBlock* pSortInput = createDataBlock();
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
blockDataAppendColInfo(pSortInput, &tsCol);
SColumnInfoData pageIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
blockDataAppendColInfo(pSortInput, &pageIdCol);
SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
blockDataAppendColInfo(pSortInput, &offsetCol);
pInfo->pSortInputBlock = pSortInput;
int32_t srcTsSlotId = 0;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) {
SColMatchItem* colInfo = taosArrayGet(pInfo->base.matchInfo.pList, i);
if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
srcTsSlotId = colInfo->dstSlotId;
}
}
pInfo->sortRowIdInfo.srcTsSlotId = srcTsSlotId;
SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
SBlockOrderInfo bi = {0};
bi.order = pInfo->base.cond.order;
bi.slotId = 0;
bi.nullFirst = NULL_ORDER_FIRST;
taosArrayPush(pList, &bi);
pInfo->pSortInfo = pList;
return;
}
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
@ -3922,39 +3951,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
pInfo->bSortRowId = true;
if (!pInfo->bSortRowId) {
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
} else {
SSDataBlock* pSortInput = createDataBlock();
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
blockDataAppendColInfo(pSortInput, &tsCol);
SColumnInfoData pageIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
blockDataAppendColInfo(pSortInput, &pageIdCol);
SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
blockDataAppendColInfo(pSortInput, &offsetCol);
pInfo->pSortInputBlock = pSortInput;
int32_t srcTsSlotId = 0;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->base.matchInfo.pList); ++i) {
SColMatchItem* colInfo = taosArrayGet(pInfo->base.matchInfo.pList, i);
if (colInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
srcTsSlotId = colInfo->dstSlotId;
}
}
pInfo->sortRowIdInfo.srcTsSlotId = srcTsSlotId;
SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
SBlockOrderInfo bi = {0};
bi.order = pInfo->base.cond.order;
bi.slotId = 0;
bi.nullFirst = NULL_ORDER_FIRST;
taosArrayPush(pList, &bi);
pInfo->pSortInfo = pList;
}
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
pInfo->mTableNumRows = tSimpleHashInit(1024,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
@ -3964,9 +3962,22 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->mergeLimit = pInfo->limitInfo.limit.limit + pInfo->limitInfo.limit.offset;
pInfo->mSkipTables = NULL;
}
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pSortInputBlock->info.rowSize;
if (!hasLimit && blockDataGetRowSize(pInfo->pResBlock) >= 256) {
pInfo->bSortRowId = true;
} else {
pInfo->bSortRowId = false;
}
if (!pInfo->bSortRowId) {
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
} else {
initRowIdSortputBlock(pInfo);
}
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
int32_t rowSize = blockDataGetRowSize(pInfo->pSortInputBlock);
uint32_t nCols = taosArrayGetSize(pInfo->pSortInputBlock->pDataBlock);
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
if (!tsExperimental) {