diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 4e4b2275f8..b801002822 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -41,8 +41,8 @@ struct SBTree { #define TDB_BTREE_PAGE_COMMON_HDR u8 flags; -#define TDB_BTREE_PAGE_GET_FLAGS(PAGE) (PAGE)->pAmHdr[0] -#define TDB_BTREE_PAGE_SET_FLAGS(PAGE, flags) ((PAGE)->pAmHdr[0] = (flags)) +#define TDB_BTREE_PAGE_GET_FLAGS(PAGE) (PAGE)->pData[0] +#define TDB_BTREE_PAGE_SET_FLAGS(PAGE, flags) ((PAGE)->pData[0] = (flags)) typedef struct __attribute__((__packed__)) { TDB_BTREE_PAGE_COMMON_HDR @@ -393,8 +393,8 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg) { u8 flags; u8 isLeaf; - pBt = ((SBtreeInitPageArg *)arg)->pBt; - flags = ((SBtreeInitPageArg *)arg)->flags; + pBt = (SBTree *)arg; + flags = TDB_BTREE_PAGE_GET_FLAGS(pPage); isLeaf = TDB_BTREE_PAGE_IS_LEAF(flags); ASSERT(flags == TDB_BTREE_PAGE_GET_FLAGS(pPage)); @@ -430,7 +430,7 @@ static int tdbBtreeZeroPage(SPage *pPage, void *arg) { tdbPageZero(pPage, isLeaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize); if (isLeaf) { - SLeafHdr *pLeafHdr = (SLeafHdr *)(pPage->pAmHdr); + SLeafHdr *pLeafHdr = (SLeafHdr *)(pPage->pData); pLeafHdr->flags = flags; pPage->kLen = pBt->keyLen; @@ -438,7 +438,7 @@ static int tdbBtreeZeroPage(SPage *pPage, void *arg) { pPage->maxLocal = pBt->maxLeaf; pPage->minLocal = pBt->minLeaf; } else { - SIntHdr *pIntHdr = (SIntHdr *)(pPage->pAmHdr); + SIntHdr *pIntHdr = (SIntHdr *)(pPage->pData); pIntHdr->flags = flags; pIntHdr->pgno = 0; @@ -491,7 +491,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) { return -1; } - pIntHdr = (SIntHdr *)(pRoot->pAmHdr); + pIntHdr = (SIntHdr *)(pRoot->pData); pIntHdr->pgno = pgnoChild; *ppChild = pChild; @@ -643,15 +643,59 @@ static int tdbBtreeBalanceStep6(SBtreeBalanceHelper *pBlh) { } static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { - int ret; + int ret; +#if 0 SBtreeBalanceHelper blh; - // ASSERT(!TDB_BTREE_PGE_IS_LEAF(TDB_PAGE_FLAGS(pParent))); - blh.pBt = pBt; blh.pParent = pParent; blh.idx = idx; +#endif + int nOlds; + SPage *pOlds[3]; + + { + // Find 3 child pages at most to do balance + int nCells = TDB_PAGE_TOTAL_CELLS(pParent); + int sIdx; + if (nCells <= 2) { + sIdx = 0; + nOlds = nCells + 1; + } else { + // has more than three child pages + if (idx == 0) { + sIdx = 0; + } else if (idx == nCells) { + sIdx = idx - 2; + } else { + sIdx = idx - 1; + } + nOlds = 3; + } + for (int i = 0; i < nOlds; i++, sIdx++) { + ASSERT(sIdx <= nCells); + + SPgno pgno; + if (sIdx == nCells) { + ASSERT(!TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pParent))); + pgno = ((SIntHdr *)(pParent->pData))->pgno; + } else { + SCell *pCell; + + pCell = tdbPageGetCell(pParent, sIdx); + pgno = *(SPgno *)pCell; + } + + ret = tdbPagerFetchPage(pBt->pPager, pgno, pOlds + i, tdbBtreeInitPage, pBt); + if (ret < 0) { + ASSERT(0); + return -1; + } + } + } + +#if 0 // Step 1: find two sibling pages and get engough info about the old pages ret = tdbBtreeBalanceStep1(&blh); if (ret < 0) { @@ -693,6 +737,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { ASSERT(0); return -1; } +#endif { // TODO: Reset states diff --git a/source/libs/tdb/src/inc/tdbPage.h b/source/libs/tdb/src/inc/tdbPage.h index 00322067df..e513a63fe4 100644 --- a/source/libs/tdb/src/inc/tdbPage.h +++ b/source/libs/tdb/src/inc/tdbPage.h @@ -58,7 +58,6 @@ struct SPage { u8 *pData; SPageMethods *pPageMethods; // Fields below used by pager and am - u8 *pAmHdr; u8 *pPageHdr; u8 *pCellIdx; u8 *pFreeStart; @@ -114,7 +113,7 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) { int iOvfl; int lidx; - ASSERT(idx >= 0 && idx < pPage->nOverflow + pPage->pPageMethods->getCellNum(pPage)); + ASSERT(idx >= 0 && idx < TDB_PAGE_TOTAL_CELLS(pPage)); iOvfl = 0; for (; iOvfl < pPage->nOverflow; iOvfl++) { diff --git a/source/libs/tdb/src/page/tdbPage.c b/source/libs/tdb/src/page/tdbPage.c index bc00da9381..0dc7853940 100644 --- a/source/libs/tdb/src/page/tdbPage.c +++ b/source/libs/tdb/src/page/tdbPage.c @@ -80,8 +80,7 @@ int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) } void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)) { - pPage->pAmHdr = pPage->pData; - pPage->pPageHdr = pPage->pAmHdr + szAmHdr; + pPage->pPageHdr = pPage->pData + szAmHdr; TDB_PAGE_NCELLS_SET(pPage, 0); TDB_PAGE_CCELLS_SET(pPage, pPage->pageSize - sizeof(SPageFtr)); TDB_PAGE_FCELL_SET(pPage, 0); @@ -97,8 +96,7 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell } void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)) { - pPage->pAmHdr = pPage->pData; - pPage->pPageHdr = pPage->pAmHdr + szAmHdr; + pPage->pPageHdr = pPage->pData + szAmHdr; pPage->pCellIdx = pPage->pPageHdr + TDB_PAGE_HDR_SIZE(pPage); pPage->pFreeStart = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * TDB_PAGE_NCELLS(pPage); pPage->pFreeEnd = pPage->pData + TDB_PAGE_CCELLS(pPage); @@ -117,7 +115,7 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) { int lidx; // local idx SCell *pNewCell; - ASSERT(szCell <= TDB_PAGE_MAX_FREE_BLOCK(pPage, pPage->pPageHdr - pPage->pAmHdr)); + ASSERT(szCell <= TDB_PAGE_MAX_FREE_BLOCK(pPage, pPage->pPageHdr - pPage->pData)); nFree = TDB_PAGE_NFREE(pPage); nCells = TDB_PAGE_NCELLS(pPage); @@ -220,7 +218,7 @@ void tdbPageCopy(SPage *pFromPage, SPage *pToPage) { ASSERT(TDB_PAGE_CCELLS(pToPage) == pToPage->pFreeEnd - pToPage->pData); - delta = (pToPage->pPageHdr - pToPage->pAmHdr) - (pFromPage->pPageHdr - pFromPage->pAmHdr); + delta = (pToPage->pPageHdr - pToPage->pData) - (pFromPage->pPageHdr - pFromPage->pData); if (delta != 0) { nFree = TDB_PAGE_NFREE(pFromPage); TDB_PAGE_NFREE_SET(pToPage, nFree - delta);