diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index b133e1fcbc..fc27e88cf1 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -18,12 +18,6 @@ #define TDB_BTREE_ROOT 0x1 #define TDB_BTREE_LEAF 0x2 -#define TDB_BTREE_PAGE_IS_ROOT(flags) TDB_FLAG_HAS(flags, TDB_BTREE_ROOT) -#define TDB_BTREE_PAGE_IS_LEAF(flags) TDB_FLAG_HAS(flags, TDB_BTREE_LEAF) -#define TDB_BTREE_ASSERT_FLAG(flags) \ - ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \ - TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0)) - struct SBTree { SPgno root; int keyLen; @@ -40,8 +34,13 @@ struct SBTree { #define TDB_BTREE_PAGE_COMMON_HDR u8 flags; -#define TDB_BTREE_PAGE_GET_FLAGS(PAGE) (PAGE)->pData[0] +#define TDB_BTREE_PAGE_GET_FLAGS(PAGE) (PAGE)->pData[0] #define TDB_BTREE_PAGE_SET_FLAGS(PAGE, flags) ((PAGE)->pData[0] = (flags)) +#define TDB_BTREE_PAGE_IS_ROOT(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_ROOT) +#define TDB_BTREE_PAGE_IS_LEAF(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_LEAF) +#define TDB_BTREE_ASSERT_FLAG(flags) \ + ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \ + TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0)) typedef struct __attribute__((__packed__)) { TDB_BTREE_PAGE_COMMON_HDR @@ -58,15 +57,15 @@ typedef struct { } SBtreeInitPageArg; typedef struct { - int kLen; - u8 *pKey; - int vLen; - u8 *pVal; - SPgno pgno; - u8 *pTmpSpace; + int kLen; + const u8 *pKey; + int vLen; + const u8 *pVal; + SPgno pgno; + u8 *pBuf; } SCellDecoder; -static int tdbBtCursorMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst); +static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst); static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2); static int tdbBtreeOpenImpl(SBTree *pBt); static int tdbBtreeZeroPage(SPage *pPage, void *arg); @@ -77,7 +76,7 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD static int tdbBtreeBalance(SBTC *pBtc); static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell); static int tdbBtcMoveToNext(SBTC *pBtc); -static int tdbBtcMoveDownward(SBTC *pBtc, SPgno pgno); +static int tdbBtcMoveDownward(SBTC *pBtc); static int tdbBtcMoveUpward(SBTC *pBtc); int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, SBTree **ppBt) { @@ -94,9 +93,9 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S } // pBt->keyLen - pBt->keyLen = keyLen; + pBt->keyLen = keyLen < 0 ? TDB_VARIANT_LEN : keyLen; // pBt->valLen - pBt->valLen = valLen; + pBt->valLen = valLen < 0 ? TDB_VARIANT_LEN : valLen; // pBt->pPager pBt->pPager = pPager; // pBt->kcmpr @@ -137,7 +136,7 @@ int tdbBtCursorInsert(SBTC *pBtc, const void *pKey, int kLen, const void *pVal, int cret; SBTree *pBt; - ret = tdbBtCursorMoveTo(pBtc, pKey, kLen, &cret); + ret = tdbBtcMoveTo(pBtc, pKey, kLen, &cret); if (ret < 0) { // TODO: handle error return -1; @@ -200,7 +199,7 @@ int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen tdbBtcOpen(&btc, pBt); - tdbBtCursorMoveTo(&btc, pKey, kLen, &cret); + tdbBtcMoveTo(&btc, pKey, kLen, &cret); if (cret) { return cret; @@ -230,7 +229,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL tdbBtcOpen(&btc, pBt); - tdbBtCursorMoveTo(&btc, pKey, kLen, &cret); + tdbBtcMoveTo(&btc, pKey, kLen, &cret); if (cret) { return cret; } @@ -257,106 +256,6 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL return 0; } -static int tdbBtCursorMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { - int ret; - SBTree *pBt; - SPager *pPager; - - pBt = pBtc->pBt; - pPager = pBt->pPager; - - if (pBtc->iPage < 0) { - ASSERT(pBtc->iPage == -1); - ASSERT(pBtc->idx == -1); - - // Move from the root - ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt); - if (ret < 0) { - ASSERT(0); - return -1; - } - - pBtc->iPage = 0; - - if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) { - // Current page is empty - // ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pBtc->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF)); - return 0; - } - - for (;;) { - int lidx, ridx, midx, c, nCells; - SCell *pCell; - SPage *pPage; - SCellDecoder cd = {0}; - - pPage = pBtc->pPage; - nCells = TDB_PAGE_TOTAL_CELLS(pPage); - lidx = 0; - ridx = nCells - 1; - - ASSERT(nCells > 0); - - for (;;) { - if (lidx > ridx) break; - - midx = (lidx + ridx) >> 1; - - pCell = tdbPageGetCell(pPage, midx); - ret = tdbBtreeDecodeCell(pPage, pCell, &cd); - if (ret < 0) { - // TODO: handle error - ASSERT(0); - return -1; - } - - // Compare the key values - c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen); - if (c < 0) { - /* input-key < cell-key */ - ridx = midx - 1; - } else if (c > 0) { - /* input-key > cell-key */ - lidx = midx + 1; - } else { - /* input-key == cell-key */ - break; - } - } - - // Move downward or break - u8 flags = TDB_BTREE_PAGE_GET_FLAGS(pPage); - u8 leaf = TDB_BTREE_PAGE_IS_LEAF(flags); - if (leaf) { - pBtc->idx = midx; - *pCRst = c; - break; - } else { - if (c <= 0) { - pBtc->idx = midx; - tdbBtcMoveDownward(pBtc, cd.pgno); - } else { - pBtc->idx = midx + 1; - if (midx == nCells - 1) { - /* Move to right-most child */ - tdbBtcMoveDownward(pBtc, ((SIntHdr *)pBtc->pPage->pData)->pgno); - } else { - pCell = tdbPageGetCell(pPage, pBtc->idx); - tdbBtreeDecodeCell(pPage, pCell, &cd); - tdbBtcMoveDownward(pBtc, cd.pgno); - } - } - } - } - - } else { - // TODO: Move the cursor from a some position instead of a clear state - ASSERT(0); - } - - return 0; -} - static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) { int mlen; int cret; @@ -416,7 +315,7 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg) { pBt = (SBTree *)arg; flags = TDB_BTREE_PAGE_GET_FLAGS(pPage); - isLeaf = TDB_BTREE_PAGE_IS_LEAF(flags); + isLeaf = TDB_BTREE_PAGE_IS_LEAF(pPage); ASSERT(flags == TDB_BTREE_PAGE_GET_FLAGS(pPage)); @@ -442,15 +341,15 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg) { static int tdbBtreeZeroPage(SPage *pPage, void *arg) { u8 flags; SBTree *pBt; - u8 isLeaf; + u8 leaf; flags = ((SBtreeInitPageArg *)arg)->flags; pBt = ((SBtreeInitPageArg *)arg)->pBt; - isLeaf = TDB_BTREE_PAGE_IS_LEAF(flags); + leaf = flags & TDB_BTREE_LEAF; - tdbPageZero(pPage, isLeaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize); + tdbPageZero(pPage, leaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize); - if (isLeaf) { + if (leaf) { SLeafHdr *pLeafHdr = (SLeafHdr *)(pPage->pData); pLeafHdr->flags = flags; @@ -495,7 +394,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) { pPager = pRoot->pPager; flags = TDB_BTREE_PAGE_GET_FLAGS(pRoot); - leaf = TDB_BTREE_PAGE_IS_LEAF(flags); + leaf = TDB_BTREE_PAGE_IS_LEAF(pRoot); // Allocate a new child page zArg.flags = TDB_FLAG_REMOVE(flags, TDB_BTREE_ROOT); @@ -561,7 +460,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { SPgno pgno; if (sIdx + i == nCells) { - ASSERT(!TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pParent))); + ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pParent)); pgno = ((SIntHdr *)(pParent->pData))->pgno; } else { pCell = tdbPageGetCell(pParent, sIdx + i); @@ -575,7 +474,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { } } // copy the parent key out if child pages are not leaf page - childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pOlds[0])); + childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(pOlds[0]); if (childNotLeaf) { for (int i = 0; i < nOlds; i++) { if (sIdx + i < TDB_PAGE_TOTAL_CELLS(pParent)) { @@ -849,9 +748,8 @@ static int tdbBtreeBalance(SBTC *pBtc) { for (;;) { iPage = pBtc->iPage; pPage = pBtc->pPage; - flags = TDB_BTREE_PAGE_GET_FLAGS(pPage); - leaf = TDB_BTREE_PAGE_IS_LEAF(flags); - root = TDB_BTREE_PAGE_IS_ROOT(flags); + leaf = TDB_BTREE_PAGE_IS_LEAF(pPage); + root = TDB_BTREE_PAGE_IS_ROOT(pPage); // when the page is not overflow and not too empty, the balance work // is finished. Just break out the balance loop. @@ -892,23 +790,17 @@ static int tdbBtreeBalance(SBTC *pBtc) { } #endif -#ifndef TDB_BTREE_CELL // ========================================================= -static int tdbBtreeEncodePayload(SPage *pPage, u8 *pPayload, const void *pKey, int kLen, const void *pVal, int vLen, - int *szPayload) { +// TDB_BTREE_CELL ===================== +static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const void *pKey, int kLen, const void *pVal, + int vLen, int *szPayload) { int nPayload; - ASSERT(pKey != NULL); - - if (pVal == NULL) { - vLen = 0; - } - nPayload = kLen + vLen; - if (nPayload <= pPage->maxLocal) { - // General case without overflow - memcpy(pPayload, pKey, kLen); + if (nPayload + nHeader <= pPage->maxLocal) { + // no overflow page is needed + memcpy(pCell + nHeader, pKey, kLen); if (pVal) { - memcpy(pPayload + kLen, pVal, vLen); + memcpy(pCell + nHeader + kLen, pVal, vLen); } *szPayload = nPayload; @@ -923,10 +815,8 @@ static int tdbBtreeEncodePayload(SPage *pPage, u8 *pPayload, const void *pKey, i return 0; } -// TODO: allow vLen = 0 static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell, int *szCell) { - u8 flags; u8 leaf; int nHeader; int nPayload; @@ -934,11 +824,11 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo ASSERT(pPage->kLen == TDB_VARIANT_LEN || pPage->kLen == kLen); ASSERT(pPage->vLen == TDB_VARIANT_LEN || pPage->vLen == vLen); + ASSERT(pKey != NULL && kLen > 0); nPayload = 0; nHeader = 0; - flags = TDB_BTREE_PAGE_GET_FLAGS(pPage); - leaf = TDB_BTREE_PAGE_IS_LEAF(flags); + leaf = TDB_BTREE_PAGE_IS_LEAF(pPage); // 1. Encode Header part /* Encode SPgno if interior page */ @@ -960,38 +850,42 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo } // 2. Encode payload part - if (leaf && vLen > 0) { - ret = tdbBtreeEncodePayload(pPage, pCell + nHeader, pKey, kLen, pVal, vLen, &nPayload); - } else { - ret = tdbBtreeEncodePayload(pPage, pCell + nHeader, pKey, kLen, NULL, 0, &nPayload); + if ((!leaf) || pPage->vLen == 0) { + pVal = NULL; + vLen = 0; } + + ret = tdbBtreeEncodePayload(pPage, pCell, nHeader, pKey, kLen, pVal, vLen, &nPayload); if (ret < 0) { - // TODO: handle error - return -1; + // TODO + ASSERT(0); + return 0; } *szCell = nHeader + nPayload; return 0; } -static int tdbBtreeDecodePayload(SPage *pPage, const u8 *pPayload, SCellDecoder *pDecoder) { +static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, SCellDecoder *pDecoder) { int nPayload; - ASSERT(pDecoder->pKey == NULL); - if (pDecoder->pVal) { - nPayload = pDecoder->kLen + pDecoder->vLen; - } else { + ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pPage)); nPayload = pDecoder->kLen; + } else { + nPayload = pDecoder->kLen + pDecoder->vLen; } - if (nPayload <= pPage->maxLocal) { - // General case without overflow - pDecoder->pKey = (void *)pPayload; - if (!pDecoder->pVal) { - pDecoder->pVal = (void *)(pPayload + pDecoder->kLen); + if (nHeader + nPayload <= pPage->maxLocal) { + // no over flow case + pDecoder->pKey = pCell + nHeader; + if (pDecoder->pVal == NULL && pDecoder->vLen > 0) { + pDecoder->pVal = pCell + nHeader + pDecoder->kLen; } - } else { + return 0; + } + + { // TODO: handle overflow case ASSERT(0); } @@ -999,16 +893,13 @@ static int tdbBtreeDecodePayload(SPage *pPage, const u8 *pPayload, SCellDecoder return 0; } -// TODO: here has problem static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder) { - u8 flags; u8 leaf; int nHeader; int ret; nHeader = 0; - flags = TDB_BTREE_PAGE_GET_FLAGS(pPage); - leaf = TDB_BTREE_PAGE_IS_LEAF(flags); + leaf = TDB_BTREE_PAGE_IS_LEAF(pPage); // Clear the state of decoder pDecoder->kLen = -1; @@ -1033,13 +924,14 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD } if (pPage->vLen == TDB_VARIANT_LEN) { + ASSERT(leaf); nHeader += tdbGetVarInt(pCell + nHeader, &(pDecoder->vLen)); } else { pDecoder->vLen = pPage->vLen; } // 2. Decode payload part - ret = tdbBtreeDecodePayload(pPage, pCell + nHeader, pDecoder); + ret = tdbBtreeDecodePayload(pPage, pCell, nHeader, pDecoder); if (ret < 0) { return -1; } @@ -1048,16 +940,14 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD } static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) { - u8 flags; - u8 isLeaf; + u8 leaf; int szCell; int kLen = 0, vLen = 0; - flags = TDB_BTREE_PAGE_GET_FLAGS(pPage); - isLeaf = TDB_BTREE_PAGE_IS_LEAF(flags); + leaf = TDB_BTREE_PAGE_IS_LEAF(pPage); szCell = 0; - if (!isLeaf) { + if (!leaf) { szCell += sizeof(SPgno); } @@ -1067,21 +957,28 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) { kLen = pPage->kLen; } - if (isLeaf) { - if (pPage->vLen == TDB_VARIANT_LEN) { - szCell += tdbGetVarInt(pCell + szCell, &vLen); - } else { - vLen = pPage->vLen; - } + if (pPage->vLen == TDB_VARIANT_LEN) { + ASSERT(leaf); + szCell += tdbGetVarInt(pCell + szCell, &vLen); + } else if (leaf) { + vLen = pPage->vLen; } szCell = szCell + kLen + vLen; - return szCell; + if (szCell <= pPage->maxLocal) { + return szCell; + } + + { + // TODO + ASSERT(0); + return 0; + } } +// TDB_BTREE_CELL -#endif - +// TDB_BTREE_CURSOR ===================== int tdbBtcOpen(SBTC *pBtc, SBTree *pBt) { pBtc->pBt = pBt; pBtc->iPage = -1; @@ -1095,7 +992,6 @@ int tdbBtcMoveToFirst(SBTC *pBtc) { int ret; SBTree *pBt; SPager *pPager; - u8 flags; SCell *pCell; SPgno pgno; @@ -1110,23 +1006,43 @@ int tdbBtcMoveToFirst(SBTC *pBtc) { return -1; } + ASSERT(TDB_BTREE_PAGE_IS_ROOT(pBtc->pPage)); + pBtc->iPage = 0; - pBtc->idx = 0; + if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) > 0) { + pBtc->idx = 0; + } else { + // no any data, point to an invalid position + ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)); + pBtc->idx = -1; + return 0; + } } else { // move from a position - ASSERT(0); + int iPage = 0; + + for (; iPage < pBtc->iPage; iPage++) { + ASSERT(pBtc->idxStack[iPage] >= 0); + if (pBtc->idxStack[iPage]) break; + } + + // move upward + for (;;) { + if (pBtc->iPage == 0) { + pBtc->idx = 0; + break; + } + + if (pBtc->iPage < iPage) break; + tdbBtcMoveUpward(pBtc); + } } // move downward for (;;) { - flags = TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage); + if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) break; - if (TDB_BTREE_PAGE_IS_LEAF(flags)) break; - - pCell = tdbPageGetCell(pBtc->pPage, 0); - pgno = *(SPgno *)pCell; - - ret = tdbBtcMoveDownward(pBtc, pgno); + ret = tdbBtcMoveDownward(pBtc); if (ret < 0) { ASSERT(0); return -1; @@ -1142,7 +1058,6 @@ int tdbBtcMoveToLast(SBTC *pBtc) { int ret; SBTree *pBt; SPager *pPager; - u8 flags; SPgno pgno; pBt = pBtc->pBt; @@ -1164,18 +1079,15 @@ int tdbBtcMoveToLast(SBTC *pBtc) { // move downward for (;;) { - flags = TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage); - - if (TDB_BTREE_PAGE_IS_LEAF(flags)) { + if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) { // TODO: handle empty case ASSERT(TDB_PAGE_TOTAL_CELLS(pBtc->pPage) > 0); pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage) - 1; break; } else { pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage); - pgno = ((SIntHdr *)pBtc->pPage->pData)->pgno; - ret = tdbBtcMoveDownward(pBtc, pgno); + ret = tdbBtcMoveDownward(pBtc); if (ret < 0) { ASSERT(0); return -1; @@ -1186,11 +1098,6 @@ int tdbBtcMoveToLast(SBTC *pBtc) { return 0; } -int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen) { - // TODO - return 0; -} - int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { SCell *pCell; SCellDecoder cd; @@ -1233,11 +1140,9 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) { static int tdbBtcMoveToNext(SBTC *pBtc) { int nCells; - SPgno pgno; SCell *pCell; - u8 flags; - ASSERT(TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage))); + ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)); if (pBtc->idx < 0) return -1; @@ -1270,18 +1175,11 @@ static int tdbBtcMoveToNext(SBTC *pBtc) { // Move downward for (;;) { nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage); - if (pBtc->idx < nCells) { - pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx); - pgno = *(SPgno *)pCell; - } else { - pgno = ((SIntHdr *)pBtc->pPage->pData)->pgno; - } - tdbBtcMoveDownward(pBtc, pgno); + tdbBtcMoveDownward(pBtc); pBtc->idx = 0; - flags = TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage); - if (TDB_BTREE_PAGE_IS_LEAF(flags)) { + if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) { break; } } @@ -1289,13 +1187,20 @@ static int tdbBtcMoveToNext(SBTC *pBtc) { return 0; } -int tdbBtcClose(SBTC *pBtc) { - // TODO - return 0; -} +static int tdbBtcMoveDownward(SBTC *pBtc) { + int ret; + SPgno pgno; + SCell *pCell; -static int tdbBtcMoveDownward(SBTC *pBtc, SPgno pgno) { - int ret; + ASSERT(pBtc->idx >= 0); + ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)); + + if (pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) { + pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx); + pgno = ((SPgno *)pCell)[0]; + } else { + pgno = ((SIntHdr *)pBtc->pPage->pData)->pgno; + } pBtc->pgStack[pBtc->iPage] = pBtc->pPage; pBtc->idxStack[pBtc->iPage] = pBtc->idx; @@ -1306,6 +1211,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc, SPgno pgno) { ret = tdbPagerFetchPage(pBtc->pBt->pPager, pgno, &pBtc->pPage, tdbBtreeInitPage, pBtc->pBt); if (ret < 0) { ASSERT(0); + return -1; } return 0; @@ -1314,7 +1220,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc, SPgno pgno) { static int tdbBtcMoveUpward(SBTC *pBtc) { if (pBtc->iPage == 0) return -1; - // tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage); + tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage); pBtc->iPage--; pBtc->pPage = pBtc->pgStack[pBtc->iPage]; @@ -1323,6 +1229,117 @@ static int tdbBtcMoveUpward(SBTC *pBtc) { return 0; } +static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { + int ret; + SBTree *pBt; + SPager *pPager; + + pBt = pBtc->pBt; + pPager = pBt->pPager; + + if (pBtc->iPage < 0) { + ASSERT(pBtc->iPage == -1); + ASSERT(pBtc->idx == -1); + + // Move from the root + ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt); + if (ret < 0) { + ASSERT(0); + return -1; + } + + pBtc->iPage = 0; + + if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) { + // Current page is empty + // ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pBtc->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF)); + return 0; + } + + for (;;) { + int lidx, ridx, midx, c, nCells; + SCell *pCell; + SPage *pPage; + SCellDecoder cd = {0}; + + pPage = pBtc->pPage; + nCells = TDB_PAGE_TOTAL_CELLS(pPage); + lidx = 0; + ridx = nCells - 1; + + ASSERT(nCells > 0); + + for (;;) { + if (lidx > ridx) break; + + midx = (lidx + ridx) >> 1; + + pCell = tdbPageGetCell(pPage, midx); + ret = tdbBtreeDecodeCell(pPage, pCell, &cd); + if (ret < 0) { + // TODO: handle error + ASSERT(0); + return -1; + } + + // Compare the key values + c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen); + if (c < 0) { + /* input-key < cell-key */ + ridx = midx - 1; + } else if (c > 0) { + /* input-key > cell-key */ + lidx = midx + 1; + } else { + /* input-key == cell-key */ + break; + } + } + + // Move downward or break + u8 leaf = TDB_BTREE_PAGE_IS_LEAF(pPage); + if (leaf) { + pBtc->idx = midx; + *pCRst = c; + break; + } else { + if (c <= 0) { + pBtc->idx = midx; + } else { + pBtc->idx = midx + 1; + } + tdbBtcMoveDownward(pBtc); + } + } + + } else { + // TODO: Move the cursor from a some position instead of a clear state + ASSERT(0); + } + + return 0; +} + +int tdbBtcClose(SBTC *pBtc) { + if (pBtc->iPage < 0) return 0; + + for (;;) { + ASSERT(pBtc->pPage); + + tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage); + + pBtc->iPage--; + if (pBtc->iPage < 0) break; + + pBtc->pPage = pBtc->pgStack[pBtc->iPage]; + pBtc->idx = pBtc->idxStack[pBtc->iPage]; + } + + return 0; +} +// TDB_BTREE_CURSOR + +// TDB_BTREE_DEBUG ===================== #ifndef NODEBUG typedef struct { SPgno pgno; @@ -1336,17 +1353,14 @@ typedef struct { SBtPageInfo btPageInfos[20]; void tdbBtPageInfo(SPage *pPage, int idx) { - u8 flags; SBtPageInfo *pBtPageInfo; pBtPageInfo = btPageInfos + idx; pBtPageInfo->pgno = TDB_PAGE_PGNO(pPage); - flags = TDB_BTREE_PAGE_GET_FLAGS(pPage); - - pBtPageInfo->root = TDB_BTREE_PAGE_IS_ROOT(flags); - pBtPageInfo->leaf = TDB_BTREE_PAGE_IS_LEAF(flags); + pBtPageInfo->root = TDB_BTREE_PAGE_IS_ROOT(pPage); + pBtPageInfo->leaf = TDB_BTREE_PAGE_IS_LEAF(pPage); pBtPageInfo->rChild = 0; if (!pBtPageInfo->leaf) { @@ -1356,4 +1370,5 @@ void tdbBtPageInfo(SPage *pPage, int idx) { pBtPageInfo->nCells = TDB_PAGE_TOTAL_CELLS(pPage) - pPage->nOverflow; pBtPageInfo->nOvfl = pPage->nOverflow; } -#endif \ No newline at end of file +#endif +// TDB_BTREE_DEBUG \ No newline at end of file diff --git a/source/libs/tdb/src/inc/tdbBtree.h b/source/libs/tdb/src/inc/tdbBtree.h index 0e8ca0c803..2797b5ea5e 100644 --- a/source/libs/tdb/src/inc/tdbBtree.h +++ b/source/libs/tdb/src/inc/tdbBtree.h @@ -35,7 +35,6 @@ struct SBTC { int idx; int idxStack[BTREE_MAX_DEPTH + 1]; SPage *pgStack[BTREE_MAX_DEPTH + 1]; - void *pBuf; }; // SBTree @@ -49,7 +48,6 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL int tdbBtcOpen(SBTC *pCur, SBTree *pBt); int tdbBtcMoveToFirst(SBTC *pBtc); int tdbBtcMoveToLast(SBTC *pBtc); -int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen); int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen); int tdbBtcClose(SBTC *pBtc); diff --git a/source/libs/tdb/src/inc/tdbOs.h b/source/libs/tdb/src/inc/tdbOs.h index 794d4c502a..196bbdafc7 100644 --- a/source/libs/tdb/src/inc/tdbOs.h +++ b/source/libs/tdb/src/inc/tdbOs.h @@ -64,7 +64,7 @@ typedef TdThreadSpinlock tdb_spinlock_t; #define tdbSpinlockDestroy taosThreadSpinDestroy #define tdbSpinlockLock taosThreadSpinLock #define tdbSpinlockUnlock taosThreadSpinUnlock -#define tdbSpinlockTrylock pthread_spin_trylock +#define tdbSpinlockTrylock taosThreadSpinTrylock /* mutex lock */ typedef TdThreadMutex tdb_mutex_t;