fix(sort): set correct output row index.

This commit is contained in:
Haojun Liao 2024-05-08 16:21:48 +08:00
parent bc04093d11
commit dc9452fb0a
1 changed files with 91 additions and 79 deletions

View File

@ -38,11 +38,11 @@ typedef struct SSortMemFileRegion {
int32_t bufRegOffset; int32_t bufRegOffset;
int32_t bufLen; int32_t bufLen;
char* buf; char* buf;
} SSortMemFileRegion; } SSortMemFileRegion;
typedef struct SSortMemFile { typedef struct SSortMemFile {
char* writeBuf; char* writeBuf;
int32_t writeBufSize; int32_t writeBufSize;
int64_t writeFileOffset; int64_t writeFileOffset;
@ -55,7 +55,7 @@ typedef struct SSortMemFile {
int32_t blockSize; int32_t blockSize;
FILE* pTdFile; FILE* pTdFile;
char memFilePath[PATH_MAX]; char memFilePath[PATH_MAX];
} SSortMemFile; } SSortMemFile;
struct SSortHandle { struct SSortHandle {
@ -260,6 +260,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->cmpParam.orderInfo = pSortInfo; pSortHandle->cmpParam.orderInfo = pSortInfo;
pSortHandle->cmpParam.cmpGroupId = false; pSortHandle->cmpParam.cmpGroupId = false;
pSortHandle->cmpParam.sortType = type; pSortHandle->cmpParam.sortType = type;
if (type == SORT_BLOCK_TS_MERGE) { if (type == SORT_BLOCK_TS_MERGE) {
SBlockOrderInfo* pTsOrder = TARRAY_GET_ELEM(pSortInfo, 0); SBlockOrderInfo* pTsOrder = TARRAY_GET_ELEM(pSortInfo, 0);
pSortHandle->cmpParam.tsSlotId = pTsOrder->slotId; pSortHandle->cmpParam.tsSlotId = pTsOrder->slotId;
@ -522,10 +523,9 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
static void appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) { static void appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i); SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i);
bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL);
bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL);
if (isNull) { if (isNull) {
colDataSetVal(pColInfo, pBlock->info.rows, NULL, true); colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
} else { } else {
@ -557,7 +557,9 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
pSource->pageIndex = -1; pSource->pageIndex = -1;
pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
} else { } else {
if (pSource->pageIndex % 512 == 0) qDebug("begin source %p page %d", pSource, pSource->pageIndex); if (pSource->pageIndex % 512 == 0) {
qDebug("begin source %p page %d", pSource, pSource->pageIndex);
}
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
@ -635,7 +637,7 @@ static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparPa
// TODO: improve this function performance // TODO: improve this function performance
int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock, int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
int32_t leftRowIndex, int32_t rightRowIndex, void* pCompareOrder) { int32_t leftRowIndex, int32_t rightRowIndex, void* pCompareOrder) {
SBlockOrderInfo* pOrder = pCompareOrder; SBlockOrderInfo* pOrder = pCompareOrder;
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
@ -680,7 +682,7 @@ int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
left1 = colDataGetData(pLeftColInfoData, leftRowIndex); left1 = colDataGetData(pLeftColInfoData, leftRowIndex);
right1 = colDataGetData(pRightColInfoData, rightRowIndex); right1 = colDataGetData(pRightColInfoData, rightRowIndex);
__compar_fn_t fn = pOrder->compFn; __compar_fn_t fn = pOrder->compFn;
int ret = fn(left1, right1); int32_t ret = fn(left1, right1);
return ret; return ret;
} }
@ -719,7 +721,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
int64_t* leftTs = (int64_t*)(pLeftTsCol->pData) + pLeftSource->src.rowIndex; int64_t* leftTs = (int64_t*)(pLeftTsCol->pData) + pLeftSource->src.rowIndex;
int64_t* rightTs = (int64_t*)(pRightTsCol->pData) + pRightSource->src.rowIndex; int64_t* rightTs = (int64_t*)(pRightTsCol->pData) + pRightSource->src.rowIndex;
int ret = pParam->cmpTsFn(leftTs, rightTs); int32_t ret = pParam->cmpTsFn(leftTs, rightTs);
if (ret == 0 && pParam->pPkOrder) { if (ret == 0 && pParam->pPkOrder) {
ret = tsortComparBlockCell(pLeftBlock, pRightBlock, ret = tsortComparBlockCell(pLeftBlock, pRightBlock,
pLeftSource->src.rowIndex, pRightSource->src.rowIndex, (SBlockOrderInfo*)pParam->pPkOrder); pLeftSource->src.rowIndex, pRightSource->src.rowIndex, (SBlockOrderInfo*)pParam->pPkOrder);
@ -782,7 +784,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
pOrder->compFn = fn; pOrder->compFn = fn;
} }
int ret = fn(left1, right1); int32_t ret = fn(left1, right1);
if (ret == 0) { if (ret == 0) {
continue; continue;
} else { } else {
@ -855,7 +857,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return code; return code;
} }
int nMergedRows = 0; int32_t nMergedRows = 0;
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while (1) { while (1) {
@ -1075,7 +1077,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
} }
taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET); taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize); int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile); int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
if (ret != 1) { if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return terrno; return terrno;
@ -1095,7 +1097,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock); memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock);
taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET); taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen)); int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen));
int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile); int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
if (ret != 1) { if (ret != 1) {
taosMemoryFreeClear(*ppRow); taosMemoryFreeClear(*ppRow);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
@ -1214,7 +1216,7 @@ static int32_t tsortCloseRegion(SSortHandle* pHandle) {
pRegion->regionSize = pMemFile->currRegionOffset; pRegion->regionSize = pMemFile->currRegionOffset;
int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset); int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset);
if (writeBytes > 0) { if (writeBytes > 0) {
int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); int32_t ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
if (ret != 1) { if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return terrno; return terrno;
@ -1247,7 +1249,7 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p
{ {
if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) { if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) {
int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset); int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile); int32_t ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
if (ret != 1) { if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return terrno; return terrno;
@ -1317,6 +1319,7 @@ static void initRowIdSort(SSortHandle* pHandle) {
blockDataAppendColInfo(pSortInput, &offsetCol); blockDataAppendColInfo(pSortInput, &offsetCol);
SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4); SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
blockDataAppendColInfo(pSortInput, &lengthCol); blockDataAppendColInfo(pSortInput, &lengthCol);
if (pHandle->bSortPk) { if (pHandle->bSortPk) {
pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5); pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5);
blockDataAppendColInfo(pSortInput, &pkCol); blockDataAppendColInfo(pSortInput, &pkCol);
@ -1324,20 +1327,21 @@ static void initRowIdSort(SSortHandle* pHandle) {
blockDataDestroy(pHandle->pDataBlock); blockDataDestroy(pHandle->pDataBlock);
pHandle->pDataBlock = pSortInput; pHandle->pDataBlock = pSortInput;
int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock); // int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock);
size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock); // size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
pHandle->pageSize = 256 * 1024; // 256k pHandle->pageSize = 256 * 1024; // 256k
pHandle->numOfPages = 256; pHandle->numOfPages = 256;
SArray* aOrder = taosArrayInit(1, sizeof(SBlockOrderInfo)); SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo));
int32_t tsOrder = ((SBlockOrderInfo*)taosArrayGet(pHandle->pSortInfo, 0))->order;
SBlockOrderInfo* pTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlockOrderInfo biTs = {0}; SBlockOrderInfo biTs = {0};
biTs.order = pTsOrder->order; biTs.order = tsOrder;
biTs.slotId = 0; biTs.slotId = 0;
biTs.nullFirst = (biTs.order == TSDB_ORDER_ASC); biTs.nullFirst = (biTs.order == TSDB_ORDER_ASC);
biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order); biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order);
taosArrayPush(aOrder, &biTs); taosArrayPush(pOrderInfoList, &biTs);
if (pHandle->bSortPk) { if (pHandle->bSortPk) {
SBlockOrderInfo biPk = {0}; SBlockOrderInfo biPk = {0};
@ -1345,11 +1349,11 @@ static void initRowIdSort(SSortHandle* pHandle) {
biPk.slotId = 4; biPk.slotId = 4;
biPk.nullFirst = (biPk.order == TSDB_ORDER_ASC); biPk.nullFirst = (biPk.order == TSDB_ORDER_ASC);
biPk.compFn = getKeyComparFunc(pkCol.info.type, biPk.order); biPk.compFn = getKeyComparFunc(pkCol.info.type, biPk.order);
taosArrayPush(aOrder, &biPk); taosArrayPush(pOrderInfoList, &biPk);
} }
taosArrayDestroy(pHandle->pSortInfo); taosArrayDestroy(pHandle->pSortInfo);
pHandle->pSortInfo = aOrder; pHandle->pSortInfo = pOrderInfoList;
return;
} }
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) { int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
@ -1441,8 +1445,8 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk,
} }
static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) { static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) {
int sz = 0; int32_t sz = 0;
int numCols = taosArrayGetSize(blk->pDataBlock); int32_t numCols = taosArrayGetSize(blk->pDataBlock);
if (!blk->info.hasVarCol) { if (!blk->info.hasVarCol) {
sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0); sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0);
sz += blockDataGetRowSize(blk); sz += blockDataGetRowSize(blk);
@ -1470,42 +1474,46 @@ static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdx
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) { static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock); int32_t pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz); int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
blockDataCleanup(pHandle->pDataBlock); blockDataCleanup(pHandle->pDataBlock);
int32_t numBlks = taosArrayGetSize(aBlk); int32_t numBlks = taosArrayGetSize(aBlk);
SBlockOrderInfo* pOrigBlockTsOrder = (!pHandle->bSortByRowId) ? SBlockOrderInfo* pOrigBlockTsOrder =
taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0); (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0); SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlkMergeSupport sup = {0}; SBlkMergeSupport sup = {0};
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t)); sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*)); sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
sup.tsOrder = pOrigBlockTsOrder->order; sup.tsOrder = pOrigBlockTsOrder->order;
sup.aBlks = taosMemoryCalloc(numBlks, sizeof(SSDataBlock*)); sup.aBlks = taosMemoryCalloc(numBlks, sizeof(SSDataBlock*));
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i); for (int32_t i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockTsOrder->slotId); SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockTsOrder->slotId);
sup.aTs[i] = (int64_t*)col->pData; sup.aTs[i] = (int64_t*)col->pData;
sup.aRowIdx[i] = 0; sup.aRowIdx[i] = 0;
sup.aBlks[i] = blk; sup.aBlks[i] = blk;
} }
SBlockOrderInfo* pOrigBlockPkOrder = NULL; SBlockOrderInfo* pOrigBlockPkOrder = NULL;
if (pHandle->bSortPk) { if (pHandle->bSortPk) {
pOrigBlockPkOrder = (!pHandle->bSortByRowId) ? pOrigBlockPkOrder =
taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1); (!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1);
} }
sup.pPkOrder = pOrigBlockPkOrder; sup.pPkOrder = pOrigBlockPkOrder;
int32_t totalRows = 0; int32_t totalRows = 0;
for (int i = 0; i < numBlks; ++i) { for (int32_t i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i); SSDataBlock* blk = taosArrayGetP(aBlk, i);
totalRows += blk->info.rows; totalRows += blk->info.rows;
} }
SMultiwayMergeTreeInfo* pTree = NULL; SMultiwayMergeTreeInfo* pTree = NULL;
__merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn; __merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn;
code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn); code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(sup.aRowIdx); taosMemoryFree(sup.aRowIdx);
@ -1517,50 +1525,53 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
SArray* aPgId = taosArrayInit(8, sizeof(int32_t)); SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
int32_t nRows = 0; int32_t nRows = 0;
int32_t nMergedRows = 0; int32_t nMergedRows = 0;
bool mergeLimitReached = false; bool mergeLimitReached = false;
size_t blkPgSz = pgHeaderSz; size_t blkPgSz = pgHeaderSz;
int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN; int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
int64_t currTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
while (nRows < totalRows) { while (nRows < totalRows) {
int32_t minIdx = tMergeTreeGetChosenIndex(pTree); int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx); SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
int32_t minRow = sup.aRowIdx[minIdx]; int32_t minRow = sup.aRowIdx[minIdx];
SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk; SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows); int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows);
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) { if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1]; lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId); code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pTree); taosMemoryFree(pTree);
taosArrayDestroy(aPgId); taosArrayDestroy(aPgId);
taosMemoryFree(sup.aRowIdx); taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs); taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks); taosMemoryFree(sup.aBlks);
return code; return code;
}
nMergedRows += pHandle->pDataBlock->info.rows;
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pgHeaderSz;
incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
} }
nMergedRows += pHandle->pDataBlock->info.rows; break;
blockDataCleanup(pHandle->pDataBlock); }
blkPgSz = pgHeaderSz;
incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
bufInc = getPageBufIncForRow(incBlock, minRow, 0);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
break;
}
} }
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1); blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
if (!pHandle->bSortByRowId) { if (!pHandle->bSortByRowId) {
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow); appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
} else { } else {
appendToRowIndexDataBlock(pHandle, minBlk, &minRow); appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
} }
blkPgSz += bufInc; blkPgSz += bufInc;
++nRows; ++nRows;
@ -1572,6 +1583,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
} }
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
} }
if (pHandle->pDataBlock->info.rows > 0) { if (pHandle->pDataBlock->info.rows > 0) {
if (!mergeLimitReached) { if (!mergeLimitReached) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId); SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
@ -1584,14 +1596,14 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
taosMemoryFree(sup.aTs); taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks); taosMemoryFree(sup.aBlks);
return code; return code;
} }
nMergedRows += pHandle->pDataBlock->info.rows; nMergedRows += pHandle->pDataBlock->info.rows;
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) { if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true; mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) || if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) { (lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs; pHandle->currMergeLimitTs = lastPageBufTs;
} }
} }
} }
blockDataCleanup(pHandle->pDataBlock); blockDataCleanup(pHandle->pDataBlock);
@ -1724,7 +1736,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc); code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i)); blockDataDestroy(taosArrayGetP(aBlkSort, i));
} }
taosArrayClear(aBlkSort); taosArrayClear(aBlkSort);
@ -1736,7 +1748,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int64_t el = taosGetTimestampUs() - p; int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el; pHandle->sortElapsed += el;
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i)); blockDataDestroy(taosArrayGetP(aBlkSort, i));
} }
taosArrayClear(aBlkSort); taosArrayClear(aBlkSort);
@ -1750,7 +1762,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (tsortIsClosed(pHandle)) { if (tsortIsClosed(pHandle)) {
tSimpleHashClear(mUidBlk); tSimpleHashClear(mUidBlk);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i)); blockDataDestroy(taosArrayGetP(aBlkSort, i));
} }
taosArrayClear(aBlkSort); taosArrayClear(aBlkSort);
@ -1759,7 +1771,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
} }
tSimpleHashCleanup(mUidBlk); tSimpleHashCleanup(mUidBlk);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) { for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i)); blockDataDestroy(taosArrayGetP(aBlkSort, i));
} }
taosArrayDestroy(aBlkSort); taosArrayDestroy(aBlkSort);
@ -2048,10 +2060,10 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param)
if (!lData) return pOrder->nullFirst ? -1 : 1; if (!lData) return pOrder->nullFirst ? -1 : 1;
if (!rData) return pOrder->nullFirst ? 1 : -1; if (!rData) return pOrder->nullFirst ? 1 : -1;
int type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type; int32_t type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type;
__compar_fn_t fn = getKeyComparFunc(type, pOrder->order); __compar_fn_t fn = getKeyComparFunc(type, pOrder->order);
int ret = fn(lData, rData); int32_t ret = fn(lData, rData);
if (ret == 0) { if (ret == 0) {
continue; continue;
} else { } else {