diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 0966f0c30b..96d4ed47ef 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -415,6 +415,7 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg) { // TODO: need to update the SPage.nFree pPage->nFree = pPage->pFreeEnd - pPage->pFreeStart; + pPage->nOverflow = 0; return 0; } @@ -445,7 +446,7 @@ typedef struct { SBTree *pBt; SPage *pParent; int idx; - i8 nOldPages; + i8 nOld; SPage *pOldPages[3]; i8 nNewPages; SPage *pNewPages[5]; @@ -514,68 +515,71 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) { } static int tdbBtreeBalanceStep1(SBtreeBalanceHelper *pBlh) { -#if 0 - int i; - SPage *pParent; - int nDiv; - SPgno pgno; - SPage *pPage; - int ret; + int nCells; + int i; + int idxStart; + int nChild; + int ret; + SPage *pParent; + SPgno pgno; + SCell *pCell; + SCellDecoder cd; + SBTree *pBt; pParent = pBlh->pParent; - i = pParent->pPageHdr->nCells + pParent->nOverflow; + nCells = TDB_PAGE_NCELLS(pParent); + nChild = nCells + 1; + pBt = pBlh->pBt; - if (i < 1) { - nDiv = 0; + // TODO: ASSERT below needs to be removed + ASSERT(pParent->nOverflow == 0); + ASSERT(pBlh->idx <= nCells); + + if (nChild < 3) { + idxStart = 0; + pBlh->nOld = nChild; } else { if (pBlh->idx == 0) { - nDiv = 0; - } else if (pBlh->idx == i) { - nDiv = i - 2; + idxStart = 0; + } else if (pBlh->idx == nCells) { + idxStart = pBlh->idx - 2; } else { - nDiv = pBlh->idx - 1; + idxStart = pBlh->idx - 1; } - i = 2; + pBlh->nOld = 3; } - pBlh->nOldPages = i + 1; - if (i + nDiv - pParent->nOverflow == pParent->pPageHdr->nCells) { - // pgno = pParent->pPageHdr->rChild; + i = pBlh->nOld - 1; + + if (idxStart + i == nCells) { + pgno = ((SBtPageHdr *)(pParent->pAmHdr))[0].rChild; } else { - ASSERT(0); - // TODO - pgno = 0; + pCell = TDB_PAGE_CELL_AT(pParent, idxStart + i); + // TODO: no need to decode the payload part, and even the kLen, vLen part + // we only need the pgno part + ret = tdbBtreeDecodeCell(pParent, pCell, &cd); + if (ret < 0) { + ASSERT(0); + return -1; + } + pgno = cd.pgno; } for (;;) { - ret = tdbPagerFetchPage(pBlh->pBt->pPager, pgno, &pPage, tdbBtreeInitPage, pBlh->pBt); + ret = tdbPagerFetchPage(pBt->pPager, pgno, &(pBlh->pOldPages[i]), tdbBtreeInitPage, pBt); if (ret < 0) { ASSERT(0); return -1; } - pBlh->pOldPages[i] = pPage; - + // Loop over if ((i--) == 0) break; - if (pParent->nOverflow && i + nDiv == pParent->aiOvfl[0]) { - // pCellDiv[i] = pParent->apOvfl[0]; - // pgno = 0; - // szNew[i] = tdbPageCellSize(pPage, pCell); - pParent->nOverflow = 0; - } else { - // pCellDiv[i] = TDB_PAGE_CELL_AT(pPage, i + nDiv - pParent->nOverflow); - // pgno = 0; - // szNew[i] = tdbPageCellSize(pPage, pCell); - - // Drop the cell from the page - // ret = tdbPageDropCell(pPage, i + nDiv - pParent->nOverflow, szNew[i]); - // if (ret < 0) { - // return -1; - // } + { + // TODO + // ASSERT(0); } } -#endif return 0; } @@ -587,7 +591,7 @@ static int tdbBtreeBalanceStep2(SBtreeBalanceHelper *pBlh) { int limit; SCell *pCell; - for (int i = 0; i < pBlh->nOldPages; i++) { + for (int i = 0; i < pBlh->nOld; i++) { pPage = pBlh->pOldPages[i]; oidx = 0; cidx = 0; @@ -631,7 +635,7 @@ static int tdbBtreeBalanceStep2(SBtreeBalanceHelper *pBlh) { } static int tdbBtreeBalanceStep3(SBtreeBalanceHelper *pBlh) { - for (int i = 0; i < pBlh->nOldPages; i++) { + for (int i = 0; i < pBlh->nOld; i++) { /* code */ } @@ -657,6 +661,8 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { int ret; SBtreeBalanceHelper blh; + ASSERT(!TDB_BTREE_PAGE_IS_LEAF(TDB_PAGE_FLAGS(pParent))); + blh.pBt = pBt; blh.pParent = pParent; blh.idx = idx; @@ -738,7 +744,8 @@ static int tdbBtreeBalance(SBtCursor *pCur) { // } // } - // If balance over, break the loop + // when the page is not overflow and not too empty, the balance work + // is finished. Just break out the balance loop. if (pPage->nOverflow == 0 /* TODO: && pPage->nFree <= */) { break; } @@ -760,7 +767,7 @@ static int tdbBtreeBalance(SBtCursor *pCur) { pCur->pPage = pCur->pgStack[1]; } else { // Generalized balance step - pParent = pCur->pgStack[pCur->iPage - 1]; + pParent = pCur->pgStack[iPage - 1]; ret = tdbBtreeBalanceNonRoot(pCur->pBt, pParent, pCur->idxStack[pCur->iPage - 1]); if (ret < 0) { diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index c68535bdda..7bdcc38173 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -19,7 +19,7 @@ TEST(tdb_test, simple_test) { char key[64]; char val[64]; - for (int i = 1; i <= 65; i++) { + for (int i = 1; i <= 1000; i++) { sprintf(key, "key%d", i); sprintf(val, "value%d", i); ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val));