diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 67b11f8ef4..a1f879c729 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -187,7 +187,7 @@ int tdbBtCursorInsert(SBtCursor *pCur, const void *pKey, int kLen, const void *p } // Insert the cell to the index - ret = tdbPageInsertCell(pCur->pPage, idx, pCell, szCell); + ret = tdbPageInsertCell(pCur->pPage, idx, pCell, szCell, 0); if (ret < 0) { return -1; } @@ -490,7 +490,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { SCell *pDivCell[2] = {0}; int szDivCell[2]; int sIdx; - u8 childLeaf; + u8 childNotLeaf; { // Find 3 child pages at most to do balance int nCells = TDB_PAGE_TOTAL_CELLS(pParent); @@ -529,14 +529,21 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { } } // copy the parent key out if child pages are not leaf page - childLeaf = TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pOlds[0])); - if (!childLeaf) { + childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pOlds[0])); + if (childNotLeaf) { for (int i = 0; i < nOlds - 1; i++) { pCell = tdbPageGetCell(pParent, sIdx + i); szDivCell[i] = tdbBtreeCellSize(pParent, pCell); pDivCell[i] = malloc(szDivCell[i]); memcpy(pDivCell, pCell, szDivCell[i]); + + ((SPgno *)pDivCell)[0] = ((SIntHdr *)pOlds[i]->pData)->pgno; + ((SIntHdr *)pOlds[i]->pData)->pgno = 0; + + // here we insert the cell as an overflow cell to avoid + // the slow defragment process + tdbPageInsertCell(pOlds[i], TDB_PAGE_TOTAL_CELLS(pOlds[i]), pDivCell[i], szDivCell[i], 1); } } // drop the cells on parent page @@ -575,7 +582,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { // page is full, use a new page nNews++; // for an internal leaf case, this cell is used as the new divider cell to parent - if (!childLeaf) continue; + if (childNotLeaf) continue; } infoNews[nNews].cnt++; infoNews[nNews].size += cellBytes; @@ -584,7 +591,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { } // For internal pages - if (!childLeaf && oPage < nOlds - 1) { + if (childNotLeaf && oPage < nOlds - 1) { if (infoNews[nNews].size + szDivCell[oPage] + TDB_PAGE_OFFSET_SIZE(pPage) > TDB_PAGE_USABLE_SIZE(pPage)) { nNews++; } @@ -607,7 +614,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { pPage = pOlds[infoNews[iNew - 1].oPage]; nCells = TDB_PAGE_TOTAL_CELLS(pPage); - if (childLeaf) { // child leaf + if (!childNotLeaf) { // child leaf for (;;) { pCell = tdbPageGetCell(pPage, infoNews[iNew - 1].oIdx); cellBytes = TDB_BYTES_CELL_TAKEN(pPage, pCell); @@ -729,7 +736,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { nCells = TDB_PAGE_TOTAL_CELLS(pOlds[iOld]); tdbBtreeZeroPage(pTPage[tPage], &iarg); tdbPageCopy(pOlds[iOld], pTPage[tPage]); - if (!childLeaf) { + if (childNotLeaf) { ((SIntHdr *)pTPage[tPage]->pData)->pgno = ((SIntHdr *)pOlds[iOld]->pData)->pgno; } @@ -746,7 +753,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { tIdx++; break; } else { - if (!childLeaf) { + if (childNotLeaf) { if (iOld < nOlds - 1) { pCell = pDivCell[iOld]; szCell = szDivCell[iOld]; @@ -780,18 +787,18 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { } } - tdbPageInsertCell(pNews[iNew], iCell, pCell, szCell); + tdbPageInsertCell(pNews[iNew], iCell, pCell, szCell, 0); } // fill right-most child pgno if internal page - if (!childLeaf) { + if (childNotLeaf) { if (tIdx < nCells) { pCell = tdbPageGetCell(pTPage[tPage], tIdx); szCell = tdbBtreeCellSize(pTPage[tPage], pCell); tIdx++; break; } else { - if (!childLeaf) { + if (!childNotLeaf) { if (iOld < nOlds - 1) { pCell = pDivCell[iOld]; szCell = szDivCell[iOld]; @@ -836,7 +843,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&TDB_PAGE_PGNO(pNews[iNew]), sizeof(SPgno), pCell, &szCell); // TODO: the cell here may be used by pParent as an overflow cell - tdbPageInsertCell(pParent, sIdx++, pCell, szCell); + tdbPageInsertCell(pParent, sIdx++, pCell, szCell, 0); } } diff --git a/source/libs/tdb/src/inc/tdbPage.h b/source/libs/tdb/src/inc/tdbPage.h index 0ba7cf236b..a6f9fbf615 100644 --- a/source/libs/tdb/src/inc/tdbPage.h +++ b/source/libs/tdb/src/inc/tdbPage.h @@ -108,7 +108,7 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg); void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)); void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)); -int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell); +int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl); int tdbPageDropCell(SPage *pPage, int idx); void tdbPageCopy(SPage *pFromPage, SPage *pToPage); diff --git a/source/libs/tdb/src/page/tdbPage.c b/source/libs/tdb/src/page/tdbPage.c index 14960de45d..ee1a021a1b 100644 --- a/source/libs/tdb/src/page/tdbPage.c +++ b/source/libs/tdb/src/page/tdbPage.c @@ -114,7 +114,7 @@ void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell ASSERT(pPage->pFreeEnd - pPage->pFreeStart <= TDB_PAGE_NFREE(pPage)); } -int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) { +int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl) { int nFree; int nCells; int iOvfl; @@ -135,7 +135,19 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) { lidx = idx - iOvfl; - if (nFree >= szCell + TDB_PAGE_OFFSET_SIZE(pPage)) { + if (asOvfl || nFree < szCell + TDB_PAGE_OFFSET_SIZE(pPage)) { + // TODO: make it extensible + // add the cell as an overflow cell + for (int i = pPage->nOverflow; i > iOvfl; i--) { + pPage->apOvfl[i] = pPage->apOvfl[i - 1]; + pPage->aiOvfl[i] = pPage->aiOvfl[i - 1]; + } + + pPage->apOvfl[iOvfl] = pCell; + pPage->aiOvfl[iOvfl] = idx; + pPage->nOverflow++; + iOvfl++; + } else { // page must has enough space to hold the cell locally tdbPageAllocate(pPage, szCell, &pNewCell); @@ -149,18 +161,6 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell) { TDB_PAGE_NCELLS_SET(pPage, nCells + 1); ASSERT(pPage->pFreeStart == pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * (nCells + 1)); - } else { - // TODO: make it extensible - // add the cell as an overflow cell - for (int i = pPage->nOverflow; i > iOvfl; i--) { - pPage->apOvfl[i] = pPage->apOvfl[i - 1]; - pPage->aiOvfl[i] = pPage->aiOvfl[i - 1]; - } - - pPage->apOvfl[iOvfl] = pCell; - pPage->aiOvfl[iOvfl] = idx; - pPage->nOverflow++; - iOvfl++; } for (; iOvfl < pPage->nOverflow; iOvfl++) {