This commit is contained in:
Hongze Cheng 2022-03-22 07:50:34 +00:00
parent 2d1e356cc2
commit 238550b0f4
3 changed files with 33 additions and 5 deletions

View File

@ -477,10 +477,15 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
int nOlds;
SPage *pOlds[3];
SCell *pDivCell[2] = {0};
int szDivCell[2];
int sIdx;
u8 childLeaf;
{ // Find 3 child pages at most to do balance
int nCells = TDB_PAGE_TOTAL_CELLS(pParent);
SCell *pCell;
if (nCells <= 2) {
sIdx = 0;
nOlds = nCells + 1;
@ -503,8 +508,6 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pParent)));
pgno = ((SIntHdr *)(pParent->pData))->pgno;
} else {
SCell *pCell;
pCell = tdbPageGetCell(pParent, sIdx + i);
pgno = *(SPgno *)pCell;
}
@ -515,6 +518,17 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
return -1;
}
}
// copy the parent key out if child pages are not leaf page
childLeaf = TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]));
if (childLeaf) {
for (int i = 0; i < nOlds - 1; i++) {
pCell = tdbPageGetCell(pParent, sIdx + i);
szDivCell[i] = tdbBtreeCellSize(pParent, pCell);
pDivCell[i] = malloc(szDivCell[i]);
memcpy(pDivCell, pCell, szDivCell[i]);
}
}
// drop the cells on parent page
for (int i = 0; i < nOlds; i++) {
nCells = TDB_PAGE_TOTAL_CELLS(pParent);
@ -541,14 +555,28 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
SPage *pPage = pOlds[oPage];
SCell *pCell;
int cellBytes;
int oIdx;
for (int oIdx = 0; oIdx < TDB_PAGE_TOTAL_CELLS(pPage); oIdx++) {
for (oIdx = 0; oIdx < TDB_PAGE_TOTAL_CELLS(pPage); oIdx++) {
pCell = tdbPageGetCell(pPage, oIdx);
cellBytes = TDB_BYTES_CELL_TAKEN(pPage, pCell);
if (infoNews[nNews].size + cellBytes > TDB_PAGE_USABLE_SIZE(pPage)) {
// page is full, use a new page
nNews++;
// for a child leaf case, this cell is used as the new divider cell
if (childLeaf) continue;
}
infoNews[nNews].cnt++;
infoNews[nNews].size += cellBytes;
infoNews[nNews].oPage = oPage;
infoNews[nNews].oIdx = oIdx;
}
// For child leaf pages
if (childLeaf && oPage < nOlds - 1) {
if (infoNews[nNews].size + szDivCell[oPage] + TDB_PAGE_OFFSET_SIZE(pPage) > TDB_PAGE_USABLE_SIZE(pPage)) {
nNews++;
}
infoNews[nNews].cnt++;
infoNews[nNews].size += cellBytes;

View File

@ -102,6 +102,7 @@ struct SPage {
#define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx)
#define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno)
#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell) + (pPage)->pPageMethods->szOffset)
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg);
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg);

View File

@ -18,7 +18,6 @@
extern SPageMethods pageMethods;
extern SPageMethods pageLargeMethods;
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
#define TDB_PAGE_HDR_SIZE(pPage) ((pPage)->pPageMethods->szPageHdr)
#define TDB_PAGE_FREE_CELL_SIZE(pPage) ((pPage)->pPageMethods->szFreeCell)
#define TDB_PAGE_NCELLS(pPage) (*(pPage)->pPageMethods->getCellNum)(pPage)