Merge pull request #25679 from taosdata/fix/3_liaohj

fix(query): set correct row length for rowid sort.
This commit is contained in:
Haojun Liao 2024-05-09 10:10:27 +08:00 committed by GitHub
commit a450deb60b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 226 additions and 123 deletions

View File

@ -4320,6 +4320,7 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo*
int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfTable = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
@ -4339,6 +4340,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
return code;
}
}
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo);
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);

View File

@ -38,11 +38,11 @@ typedef struct SSortMemFileRegion {
int32_t bufRegOffset;
int32_t bufLen;
char* buf;
char* buf;
} SSortMemFileRegion;
typedef struct SSortMemFile {
char* writeBuf;
char* writeBuf;
int32_t writeBufSize;
int64_t writeFileOffset;
@ -55,7 +55,7 @@ typedef struct SSortMemFile {
int32_t blockSize;
FILE* pTdFile;
char memFilePath[PATH_MAX];
char memFilePath[PATH_MAX];
} SSortMemFile;
struct SSortHandle {
@ -260,6 +260,7 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->cmpParam.orderInfo = pSortInfo;
pSortHandle->cmpParam.cmpGroupId = false;
pSortHandle->cmpParam.sortType = type;
if (type == SORT_BLOCK_TS_MERGE) {
SBlockOrderInfo* pTsOrder = TARRAY_GET_ELEM(pSortInfo, 0);
pSortHandle->cmpParam.tsSlotId = pTsOrder->slotId;
@ -522,10 +523,9 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
static void appendOneRowToDataBlock(SSDataBlock* pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i);
bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL);
bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL);
if (isNull) {
colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
} else {
@ -557,7 +557,9 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
pSource->pageIndex = -1;
pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock);
} else {
if (pSource->pageIndex % 512 == 0) qDebug("begin source %p page %d", pSource, pSource->pageIndex);
if (pSource->pageIndex % 512 == 0) {
qDebug("begin source %p page %d", pSource, pSource->pageIndex);
}
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
@ -635,7 +637,7 @@ static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparPa
// TODO: improve this function performance
int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
int32_t tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
int32_t leftRowIndex, int32_t rightRowIndex, void* pCompareOrder) {
SBlockOrderInfo* pOrder = pCompareOrder;
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
@ -680,7 +682,7 @@ int tsortComparBlockCell(SSDataBlock* pLeftBlock, SSDataBlock* pRightBlock,
left1 = colDataGetData(pLeftColInfoData, leftRowIndex);
right1 = colDataGetData(pRightColInfoData, rightRowIndex);
__compar_fn_t fn = pOrder->compFn;
int ret = fn(left1, right1);
int32_t ret = fn(left1, right1);
return ret;
}
@ -719,7 +721,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
int64_t* leftTs = (int64_t*)(pLeftTsCol->pData) + pLeftSource->src.rowIndex;
int64_t* rightTs = (int64_t*)(pRightTsCol->pData) + pRightSource->src.rowIndex;
int ret = pParam->cmpTsFn(leftTs, rightTs);
int32_t ret = pParam->cmpTsFn(leftTs, rightTs);
if (ret == 0 && pParam->pPkOrder) {
ret = tsortComparBlockCell(pLeftBlock, pRightBlock,
pLeftSource->src.rowIndex, pRightSource->src.rowIndex, (SBlockOrderInfo*)pParam->pPkOrder);
@ -782,7 +784,7 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
pOrder->compFn = fn;
}
int ret = fn(left1, right1);
int32_t ret = fn(left1, right1);
if (ret == 0) {
continue;
} else {
@ -855,7 +857,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
return code;
}
int nMergedRows = 0;
int32_t nMergedRows = 0;
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while (1) {
@ -1075,7 +1077,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
}
taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset, SEEK_SET);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize);
int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
@ -1095,7 +1097,7 @@ static int32_t getRowBufFromExtMemFile(SSortHandle* pHandle, int32_t regionId, i
memcpy(*ppRow, pRegion->buf + tupleOffset - pRegion->bufRegOffset, szThisBlock);
taosSeekCFile(pMemFile->pTdFile, pRegion->fileOffset + pRegion->bufRegOffset + pRegion->bufLen, SEEK_SET);
int32_t readBytes = TMIN(pMemFile->blockSize, pRegion->regionSize - (pRegion->bufRegOffset + pRegion->bufLen));
int ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
int32_t ret = taosReadFromCFile(pRegion->buf, readBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
taosMemoryFreeClear(*ppRow);
terrno = TAOS_SYSTEM_ERROR(errno);
@ -1214,7 +1216,7 @@ static int32_t tsortCloseRegion(SSortHandle* pHandle) {
pRegion->regionSize = pMemFile->currRegionOffset;
int32_t writeBytes = pRegion->regionSize - (pMemFile->writeFileOffset - pRegion->fileOffset);
if (writeBytes > 0) {
int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
int32_t ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
@ -1241,13 +1243,15 @@ static int32_t tsortFinalizeRegions(SSortHandle* pHandle) {
return TSDB_CODE_SUCCESS;
}
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) {
static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* pBlock, int32_t rowIdx,
int32_t* pRegionId, int32_t* pOffset, int32_t* pLength) {
SSortMemFile* pMemFile = pHandle->pExtRowsMemFile;
SSortMemFileRegion* pRegion = taosArrayGet(pMemFile->aFileRegions, pMemFile->currRegionId);
{
if (pMemFile->currRegionOffset + pHandle->extRowBytes >= pMemFile->writeBufSize) {
int32_t writeBytes = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
int ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
int32_t ret = fwrite(pMemFile->writeBuf, writeBytes, 1, pMemFile->pTdFile);
if (ret != 1) {
terrno = TAOS_SYSTEM_ERROR(errno);
return terrno;
@ -1255,11 +1259,13 @@ static int32_t saveBlockRowToExtRowsMemFile(SSortHandle* pHandle, SSDataBlock* p
pMemFile->writeFileOffset = pRegion->fileOffset + pMemFile->currRegionOffset;
}
}
*pRegionId = pMemFile->currRegionId;
*pOffset = pMemFile->currRegionOffset;
int32_t writeBufOffset = pMemFile->currRegionOffset - (pMemFile->writeFileOffset - pRegion->fileOffset);
int32_t blockLen = blockRowToBuf(pBlock, rowIdx, pMemFile->writeBuf + writeBufOffset);
*pLength = blockLen;
pMemFile->currRegionOffset += blockLen;
pMemFile->bRegionDirty = true;
return TSDB_CODE_SUCCESS;
@ -1317,27 +1323,30 @@ static void initRowIdSort(SSortHandle* pHandle) {
blockDataAppendColInfo(pSortInput, &offsetCol);
SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 4);
blockDataAppendColInfo(pSortInput, &lengthCol);
if (pHandle->bSortPk) {
pkCol = createColumnInfoData(extPkCol->info.type, extPkCol->info.bytes, 5);
blockDataAppendColInfo(pSortInput, &pkCol);
}
blockDataDestroy(pHandle->pDataBlock);
pHandle->pDataBlock = pSortInput;
int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock);
size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
// int32_t rowSize = blockDataGetRowSize(pHandle->pDataBlock);
// size_t nCols = taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
pHandle->pageSize = 256 * 1024; // 256k
pHandle->numOfPages = 256;
SArray* aOrder = taosArrayInit(1, sizeof(SBlockOrderInfo));
SArray* pOrderInfoList = taosArrayInit(1, sizeof(SBlockOrderInfo));
int32_t tsOrder = ((SBlockOrderInfo*)taosArrayGet(pHandle->pSortInfo, 0))->order;
SBlockOrderInfo* pTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlockOrderInfo biTs = {0};
biTs.order = pTsOrder->order;
biTs.order = tsOrder;
biTs.slotId = 0;
biTs.nullFirst = (biTs.order == TSDB_ORDER_ASC);
biTs.compFn = getKeyComparFunc(TSDB_DATA_TYPE_TIMESTAMP, biTs.order);
taosArrayPush(aOrder, &biTs);
taosArrayPush(pOrderInfoList, &biTs);
if (pHandle->bSortPk) {
SBlockOrderInfo biPk = {0};
@ -1345,11 +1354,11 @@ static void initRowIdSort(SSortHandle* pHandle) {
biPk.slotId = 4;
biPk.nullFirst = (biPk.order == TSDB_ORDER_ASC);
biPk.compFn = getKeyComparFunc(pkCol.info.type, biPk.order);
taosArrayPush(aOrder, &biPk);
taosArrayPush(pOrderInfoList, &biPk);
}
taosArrayDestroy(pHandle->pSortInfo);
pHandle->pSortInfo = aOrder;
return;
pHandle->pSortInfo = pOrderInfoList;
}
int32_t tsortSetSortByRowId(SSortHandle* pHandle, int32_t extRowsMemSize) {
@ -1440,128 +1449,224 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk,
return 0;
}
static int32_t getPageBufIncForRow(SSDataBlock* blk, int32_t row, int32_t rowIdxInPage) {
int sz = 0;
int numCols = taosArrayGetSize(blk->pDataBlock);
if (!blk->info.hasVarCol) {
sz += numCols * ((rowIdxInPage & 0x7) == 0 ? 1: 0);
sz += blockDataGetRowSize(blk);
static int32_t getPageBufIncForRow(SSDataBlock* pSrcBlock, int32_t srcRowIndex, int32_t dstRowIndex) {
int32_t size = 0;
int32_t numCols = taosArrayGetSize(pSrcBlock->pDataBlock);
if (!pSrcBlock->info.hasVarCol) {
size += numCols * ((dstRowIndex & 0x7) == 0 ? 1: 0);
size += blockDataGetRowSize(pSrcBlock);
} else {
for (int32_t i = 0; i < numCols; ++i) {
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(blk->pDataBlock, i);
SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pSrcBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
if ((pColInfoData->varmeta.offset[row] != -1) && (pColInfoData->pData)) {
char* p = colDataGetData(pColInfoData, row);
sz += varDataTLen(p);
if ((pColInfoData->varmeta.offset[srcRowIndex] != -1) && (pColInfoData->pData)) {
char* p = colDataGetData(pColInfoData, srcRowIndex);
if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
size += getJsonValueLen(p);
} else {
size += varDataTLen(p);
}
}
sz += sizeof(pColInfoData->varmeta.offset[0]);
size += sizeof(pColInfoData->varmeta.offset[0]);
} else {
sz += pColInfoData->info.bytes;
size += pColInfoData->info.bytes;
if (((rowIdxInPage) & 0x07) == 0) {
sz += 1; // bitmap
if (((dstRowIndex) & 0x07) == 0) {
size += 1; // bitmap
}
}
}
}
return sz;
return size;
}
static int32_t getPageBufIncForRowIdSort(SSDataBlock* pDstBlock, int32_t srcRowIndex, int32_t dstRowIndex,
SColumnInfoData* pPkCol) {
int32_t size = 0;
int32_t numOfCols = blockDataGetNumOfCols(pDstBlock);
if (pPkCol == NULL) { // no var column
ASSERT((numOfCols == 4) && (!pDstBlock->info.hasVarCol));
size += numOfCols * ((dstRowIndex & 0x7) == 0 ? 1: 0);
size += blockDataGetRowSize(pDstBlock);
} else {
ASSERT(numOfCols == 5);
size += (numOfCols - 1) * (((dstRowIndex & 0x7) == 0)? 1:0);
for(int32_t i = 0; i < numOfCols - 1; ++i) {
SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pDstBlock->pDataBlock, i);
size += pColInfo->info.bytes;
}
// handle the pk column, the last column, may be the var char column
if (IS_VAR_DATA_TYPE(pPkCol->info.type)) {
if ((pPkCol->varmeta.offset[srcRowIndex] != -1) && (pPkCol->pData)) {
char* p = colDataGetData(pPkCol, srcRowIndex);
size += varDataTLen(p);
}
size += sizeof(pPkCol->varmeta.offset[0]);
} else {
size += pPkCol->info.bytes;
if (((dstRowIndex) & 0x07) == 0) {
size += 1; // bitmap
}
}
}
return size;
}
static int32_t getBufIncForNewRow(SSortHandle* pHandle, int32_t dstRowIndex, SSDataBlock* pSrcBlock,
int32_t srcRowIndex) {
int32_t inc = 0;
if (pHandle->bSortByRowId) {
SColumnInfoData* pPkCol = NULL;
// there may be varchar column exists, so we need to get the pk info, and then calculate the row length
if (pHandle->bSortPk) {
SBlockOrderInfo* extRowsPkOrder = taosArrayGet(pHandle->aExtRowsOrders, 1);
pPkCol = taosArrayGet(pSrcBlock->pDataBlock, extRowsPkOrder->slotId);
}
inc = getPageBufIncForRowIdSort(pHandle->pDataBlock, srcRowIndex, dstRowIndex, pPkCol);
} else {
inc = getPageBufIncForRow(pSrcBlock, srcRowIndex, dstRowIndex);
}
return inc;
}
static int32_t initMergeSup(SBlkMergeSupport* pSup, SArray* pBlockList, int32_t tsOrder, int32_t tsSlotId, SBlockOrderInfo* pPkOrderInfo) {
memset(pSup, 0, sizeof(SBlkMergeSupport));
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
pSup->aRowIdx = taosMemoryCalloc(numOfBlocks, sizeof(int32_t));
pSup->aTs = taosMemoryCalloc(numOfBlocks, sizeof(int64_t*));
pSup->tsOrder = tsOrder;
pSup->aBlks = taosMemoryCalloc(numOfBlocks, sizeof(SSDataBlock*));
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pBlock = taosArrayGetP(pBlockList, i);
SColumnInfoData* col = taosArrayGet(pBlock->pDataBlock, tsSlotId);
pSup->aTs[i] = (int64_t*)col->pData;
pSup->aRowIdx[i] = 0;
pSup->aBlks[i] = pBlock;
}
pSup->pPkOrder = pPkOrderInfo;
return TSDB_CODE_SUCCESS;
}
static void cleanupMergeSup(SBlkMergeSupport* pSup) {
taosMemoryFree(pSup->aRowIdx);
taosMemoryFree(pSup->aTs);
taosMemoryFree(pSup->aBlks);
}
static int32_t getTotalRows(SArray* pBlockList) {
int32_t totalRows = 0;
for (int32_t i = 0; i < taosArrayGetSize(pBlockList); ++i) {
SSDataBlock* blk = taosArrayGetP(pBlockList, i);
totalRows += blk->info.rows;
}
return totalRows;
}
static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* aExtSrc) {
int32_t code = TSDB_CODE_SUCCESS;
int pgHeaderSz = sizeof(int32_t) + sizeof(int32_t) * taosArrayGetSize(pHandle->pDataBlock->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pgHeaderSz);
int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize);
blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
blockDataCleanup(pHandle->pDataBlock);
int32_t numBlks = taosArrayGetSize(aBlk);
SBlockOrderInfo* pOrigBlockTsOrder = (!pHandle->bSortByRowId) ?
taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlkMergeSupport sup = {0};
sup.aRowIdx = taosMemoryCalloc(numBlks, sizeof(int32_t));
sup.aTs = taosMemoryCalloc(numBlks, sizeof(int64_t*));
sup.tsOrder = pOrigBlockTsOrder->order;
sup.aBlks = taosMemoryCalloc(numBlks, sizeof(SSDataBlock*));
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
SColumnInfoData* col = taosArrayGet(blk->pDataBlock, pOrigBlockTsOrder->slotId);
sup.aTs[i] = (int64_t*)col->pData;
sup.aRowIdx[i] = 0;
sup.aBlks[i] = blk;
}
SBlockOrderInfo* pOrigBlockTsOrder =
(!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 0) : taosArrayGet(pHandle->aExtRowsOrders, 0);
SBlockOrderInfo* pHandleBlockTsOrder = taosArrayGet(pHandle->pSortInfo, 0);
SBlockOrderInfo* pOrigBlockPkOrder = NULL;
if (pHandle->bSortPk) {
pOrigBlockPkOrder = (!pHandle->bSortByRowId) ?
taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1);
pOrigBlockPkOrder =
(!pHandle->bSortByRowId) ? taosArrayGet(pHandle->pSortInfo, 1) : taosArrayGet(pHandle->aExtRowsOrders, 1);
}
sup.pPkOrder = pOrigBlockPkOrder;
int32_t totalRows = 0;
for (int i = 0; i < numBlks; ++i) {
SSDataBlock* blk = taosArrayGetP(aBlk, i);
totalRows += blk->info.rows;
}
initMergeSup(&sup, aBlk, pOrigBlockTsOrder->order, pOrigBlockTsOrder->slotId, pOrigBlockPkOrder);
int32_t totalRows = getTotalRows(aBlk);
SMultiwayMergeTreeInfo* pTree = NULL;
__merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn;
__merge_compare_fn_t mergeCompareFn = (!pHandle->bSortPk) ? blockCompareTsFn : blockCompareTsPkFn;
code = tMergeTreeCreate(&pTree, taosArrayGetSize(aBlk), &sup, mergeCompareFn);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
cleanupMergeSup(&sup);
return code;
}
SArray* aPgId = taosArrayInit(8, sizeof(int32_t));
int32_t nRows = 0;
int32_t nMergedRows = 0;
bool mergeLimitReached = false;
size_t blkPgSz = pgHeaderSz;
bool mergeLimitReached = false;
size_t blkPgSz = pageHeaderSize;
int64_t lastPageBufTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
int64_t currTs = (pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ? INT64_MAX : INT64_MIN;
while (nRows < totalRows) {
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
int32_t minIdx = tMergeTreeGetChosenIndex(pTree);
SSDataBlock* minBlk = taosArrayGetP(aBlk, minIdx);
int32_t minRow = sup.aRowIdx[minIdx];
SSDataBlock* incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
int32_t bufInc = getPageBufIncForRow(incBlock, minRow, pHandle->pDataBlock->info.rows);
int32_t minRow = sup.aRowIdx[minIdx];
int32_t bufInc = getBufIncForNewRow(pHandle, pHandle->pDataBlock->info.rows, minBlk, minRow);
if (blkPgSz <= pHandle->pageSize && blkPgSz + bufInc > pHandle->pageSize) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pTree);
taosArrayDestroy(aPgId);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
return code;
}
nMergedRows += pHandle->pDataBlock->info.rows;
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pgHeaderSz;
incBlock = (pHandle->bSortByRowId) ? pHandle->pDataBlock : minBlk;
bufInc = getPageBufIncForRow(incBlock, minRow, 0);
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
lastPageBufTs = ((int64_t*)tsCol->pData)[pHandle->pDataBlock->info.rows - 1];
code = appendDataBlockToPageBuf(pHandle, pHandle->pDataBlock, aPgId);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pTree);
taosArrayDestroy(aPgId);
cleanupMergeSup(&sup);
return code;
}
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
break;
nMergedRows += pHandle->pDataBlock->info.rows;
blockDataCleanup(pHandle->pDataBlock);
blkPgSz = pageHeaderSize;
bufInc = getBufIncForNewRow(pHandle, 0, minBlk, minRow);
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
break;
}
}
blockDataEnsureCapacity(pHandle->pDataBlock, pHandle->pDataBlock->info.rows + 1);
if (!pHandle->bSortByRowId) {
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
if (pHandle->bSortByRowId) {
appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
} else {
appendToRowIndexDataBlock(pHandle, minBlk, &minRow);
appendOneRowToDataBlock(pHandle->pDataBlock, minBlk, &minRow);
}
blkPgSz += bufInc;
ASSERT(blkPgSz == blockDataGetSize(pHandle->pDataBlock) + pageHeaderSize);
++nRows;
@ -1572,6 +1677,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
}
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
}
if (pHandle->pDataBlock->info.rows > 0) {
if (!mergeLimitReached) {
SColumnInfoData* tsCol = taosArrayGet(pHandle->pDataBlock->pDataBlock, pHandleBlockTsOrder->slotId);
@ -1580,18 +1686,16 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(aPgId);
taosMemoryFree(pTree);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
cleanupMergeSup(&sup);
return code;
}
nMergedRows += pHandle->pDataBlock->info.rows;
if ((pHandle->mergeLimit != -1) && (nMergedRows >= pHandle->mergeLimit)) {
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
mergeLimitReached = true;
if ((lastPageBufTs < pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_ASC) ||
(lastPageBufTs > pHandle->currMergeLimitTs && pHandleBlockTsOrder->order == TSDB_ORDER_DESC)) {
pHandle->currMergeLimitTs = lastPageBufTs;
}
}
}
blockDataCleanup(pHandle->pDataBlock);
@ -1600,10 +1704,7 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
SSDataBlock* pMemSrcBlk = createOneDataBlock(pHandle->pDataBlock, false);
doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
taosMemoryFree(sup.aRowIdx);
taosMemoryFree(sup.aTs);
taosMemoryFree(sup.aBlks);
cleanupMergeSup(&sup);
tMergeTreeDestroy(&pTree);
return 0;
@ -1724,7 +1825,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
code = sortBlocksToExtSource(pHandle, aBlkSort, aExtSrc);
if (code != TSDB_CODE_SUCCESS) {
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
@ -1736,7 +1837,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
@ -1750,7 +1851,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (tsortIsClosed(pHandle)) {
tSimpleHashClear(mUidBlk);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
@ -1759,7 +1860,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
tSimpleHashCleanup(mUidBlk);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
for (int32_t i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayDestroy(aBlkSort);
@ -2048,10 +2149,10 @@ static int32_t tupleComparFn(const void* pLeft, const void* pRight, void* param)
if (!lData) return pOrder->nullFirst ? -1 : 1;
if (!rData) return pOrder->nullFirst ? 1 : -1;
int type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type;
int32_t type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type;
__compar_fn_t fn = getKeyComparFunc(type, pOrder->order);
int ret = fn(lData, rData);
int32_t ret = fn(lData, rData);
if (ret == 0) {
continue;
} else {

View File

@ -1734,7 +1734,7 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
bool hasFillhistoryTask = false;
STaskId hId = {0};
stDebug("vgId:%d add failed task:0x%x", pMeta->vgId, taskId);
stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);
streamMetaRLock(pMeta);