fix: fix limit/offset bugs
This commit is contained in:
parent
30129f64da
commit
5d9f6fd481
|
@ -2905,28 +2905,32 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
blockDataCleanup(pResBlock);
|
blockDataCleanup(pResBlock);
|
||||||
|
STupleHandle* pTupleHandle = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
while (1) {
|
||||||
if (pTupleHandle == NULL) {
|
pTupleHandle = tsortNextTuple(pHandle);
|
||||||
break;
|
if (pTupleHandle == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
appendOneRowToDataBlock(pResBlock, pTupleHandle);
|
||||||
|
if (pResBlock->info.rows >= capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
appendOneRowToDataBlock(pResBlock, pTupleHandle);
|
if (tsortIsClosed(pHandle)) {
|
||||||
if (pResBlock->info.rows >= capacity) {
|
terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
|
||||||
|
qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
|
||||||
|
pInfo->limitInfo.numOfOutputRows);
|
||||||
|
if (pTupleHandle == NULL || limitReached || pResBlock->info.rows > 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsortIsClosed(pHandle)) {
|
|
||||||
terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
|
|
||||||
T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
|
|
||||||
qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
|
|
||||||
pInfo->limitInfo.numOfOutputRows);
|
|
||||||
|
|
||||||
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -714,7 +714,9 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
taosArrayDestroy(pResList);
|
taosArrayDestroy(pResList);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int nRows = 0;
|
|
||||||
|
int nMergedRows = 0;
|
||||||
|
|
||||||
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
|
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
|
||||||
while (1) {
|
while (1) {
|
||||||
if (tsortIsClosed(pHandle)) {
|
if (tsortIsClosed(pHandle)) {
|
||||||
|
@ -722,15 +724,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
|
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
nRows += pDataBlock->info.rows;
|
|
||||||
|
|
||||||
int32_t pageId = -1;
|
int32_t pageId = -1;
|
||||||
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||||
|
@ -750,7 +747,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
|
|
||||||
setBufPageDirty(pPage, true);
|
setBufPageDirty(pPage, true);
|
||||||
releaseBufPage(pHandle->pBuf, pPage);
|
releaseBufPage(pHandle->pBuf, pPage);
|
||||||
|
nMergedRows += pDataBlock->info.rows;
|
||||||
|
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
|
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sortComparCleanup(&pHandle->cmpParam);
|
sortComparCleanup(&pHandle->cmpParam);
|
||||||
|
@ -915,15 +917,12 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
||||||
|
|
||||||
SMultiwayMergeTreeInfo* pTree = NULL;
|
SMultiwayMergeTreeInfo* pTree = NULL;
|
||||||
tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
|
tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, blockCompareTsFn);
|
||||||
int32_t numEnded = 0;
|
|
||||||
int32_t nRows = 0;
|
int32_t nRows = 0;
|
||||||
|
int32_t nMergedRows = 0;
|
||||||
|
bool mergeLimitReached = false;
|
||||||
size_t blkPgSz = pgHeaderSz;
|
size_t blkPgSz = pgHeaderSz;
|
||||||
|
|
||||||
while (nRows < totalRows) {
|
while (nRows < totalRows) {
|
||||||
if (pHandle->mergeLimit != -1 && nRows >= pHandle->mergeLimit) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
||||||
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
||||||
int32_t minRow = sup.aRowIdx[minIdx];
|
int32_t minRow = sup.aRowIdx[minIdx];
|
||||||
|
@ -931,9 +930,15 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
||||||
|
|
||||||
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
||||||
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||||
|
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||||
|
|
||||||
blockDataCleanup(pHandle->pDataBlock);
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
blkPgSz = pgHeaderSz;
|
blkPgSz = pgHeaderSz;
|
||||||
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
|
bufInc = getPageBufIncForRow(minBlk, minRow, 0);
|
||||||
|
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||||
|
mergeLimitReached = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
|
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
|
||||||
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
|
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
|
||||||
|
@ -943,14 +948,16 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
|
||||||
|
|
||||||
if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
|
if (sup.aRowIdx[minIdx] == minBlk->info.rows - 1) {
|
||||||
sup.aRowIdx[minIdx] = -1;
|
sup.aRowIdx[minIdx] = -1;
|
||||||
++numEnded;
|
|
||||||
} else {
|
} else {
|
||||||
++sup.aRowIdx[minIdx];
|
++sup.aRowIdx[minIdx];
|
||||||
}
|
}
|
||||||
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
|
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
|
||||||
}
|
}
|
||||||
if (pHandle->pDataBlock->info.rows > 0) {
|
if (pHandle->pDataBlock->info.rows > 0) {
|
||||||
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
if (!mergeLimitReached) {
|
||||||
|
appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
|
||||||
|
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||||
|
}
|
||||||
blockDataCleanup(pHandle->pDataBlock);
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
}
|
}
|
||||||
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
|
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
|
||||||
|
|
Loading…
Reference in New Issue