From f9c09351f00592ff5d49619d96a1a518092ab29c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 9 Mar 2022 06:26:39 +0000 Subject: [PATCH] more TDB --- source/libs/tdb/src/db/tdbBtree.c | 95 +++++++++++++++++++++++++++++-- source/libs/tdb/src/inc/tdbPage.h | 25 ++++---- 2 files changed, 105 insertions(+), 15 deletions(-) diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index e74745ac13..569d5b263e 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -450,15 +450,102 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) { return 0; } -static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, bool isRoot) { +static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { int nOldPages; SPage *pOldPages[3]; int nNewPages; SPage *pNewPages[5]; + void *pCell; + SPgno pgno; + int i; + int nDiv; + int ret; + SPage *pPage; + void *pCellDiv[2]; - // Find three or less sibling pages + { + // TODO: Find three or less sibling pages on either side + i = pParent->pPageHdr->nCells + pParent->nOverflow; + if (i < 1) { + nDiv = 0; + } else { + if (idx == 0) { + nDiv = 0; + } else if (idx == i) { + nDiv = i - 2; + } else { + nDiv = idx - 1; + } + i = 2; + } + nOldPages = i + 1; + + if (i + nDiv - pParent->nOverflow == pParent->pPageHdr->nCells) { + pgno = pParent->pPageHdr->rChild; + } else { + ASSERT(0); + // TODO + pgno = 0; + } + for (;;) { + ret = tdbPagerFetchPage(pBt->pPager, pgno, &pPage, tdbBtreeInitPage, pBt); + if (ret < 0) { + ASSERT(0); + return -1; + } + + pOldPages[i] = pPage; + + if ((i--) == 0) break; + + if (pParent->nOverflow && i + nDiv == pParent->aiOvfl[0]) { + pCellDiv[i] = pParent->apOvfl[0]; + // pgno = 0; + // szNew[i] = tdbPageCellSize(pPage, pCell); + pParent->nOverflow = 0; + } else { + // pCellDiv[i] = TDB_PAGE_CELL_AT(pPage, i + nDiv - pParent->nOverflow); + // pgno = 0; + // szNew[i] = tdbPageCellSize(pPage, pCell); + + // Drop the cell from the page + // ret = tdbPageDropCell(pPage, i + nDiv - pParent->nOverflow, szNew[i]); + // if (ret < 0) { + // return -1; + // } + } + /* code */ + } + } + + { + // TODO: Load all cells on the old page and the divider cells + } + + { + // TODO: Get the number of pages needed to hold all cells + } + + { + // TODO: Allocate enough new pages. Reuse old pages as much as possible + } + + { + // TODO: Insert new divider cells into pParent + } + + { + // TODO: Update the sibling pages + } + + { + // TODO: Reset states + } + + { + // TODO: Clear resources + } - /* TODO */ return 0; } @@ -502,7 +589,7 @@ static int tdbBtreeBalance(SBtCursor *pCur) { // Generalized balance step pParent = pCur->pgStack[pCur->iPage - 1]; - ret = tdbBtreeBalanceNonRoot(pCur->pBt, pParent, pCur->idxStack[pCur->iPage - 1], (iPage == 1)); + ret = tdbBtreeBalanceNonRoot(pCur->pBt, pParent, pCur->idxStack[pCur->iPage - 1]); if (ret < 0) { return -1; } diff --git a/source/libs/tdb/src/inc/tdbPage.h b/source/libs/tdb/src/inc/tdbPage.h index d8d609e4e0..2bb8866278 100644 --- a/source/libs/tdb/src/inc/tdbPage.h +++ b/source/libs/tdb/src/inc/tdbPage.h @@ -21,11 +21,12 @@ extern "C" { #endif typedef struct __attribute__((__packed__)) { - u16 flags; - u16 nCells; - u16 cellCont; - u16 freeCell; - u16 nFree; + u16 flags; + u16 nCells; + u16 cellCont; + u16 freeCell; + u16 nFree; + SPgno rChild; } SPageHdr; typedef struct SPage SPage; @@ -53,6 +54,8 @@ struct SPage { int maxLocal; int minLocal; int nOverflow; + void *apOvfl[4]; + int aiOvfl[4]; }; // For page lock @@ -80,13 +83,13 @@ struct SPage { // For page ref #define TDB_INIT_PAGE_REF(pPage) ((pPage)->nRef = 0) #if 0 -#define TDB_REF_PAGE(pPage) (++(pPage)->nRef) -#define TDB_UNREF_PAGE(pPage) (--(pPage)->nRef) -#define TDB_GET_PAGE_REF(pPage) ((pPage)->nRef) +#define TDB_REF_PAGE(pPage) (++(pPage)->nRef) +#define TDB_UNREF_PAGE(pPage) (--(pPage)->nRef) +#define TDB_GET_PAGE_REF(pPage) ((pPage)->nRef) #else -#define TDB_REF_PAGE(pPage) atomic_add_fetch_32(&((pPage)->nRef), 1) -#define TDB_UNREF_PAGE(pPage) atomic_sub_fetch_32(&((pPage)->nRef), 1) -#define TDB_GET_PAGE_REF(pPage) atomic_load_32(&((pPage)->nRef)) +#define TDB_REF_PAGE(pPage) atomic_add_fetch_32(&((pPage)->nRef), 1) +#define TDB_UNREF_PAGE(pPage) atomic_sub_fetch_32(&((pPage)->nRef), 1) +#define TDB_GET_PAGE_REF(pPage) atomic_load_32(&((pPage)->nRef)) #endif #ifdef __cplusplus