From 238550b0f4d0ceeb987ad41eb7992afba994dacd Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 22 Mar 2022 07:50:34 +0000 Subject: [PATCH] more TDB --- source/libs/tdb/src/db/tdbBtree.c | 36 ++++++++++++++++++++++++++---- source/libs/tdb/src/inc/tdbPage.h | 1 + source/libs/tdb/src/page/tdbPage.c | 1 - 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index e7abbc7161..d1188ee5b3 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -477,10 +477,15 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { int nOlds; SPage *pOlds[3]; + SCell *pDivCell[2] = {0}; + int szDivCell[2]; int sIdx; + u8 childLeaf; { // Find 3 child pages at most to do balance - int nCells = TDB_PAGE_TOTAL_CELLS(pParent); + int nCells = TDB_PAGE_TOTAL_CELLS(pParent); + SCell *pCell; + if (nCells <= 2) { sIdx = 0; nOlds = nCells + 1; @@ -503,8 +508,6 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { ASSERT(!TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pParent))); pgno = ((SIntHdr *)(pParent->pData))->pgno; } else { - SCell *pCell; - pCell = tdbPageGetCell(pParent, sIdx + i); pgno = *(SPgno *)pCell; } @@ -515,6 +518,17 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { return -1; } } + // copy the parent key out if child pages are not leaf page + childLeaf = TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pOlds[0])); + if (childLeaf) { + for (int i = 0; i < nOlds - 1; i++) { + pCell = tdbPageGetCell(pParent, sIdx + i); + + szDivCell[i] = tdbBtreeCellSize(pParent, pCell); + pDivCell[i] = malloc(szDivCell[i]); + memcpy(pDivCell, pCell, szDivCell[i]); + } + } // drop the cells on parent page for (int i = 0; i < nOlds; i++) { nCells = TDB_PAGE_TOTAL_CELLS(pParent); @@ -541,14 +555,28 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { SPage *pPage = pOlds[oPage]; SCell *pCell; int cellBytes; + int oIdx; - for (int oIdx = 0; oIdx < TDB_PAGE_TOTAL_CELLS(pPage); oIdx++) { + for (oIdx = 0; oIdx < TDB_PAGE_TOTAL_CELLS(pPage); oIdx++) { pCell = tdbPageGetCell(pPage, oIdx); cellBytes = TDB_BYTES_CELL_TAKEN(pPage, pCell); if (infoNews[nNews].size + cellBytes > TDB_PAGE_USABLE_SIZE(pPage)) { // page is full, use a new page nNews++; + // for a child leaf case, this cell is used as the new divider cell + if (childLeaf) continue; + } + infoNews[nNews].cnt++; + infoNews[nNews].size += cellBytes; + infoNews[nNews].oPage = oPage; + infoNews[nNews].oIdx = oIdx; + } + + // For child leaf pages + if (childLeaf && oPage < nOlds - 1) { + if (infoNews[nNews].size + szDivCell[oPage] + TDB_PAGE_OFFSET_SIZE(pPage) > TDB_PAGE_USABLE_SIZE(pPage)) { + nNews++; } infoNews[nNews].cnt++; infoNews[nNews].size += cellBytes; diff --git a/source/libs/tdb/src/inc/tdbPage.h b/source/libs/tdb/src/inc/tdbPage.h index 61927fabf6..0ba7cf236b 100644 --- a/source/libs/tdb/src/inc/tdbPage.h +++ b/source/libs/tdb/src/inc/tdbPage.h @@ -102,6 +102,7 @@ struct SPage { #define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx) #define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno) #define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell) + (pPage)->pPageMethods->szOffset) +#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset) int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg); int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg); diff --git a/source/libs/tdb/src/page/tdbPage.c b/source/libs/tdb/src/page/tdbPage.c index 16130ba13f..14960de45d 100644 --- a/source/libs/tdb/src/page/tdbPage.c +++ b/source/libs/tdb/src/page/tdbPage.c @@ -18,7 +18,6 @@ extern SPageMethods pageMethods; extern SPageMethods pageLargeMethods; -#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset) #define TDB_PAGE_HDR_SIZE(pPage) ((pPage)->pPageMethods->szPageHdr) #define TDB_PAGE_FREE_CELL_SIZE(pPage) ((pPage)->pPageMethods->szFreeCell) #define TDB_PAGE_NCELLS(pPage) (*(pPage)->pPageMethods->getCellNum)(pPage)