fix: use pageid, offset, length as row index

This commit is contained in:
shenglian zhou 2023-12-13 11:38:14 +08:00
parent 3b1a185949
commit 7f93cb9f1a
2 changed files with 124 additions and 76 deletions

View File

@ -274,8 +274,7 @@ typedef struct STableScanInfo {
} STableScanInfo;
// Scratch state kept by the table merge scan while it sorts by row id: the
// sorter only carries a timestamp plus a compact row locator, and the full
// source rows are parked in the buffers below until they are re-materialized.
typedef struct STmsSortRowIdInfo {
// Paged buffer holding whole serialized source blocks (one block per page);
// created in startRowIdSort with pInfo->bufPageSize / pInfo->sortBufSize.
SDiskbasedBuf* pExtSrcBlkBuf;
// Cache of blocks already decoded from pExtSrcBlkBuf, keyed by the int32
// block (page) id; each value is an SSDataBlock* (blkId->SSDataBlock*).
SSHashObj* pBlkDataHash; // blkId->SSDataBlock*
// Paged buffer holding individually serialized source rows; a row is located
// by the (page id, offset, length) triple produced by saveBlockRowToBuf.
SDiskbasedBuf* pExtSrcRowsBuf;
} STmsSortRowIdInfo;
typedef struct STableMergeScanInfo {

View File

@ -3220,6 +3220,80 @@ _error:
}
// ========================= table merge scan
// Serialize row `rowIdx` of `pBlock` into the paged row buffer
// (pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf) and report where it was stored.
//
// On-page row layout:
//   [numOfCols null flags, one int8 each][values of the non-null columns, packed in column order]
// A JSON value occupies getJsonValueLen() bytes, a variable-length value
// varDataTLen() bytes, and any other value pCol->info.bytes bytes.
//
// Rows are appended to the last page of the buffer; a new page is started when
// the last page cannot hold the worst-case size of the row.
//
// pPageId/pOffset/pLength: outputs, written only on success. They receive the
//   page id, the byte offset of the row inside the page, and the number of bytes written.
// Returns 0 on success, non-zero when no page could be obtained or the row
// cannot fit into a single page.
static int32_t saveBlockRowToBuf(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, int32_t rowIdx, int32_t* pPageId, int32_t* pOffset, int32_t* pLength) {
  SDiskbasedBuf* pResultBuf = pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf;
  int32_t        numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);

  // Worst-case footprint of the row: the null flags are stored in front of the
  // payload, so they have to be part of the capacity check as well.
  // NOTE(review): assumes blockDataGetRowSize() is an upper bound for the packed payload - confirm.
  int32_t rowBytes = (int32_t)(sizeof(int8_t) * numOfCols) + (int32_t)blockDataGetRowSize(pBlock);

  SFilePage* pFilePage = NULL;
  int32_t    pageId = -1;

  SArray* list = getDataBufPagesIdList(pResultBuf);
  if (taosArrayGetSize(list) > 0) {
    // try to append to the last page first
    SPageInfo* pi = getLastPageInfo(list);
    pageId = getPageId(pi);
    pFilePage = getBufPage(pResultBuf, pageId);
    if (pFilePage == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return terrno;
    }

    if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) {
      // release current page first, and prepare the next one
      releaseBufPageInfo(pResultBuf, pi);
      pFilePage = NULL;
    }
  }

  if (pFilePage == NULL) {
    // either the buffer is still empty or the last page is full
    pFilePage = getNewBufPage(pResultBuf, &pageId);
    if (pFilePage == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return (terrno != 0) ? terrno : -1;
    }
    pFilePage->num = sizeof(SFilePage);
  }

  if (pFilePage->num + rowBytes > getBufPageSize(pResultBuf)) {
    // even an empty page is too small for this row; writing would overrun the page
    releaseBufPage(pResultBuf, pFilePage);
    return -1;
  }

  char* buf = (char*)pFilePage + pFilePage->num;
  char* isNull = buf;
  char* pStart = buf + sizeof(int8_t) * numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);

    if (colDataIsNull_s(pCol, rowIdx)) {
      // null columns only leave a flag, no payload bytes
      isNull[i] = 1;
      continue;
    }

    isNull[i] = 0;
    char* pData = colDataGetData(pCol, rowIdx);
    if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
      int32_t dataLen = getJsonValueLen(pData);
      memcpy(pStart, pData, dataLen);
      pStart += dataLen;
    } else if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      varDataCopy(pStart, pData);
      pStart += varDataTLen(pData);
    } else {
      int32_t bytes = pCol->info.bytes;
      memcpy(pStart, pData, bytes);
      pStart += bytes;
    }
  }

  *pPageId = pageId;
  *pOffset = pFilePage->num;
  *pLength = (int32_t)(pStart - buf);

  pFilePage->num += (*pLength);
  setBufPageDirty(pFilePage, true);
  releaseBufPage(pResultBuf, pFilePage);
  return 0;
}
static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) {
STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
@ -3233,33 +3307,18 @@ static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlo
SColumnInfoData* pSrcTsCol = taosArrayGet(pSrcBlock->pDataBlock, tsSlotId);
colDataAssign(tsCol, pSrcTsCol, nRows, &pSortInputBlk->info);
SColumnInfoData* blkIdCol = taosArrayGet(pSortInputBlk->pDataBlock, 1);
SColumnInfoData* rowIdxCol = taosArrayGet(pSortInputBlk->pDataBlock, 2);
int32_t start = 0;
while (start < pSrcBlock->info.rows) {
int32_t stop = 0;
blockDataSplitRows(pSrcBlock, pSrcBlock->info.hasVarCol, start, &stop, pInfo->bufPageSize);
SSDataBlock* p = blockDataExtractBlock(pSrcBlock, start, stop-start+1);
SColumnInfoData* pageIdCol = taosArrayGet(pSortInputBlk->pDataBlock, 1);
SColumnInfoData* offsetCol = taosArrayGet(pSortInputBlk->pDataBlock, 2);
SColumnInfoData* lengthCol = taosArrayGet(pSortInputBlk->pDataBlock, 3);
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
int32_t pageId = -1;
void* pPage = getNewBufPage(pSortInfo->pExtSrcBlkBuf, &pageId);
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
ASSERT(size <= getBufPageSize(pSortInfo->pExtSrcBlkBuf));
blockDataToBuf(pPage, p);
setBufPageDirty(pPage, true);
releaseBufPage(pSortInfo->pExtSrcBlkBuf, pPage);
blockDataDestroy(p);
for (int32_t i = start; i <= stop; ++i) {
colDataSetInt32(blkIdCol, i, &pageId);
int32_t rowIdx = i - start;
colDataSetInt32(rowIdxCol, i, &rowIdx);
}
start = stop + 1;
int32_t offset = -1;
int32_t length = -1;
saveBlockRowToBuf(pInfo, pSrcBlock, i, &pageId, &offset, &length);
colDataSetInt32(pageIdCol, i, &pageId);
colDataSetInt32(pageIdCol, i, &offset);
colDataSetInt32(pageIdCol, i, &length);
}
pSortInputBlk->info.rows = nRows;
@ -3267,50 +3326,38 @@ static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlo
return 0;
}
void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
int32_t pageId = *(int32_t*)tsortGetValue(pTupleHandle, 1);
int32_t offset = *(int32_t*)tsortGetValue(pTupleHandle, 2);
int32_t length = *(int32_t*)tsortGetValue(pTupleHandle, 2);
void* page = getBufPage(pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf, pageId);
// Fetch the source data block identified by `blockId`.
//
// The block is looked up in pBlkDataHash first; on a miss it is decoded from
// the page `blockId` of pExtSrcBlkBuf into a freshly created block (modelled on
// pInfo->pReaderBlock) and inserted into the hash for later lookups.
//
// ppBlock: output, receives the block pointer. It is left untouched when the
//   page or the new block cannot be obtained.
// Returns 0 on success, non-zero on failure.
static int32_t retrieveSourceBlock(STableMergeScanInfo* pInfo, int32_t blockId, SSDataBlock** ppBlock) {
  STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;

  void* pBlkHashVal = tSimpleHashGet(pSortInfo->pBlkDataHash, &blockId, sizeof(blockId));
  if (pBlkHashVal != NULL) {
    // already decoded earlier, reuse the cached block
    *ppBlock = *(SSDataBlock**)pBlkHashVal;
    return 0;
  }

  void* pPage = getBufPage(pSortInfo->pExtSrcBlkBuf, blockId);
  if (pPage == NULL) {
    // the page could not be loaded; decoding from it would dereference NULL
    return -1;
  }

  SSDataBlock* pBlock = createOneDataBlock(pInfo->pReaderBlock, false);
  if (pBlock == NULL) {
    releaseBufPage(pSortInfo->pExtSrcBlkBuf, pPage);
    return -1;
  }

  blockDataFromBuf(pBlock, pPage);
  releaseBufPage(pSortInfo->pExtSrcBlkBuf, pPage);

  *ppBlock = pBlock;
  // the hash stores the pointer value itself, not a copy of the block
  // NOTE(review): if the insert fails the block is still handed to the caller but is not cached.
  return tSimpleHashPut(pSortInfo->pBlkDataHash, &blockId, sizeof(blockId), &pBlock, sizeof(pBlock));
}
void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
int32_t blkId = *(int32_t*)tsortGetValue(pTupleHandle, 1);
int32_t rowIdx = *(int32_t*)tsortGetValue(pTupleHandle, 2);
SSDataBlock* pSrcBlk = NULL;
retrieveSourceBlock(pInfo, blkId, &pSrcBlk);
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
char* buf = (char*)page + offset;
char* isNull = (char*)buf;
char* pStart = (char*)buf + sizeof(int8_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pSrcColInfo = taosArrayGet(pSrcBlk->pDataBlock, i);
bool isNull = colDataIsNull_s(pSrcColInfo, rowIdx);
if (isNull) {
colDataSetNULL(pColInfo, pBlock->info.rows);
} else {
char* pData = colDataGetData(pSrcColInfo, rowIdx);
if (pData != NULL) {
colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
if (!isNull[i]) {
colDataSetVal(pColInfo, pBlock->info.rows, pStart, false);
if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
int32_t dataLen = getJsonValueLen(pStart);
pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
pStart += varDataTLen(pStart);
} else {
int32_t bytes = pColInfo->info.bytes;
pStart += bytes;
}
} else {
colDataSetNULL(pColInfo, pBlock->info.rows);
}
}
if (rowIdx == pSrcBlk->info.rows - 1) {
tSimpleHashRemove(pInfo->tmsSortRowIdInfo.pBlkDataHash, &blkId, sizeof(blkId));
blockDataDestroy(pSrcBlk);
}
releaseBufPage(pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf, page);
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
pBlock->info.rows += 1;
@ -3462,19 +3509,19 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo*
// Set up the row-id sort state in pInfo->tmsSortRowIdInfo: the paged buffer
// for source blocks, the block-id hash, and the paged buffer for source rows.
//
// Returns 0 on success. On failure the code of the failing step is returned
// and nothing further is created; whatever was created before the failure is
// left in place for stopRowIdSort to release.
// NOTE(review): both paged buffers are registered under the same id string
// "tms-ext-src-block" - confirm that this is intended.
int32_t startRowIdSort(STableMergeScanInfo *pInfo) {
  STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo;

  int32_t code = createDiskbasedBuf(&pSort->pExtSrcBlkBuf, pInfo->bufPageSize, pInfo->sortBufSize, "tms-ext-src-block", tsTempDir);
  if (code != 0) {
    return code;
  }
  dBufSetPrintInfo(pSort->pExtSrcBlkBuf);

  pSort->pBlkDataHash = tSimpleHashInit(2048, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
  if (pSort->pBlkDataHash == NULL) {
    return -1;
  }

  // the row buffer page size is derived from the result block schema
  int32_t pageSize = getProperSortPageSize(blockDataGetRowSize(pInfo->pResBlock),
                                           taosArrayGetSize(pInfo->pResBlock->pDataBlock));
  int32_t memSize = pageSize * 2048;
  code = createDiskbasedBuf(&pSort->pExtSrcRowsBuf, pageSize, memSize, "tms-ext-src-block", tsTempDir);
  if (code != 0) {
    return code;
  }
  dBufSetPrintInfo(pSort->pExtSrcRowsBuf);

  return 0;
}
// Tear down the row-id sort state: destroy both paged buffers and the block
// hash held in pInfo->tmsSortRowIdInfo and reset the pointers to NULL.
// Always returns 0.
int32_t stopRowIdSort(STableMergeScanInfo *pInfo) {
  // source block buffer
  destroyDiskbasedBuf(pInfo->tmsSortRowIdInfo.pExtSrcBlkBuf);
  pInfo->tmsSortRowIdInfo.pExtSrcBlkBuf = NULL;

  // block id -> block cache
  tSimpleHashCleanup(pInfo->tmsSortRowIdInfo.pBlkDataHash);
  pInfo->tmsSortRowIdInfo.pBlkDataHash = NULL;

  // source row buffer
  destroyDiskbasedBuf(pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf);
  pInfo->tmsSortRowIdInfo.pExtSrcRowsBuf = NULL;

  return 0;
}
@ -3798,10 +3845,12 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
SSDataBlock* pSortInput = createDataBlock();
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
blockDataAppendColInfo(pSortInput, &tsCol);
SColumnInfoData blkIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
blockDataAppendColInfo(pSortInput, &blkIdCol);
SColumnInfoData rowIdxCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
blockDataAppendColInfo(pSortInput, &rowIdxCol);
SColumnInfoData pageIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
blockDataAppendColInfo(pSortInput, &pageIdCol);
SColumnInfoData offsetCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
blockDataAppendColInfo(pSortInput, &offsetCol);
SColumnInfoData lengthCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
blockDataAppendColInfo(pSortInput, &lengthCol);
pInfo->pSortInputBlock = pSortInput;
SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
@ -3823,8 +3872,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
}
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize;
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
int32_t rowSize = pInfo->pSortInputBlock->info.rowSize;
uint32_t nCols = taosArrayGetSize(pInfo->pSortInputBlock->pDataBlock);
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;