fix(query): set correct row length for rowid sort.
This commit is contained in:
parent
6e832b68bd
commit
ca022259a0
|
@ -1243,7 +1243,9 @@ static int32_t tsortFinalizeRegions(SSortHandle* pHandle) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) {
|
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx,
|
||||||
|
int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) {
|
||||||
|
|
||||||
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
|
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
|
||||||
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
|
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
|
||||||
{
|
{
|
||||||
|
@ -1257,11 +1259,13 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p
|
||||||
pMemFile->writeFileOffset = pRegion->fileOffset + pMemFile->currRegionOffset;
|
pMemFile->writeFileOffset = pRegion->fileOffset + pMemFile->currRegionOffset;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
*pRegionId = pMemFile->currRegionId;
|
*pRegionId = pMemFile->currRegionId;
|
||||||
*pOffset = pMemFile->currRegionOffset;
|
*pOffset = pMemFile->currRegionOffset;
|
||||||
int32_t writeBufOffset = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
|
int32_t writeBufOffset = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
|
||||||
int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->writeBuf + writeBufOffset);
|
int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->writeBuf + writeBufOffset);
|
||||||
*pLength = blockLen;
|
*pLength = blockLen;
|
||||||
|
|
||||||
pMemFile->currRegionOffset += blockLen;
|
pMemFile->currRegionOffset += blockLen;
|
||||||
pMemFile->bRegionDirty = true;
|
pMemFile->bRegionDirty = true;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1324,6 +1328,7 @@ static void initRowIdSort(SSortHandle* pHandle) {
|
||||||
pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5);
|
pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5);
|
||||||
blockDataAppendColInfo(pSortInput, &pkCol);
|
blockDataAppendColInfo(pSortInput, &pkCol);
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataDestroy(pHandle->pDataBlock);
|
blockDataDestroy(pHandle->pDataBlock);
|
||||||
pHandle->pDataBlock = pSortInput;
|
pHandle->pDataBlock = pSortInput;
|
||||||
|
|
||||||
|
@ -1444,32 +1449,61 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) {
|
static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, int32_t dstRowIndex) {
|
||||||
int32_t sz = 0;
|
int32_t size = 0;
|
||||||
int32_t numCols = taosArrayGetSize(blk->pDataBlock);
|
int32_t numCols = taosArrayGetSize(pSrcBlock->pDataBlock);
|
||||||
if (!blk->info.hasVarCol) {
|
|
||||||
sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0);
|
if (!pSrcBlock->info.hasVarCol) {
|
||||||
sz += blockDataGetRowSize(blk);
|
size += numCols * ((dstRowIndex & 0x7) == 0 ? 1: 0);
|
||||||
|
size += blockDataGetRowSize(pSrcBlock);
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = 0; i < numCols; ++i) {
|
for (int32_t i = 0; i < numCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i);
|
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pSrcBlock->pDataBlock, i);
|
||||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
if ((pColInfoData->varmeta.offset[row] != -1) && (pColInfoData->pData)) {
|
if ((pColInfoData->varmeta.offset[srcRowIndex] != -1) && (pColInfoData->pData)) {
|
||||||
char* p = colDataGetData(pColInfoData, row);
|
char* p = colDataGetData(pColInfoData, srcRowIndex);
|
||||||
sz += varDataTLen(p);
|
size += varDataTLen(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
sz += sizeof(pColInfoData->varmeta.offset[0]);
|
size += sizeof(pColInfoData->varmeta.offset[0]);
|
||||||
} else {
|
} else {
|
||||||
sz += pColInfoData->info.bytes;
|
size += pColInfoData->info.bytes;
|
||||||
|
|
||||||
if (((rowIdxInPage) & 0x07) == 0) {
|
if (((dstRowIndex) & 0x07) == 0) {
|
||||||
sz += 1; // bitmap
|
size += 1; // bitmap
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return sz;
|
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex, SColumnInfoData* pPkCol) {
|
||||||
|
int32_t size = 0;
|
||||||
|
int32_t numOfCols = blockDataGetNumOfCols(pDstBlock);
|
||||||
|
|
||||||
|
if (pPkCol == NULL) {
|
||||||
|
ASSERT(!pDstBlock->info.hasVarCol);
|
||||||
|
size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0);
|
||||||
|
size += blockDataGetRowSize(pDstBlock);
|
||||||
|
} else {
|
||||||
|
if (IS_VAR_DATA_TYPE(pPkCol->info.type)) {
|
||||||
|
if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) {
|
||||||
|
char* p = colDataGetData(pPkCol, srcRowIndex);
|
||||||
|
size += varDataTLen(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
size += sizeof(pPkCol->varmeta.offset[0]);
|
||||||
|
} else {
|
||||||
|
size += pPkCol->info.bytes;
|
||||||
|
if (((dstRowIndex) & 0x07) == 0) {
|
||||||
|
size += 1; // bitmap
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
|
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
|
||||||
|
@ -1533,8 +1567,19 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
||||||
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
|
||||||
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
|
||||||
int32_t minRow = sup.aRowIdx[minIdx];
|
int32_t minRow = sup.aRowIdx[minIdx];
|
||||||
SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
|
|
||||||
int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows);
|
int32_t bufInc = 0;
|
||||||
|
if (pHandle->bSortByRowId) {
|
||||||
|
if (!pHandle->bSortPk) {
|
||||||
|
bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, NULL);
|
||||||
|
} else { // there may be varchar column exists, so we need to get the pk info, and then calculate the row length
|
||||||
|
SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
|
||||||
|
SColumnInfoData* pPkCol = taosArrayGet(minBlk->pDataBlock, extRowsPkOrder->slotId);
|
||||||
|
bufInc = getPageBufIncForRowIdSort(pHandle->pDataBlock, minRow, pHandle->pDataBlock->info.rows, pPkCol);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
bufInc = getPageBufIncForRow(minBlk, minRow, pHandle->pDataBlock->info.rows);
|
||||||
|
}
|
||||||
|
|
||||||
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
|
||||||
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
|
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
|
||||||
|
@ -1552,7 +1597,8 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
||||||
nMergedRows += pHandle->pDataBlock->info.rows;
|
nMergedRows += pHandle->pDataBlock->info.rows;
|
||||||
blockDataCleanup(pHandle->pDataBlock);
|
blockDataCleanup(pHandle->pDataBlock);
|
||||||
blkPgSz = pgHeaderSz;
|
blkPgSz = pgHeaderSz;
|
||||||
incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
|
|
||||||
|
SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
|
||||||
bufInc = getPageBufIncForRow(incBlock, minRow, 0);
|
bufInc = getPageBufIncForRow(incBlock, minRow, 0);
|
||||||
|
|
||||||
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
|
||||||
|
@ -1566,10 +1612,10 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
|
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
|
||||||
if (!pHandle->bSortByRowId) {
|
if (pHandle->bSortByRowId) {
|
||||||
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
|
|
||||||
} else {
|
|
||||||
appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
|
appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
|
||||||
|
} else {
|
||||||
|
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
blkPgSz += bufInc;
|
blkPgSz += bufInc;
|
||||||
|
|
Loading…
Reference in New Issue