Merge pull request #11130 from taosdata/feature/tdb

Feature/tdb
This commit is contained in:
Hongze Cheng 2022-03-30 16:44:54 +08:00 committed by GitHub
commit 7f127284d3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 246 additions and 233 deletions

View File

@ -18,12 +18,6 @@
#define TDB_BTREE_ROOT 0x1
#define TDB_BTREE_LEAF 0x2
#define TDB_BTREE_PAGE_IS_ROOT(flags) TDB_FLAG_HAS(flags, TDB_BTREE_ROOT)
#define TDB_BTREE_PAGE_IS_LEAF(flags) TDB_FLAG_HAS(flags, TDB_BTREE_LEAF)
#define TDB_BTREE_ASSERT_FLAG(flags) \
ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \
TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0))
struct SBTree {
SPgno root;
int keyLen;
@ -40,8 +34,13 @@ struct SBTree {
#define TDB_BTREE_PAGE_COMMON_HDR u8 flags;
#define TDB_BTREE_PAGE_GET_FLAGS(PAGE) (PAGE)->pData[0]
#define TDB_BTREE_PAGE_GET_FLAGS(PAGE) (PAGE)->pData[0]
#define TDB_BTREE_PAGE_SET_FLAGS(PAGE, flags) ((PAGE)->pData[0] = (flags))
#define TDB_BTREE_PAGE_IS_ROOT(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_ROOT)
#define TDB_BTREE_PAGE_IS_LEAF(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_LEAF)
#define TDB_BTREE_ASSERT_FLAG(flags) \
ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \
TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0))
typedef struct __attribute__((__packed__)) {
TDB_BTREE_PAGE_COMMON_HDR
@ -58,15 +57,15 @@ typedef struct {
} SBtreeInitPageArg;
typedef struct {
int kLen;
u8 *pKey;
int vLen;
u8 *pVal;
SPgno pgno;
u8 *pTmpSpace;
int kLen;
const u8 *pKey;
int vLen;
const u8 *pVal;
SPgno pgno;
u8 *pBuf;
} SCellDecoder;
static int tdbBtCursorMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst);
static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst);
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2);
static int tdbBtreeOpenImpl(SBTree *pBt);
static int tdbBtreeZeroPage(SPage *pPage, void *arg);
@ -77,7 +76,7 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
static int tdbBtreeBalance(SBTC *pBtc);
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell);
static int tdbBtcMoveToNext(SBTC *pBtc);
static int tdbBtcMoveDownward(SBTC *pBtc, SPgno pgno);
static int tdbBtcMoveDownward(SBTC *pBtc);
static int tdbBtcMoveUpward(SBTC *pBtc);
int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, SBTree **ppBt) {
@ -94,9 +93,9 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S
}
// pBt->keyLen
pBt->keyLen = keyLen;
pBt->keyLen = keyLen < 0 ? TDB_VARIANT_LEN : keyLen;
// pBt->valLen
pBt->valLen = valLen;
pBt->valLen = valLen < 0 ? TDB_VARIANT_LEN : valLen;
// pBt->pPager
pBt->pPager = pPager;
// pBt->kcmpr
@ -137,7 +136,7 @@ int tdbBtCursorInsert(SBTC *pBtc, const void *pKey, int kLen, const void *pVal,
int cret;
SBTree *pBt;
ret = tdbBtCursorMoveTo(pBtc, pKey, kLen, &cret);
ret = tdbBtcMoveTo(pBtc, pKey, kLen, &cret);
if (ret < 0) {
// TODO: handle error
return -1;
@ -200,7 +199,7 @@ int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen
tdbBtcOpen(&btc, pBt);
tdbBtCursorMoveTo(&btc, pKey, kLen, &cret);
tdbBtcMoveTo(&btc, pKey, kLen, &cret);
if (cret) {
return cret;
@ -230,7 +229,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
tdbBtcOpen(&btc, pBt);
tdbBtCursorMoveTo(&btc, pKey, kLen, &cret);
tdbBtcMoveTo(&btc, pKey, kLen, &cret);
if (cret) {
return cret;
}
@ -257,106 +256,6 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
return 0;
}
static int tdbBtCursorMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
int ret;
SBTree *pBt;
SPager *pPager;
pBt = pBtc->pBt;
pPager = pBt->pPager;
if (pBtc->iPage < 0) {
ASSERT(pBtc->iPage == -1);
ASSERT(pBtc->idx == -1);
// Move from the root
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt);
if (ret < 0) {
ASSERT(0);
return -1;
}
pBtc->iPage = 0;
if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) {
// Current page is empty
// ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pBtc->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF));
return 0;
}
for (;;) {
int lidx, ridx, midx, c, nCells;
SCell *pCell;
SPage *pPage;
SCellDecoder cd = {0};
pPage = pBtc->pPage;
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
lidx = 0;
ridx = nCells - 1;
ASSERT(nCells > 0);
for (;;) {
if (lidx > ridx) break;
midx = (lidx + ridx) >> 1;
pCell = tdbPageGetCell(pPage, midx);
ret = tdbBtreeDecodeCell(pPage, pCell, &cd);
if (ret < 0) {
// TODO: handle error
ASSERT(0);
return -1;
}
// Compare the key values
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c < 0) {
/* input-key < cell-key */
ridx = midx - 1;
} else if (c > 0) {
/* input-key > cell-key */
lidx = midx + 1;
} else {
/* input-key == cell-key */
break;
}
}
// Move downward or break
u8 flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
u8 leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
if (leaf) {
pBtc->idx = midx;
*pCRst = c;
break;
} else {
if (c <= 0) {
pBtc->idx = midx;
tdbBtcMoveDownward(pBtc, cd.pgno);
} else {
pBtc->idx = midx + 1;
if (midx == nCells - 1) {
/* Move to right-most child */
tdbBtcMoveDownward(pBtc, ((SIntHdr *)pBtc->pPage->pData)->pgno);
} else {
pCell = tdbPageGetCell(pPage, pBtc->idx);
tdbBtreeDecodeCell(pPage, pCell, &cd);
tdbBtcMoveDownward(pBtc, cd.pgno);
}
}
}
}
} else {
// TODO: Move the cursor from a some position instead of a clear state
ASSERT(0);
}
return 0;
}
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) {
int mlen;
int cret;
@ -416,7 +315,7 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg) {
pBt = (SBTree *)arg;
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
isLeaf = TDB_BTREE_PAGE_IS_LEAF(flags);
isLeaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
ASSERT(flags == TDB_BTREE_PAGE_GET_FLAGS(pPage));
@ -442,15 +341,15 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg) {
static int tdbBtreeZeroPage(SPage *pPage, void *arg) {
u8 flags;
SBTree *pBt;
u8 isLeaf;
u8 leaf;
flags = ((SBtreeInitPageArg *)arg)->flags;
pBt = ((SBtreeInitPageArg *)arg)->pBt;
isLeaf = TDB_BTREE_PAGE_IS_LEAF(flags);
leaf = flags & TDB_BTREE_LEAF;
tdbPageZero(pPage, isLeaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize);
tdbPageZero(pPage, leaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize);
if (isLeaf) {
if (leaf) {
SLeafHdr *pLeafHdr = (SLeafHdr *)(pPage->pData);
pLeafHdr->flags = flags;
@ -495,7 +394,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
pPager = pRoot->pPager;
flags = TDB_BTREE_PAGE_GET_FLAGS(pRoot);
leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
leaf = TDB_BTREE_PAGE_IS_LEAF(pRoot);
// Allocate a new child page
zArg.flags = TDB_FLAG_REMOVE(flags, TDB_BTREE_ROOT);
@ -561,7 +460,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
SPgno pgno;
if (sIdx + i == nCells) {
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pParent)));
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pParent));
pgno = ((SIntHdr *)(pParent->pData))->pgno;
} else {
pCell = tdbPageGetCell(pParent, sIdx + i);
@ -575,7 +474,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
}
}
// copy the parent key out if child pages are not leaf page
childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]));
childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(pOlds[0]);
if (childNotLeaf) {
for (int i = 0; i < nOlds; i++) {
if (sIdx + i < TDB_PAGE_TOTAL_CELLS(pParent)) {
@ -849,9 +748,8 @@ static int tdbBtreeBalance(SBTC *pBtc) {
for (;;) {
iPage = pBtc->iPage;
pPage = pBtc->pPage;
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
root = TDB_BTREE_PAGE_IS_ROOT(flags);
leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
root = TDB_BTREE_PAGE_IS_ROOT(pPage);
// when the page is not overflow and not too empty, the balance work
// is finished. Just break out the balance loop.
@ -892,23 +790,17 @@ static int tdbBtreeBalance(SBTC *pBtc) {
}
#endif
#ifndef TDB_BTREE_CELL // =========================================================
static int tdbBtreeEncodePayload(SPage *pPage, u8 *pPayload, const void *pKey, int kLen, const void *pVal, int vLen,
int *szPayload) {
// TDB_BTREE_CELL =====================
static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const void *pKey, int kLen, const void *pVal,
int vLen, int *szPayload) {
int nPayload;
ASSERT(pKey != NULL);
if (pVal == NULL) {
vLen = 0;
}
nPayload = kLen + vLen;
if (nPayload <= pPage->maxLocal) {
// General case without overflow
memcpy(pPayload, pKey, kLen);
if (nPayload + nHeader <= pPage->maxLocal) {
// no overflow page is needed
memcpy(pCell + nHeader, pKey, kLen);
if (pVal) {
memcpy(pPayload + kLen, pVal, vLen);
memcpy(pCell + nHeader + kLen, pVal, vLen);
}
*szPayload = nPayload;
@ -923,10 +815,8 @@ static int tdbBtreeEncodePayload(SPage *pPage, u8 *pPayload, const void *pKey, i
return 0;
}
// TODO: allow vLen = 0
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
int *szCell) {
u8 flags;
u8 leaf;
int nHeader;
int nPayload;
@ -934,11 +824,11 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
ASSERT(pPage->kLen == TDB_VARIANT_LEN || pPage->kLen == kLen);
ASSERT(pPage->vLen == TDB_VARIANT_LEN || pPage->vLen == vLen);
ASSERT(pKey != NULL && kLen > 0);
nPayload = 0;
nHeader = 0;
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
// 1. Encode Header part
/* Encode SPgno if interior page */
@ -960,38 +850,42 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
}
// 2. Encode payload part
if (leaf && vLen > 0) {
ret = tdbBtreeEncodePayload(pPage, pCell + nHeader, pKey, kLen, pVal, vLen, &nPayload);
} else {
ret = tdbBtreeEncodePayload(pPage, pCell + nHeader, pKey, kLen, NULL, 0, &nPayload);
if ((!leaf) || pPage->vLen == 0) {
pVal = NULL;
vLen = 0;
}
ret = tdbBtreeEncodePayload(pPage, pCell, nHeader, pKey, kLen, pVal, vLen, &nPayload);
if (ret < 0) {
// TODO: handle error
return -1;
// TODO
ASSERT(0);
return 0;
}
*szCell = nHeader + nPayload;
return 0;
}
static int tdbBtreeDecodePayload(SPage *pPage, const u8 *pPayload, SCellDecoder *pDecoder) {
static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, SCellDecoder *pDecoder) {
int nPayload;
ASSERT(pDecoder->pKey == NULL);
if (pDecoder->pVal) {
nPayload = pDecoder->kLen + pDecoder->vLen;
} else {
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pPage));
nPayload = pDecoder->kLen;
} else {
nPayload = pDecoder->kLen + pDecoder->vLen;
}
if (nPayload <= pPage->maxLocal) {
// General case without overflow
pDecoder->pKey = (void *)pPayload;
if (!pDecoder->pVal) {
pDecoder->pVal = (void *)(pPayload + pDecoder->kLen);
if (nHeader + nPayload <= pPage->maxLocal) {
// no over flow case
pDecoder->pKey = pCell + nHeader;
if (pDecoder->pVal == NULL && pDecoder->vLen > 0) {
pDecoder->pVal = pCell + nHeader + pDecoder->kLen;
}
} else {
return 0;
}
{
// TODO: handle overflow case
ASSERT(0);
}
@ -999,16 +893,13 @@ static int tdbBtreeDecodePayload(SPage *pPage, const u8 *pPayload, SCellDecoder
return 0;
}
// TODO: here has problem
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder) {
u8 flags;
u8 leaf;
int nHeader;
int ret;
nHeader = 0;
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
// Clear the state of decoder
pDecoder->kLen = -1;
@ -1033,13 +924,14 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
}
if (pPage->vLen == TDB_VARIANT_LEN) {
ASSERT(leaf);
nHeader += tdbGetVarInt(pCell + nHeader, &(pDecoder->vLen));
} else {
pDecoder->vLen = pPage->vLen;
}
// 2. Decode payload part
ret = tdbBtreeDecodePayload(pPage, pCell + nHeader, pDecoder);
ret = tdbBtreeDecodePayload(pPage, pCell, nHeader, pDecoder);
if (ret < 0) {
return -1;
}
@ -1048,16 +940,14 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
}
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) {
u8 flags;
u8 isLeaf;
u8 leaf;
int szCell;
int kLen = 0, vLen = 0;
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
isLeaf = TDB_BTREE_PAGE_IS_LEAF(flags);
leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
szCell = 0;
if (!isLeaf) {
if (!leaf) {
szCell += sizeof(SPgno);
}
@ -1067,21 +957,28 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) {
kLen = pPage->kLen;
}
if (isLeaf) {
if (pPage->vLen == TDB_VARIANT_LEN) {
szCell += tdbGetVarInt(pCell + szCell, &vLen);
} else {
vLen = pPage->vLen;
}
if (pPage->vLen == TDB_VARIANT_LEN) {
ASSERT(leaf);
szCell += tdbGetVarInt(pCell + szCell, &vLen);
} else if (leaf) {
vLen = pPage->vLen;
}
szCell = szCell + kLen + vLen;
return szCell;
if (szCell <= pPage->maxLocal) {
return szCell;
}
{
// TODO
ASSERT(0);
return 0;
}
}
// TDB_BTREE_CELL
#endif
// TDB_BTREE_CURSOR =====================
int tdbBtcOpen(SBTC *pBtc, SBTree *pBt) {
pBtc->pBt = pBt;
pBtc->iPage = -1;
@ -1095,7 +992,6 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
int ret;
SBTree *pBt;
SPager *pPager;
u8 flags;
SCell *pCell;
SPgno pgno;
@ -1110,23 +1006,43 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
return -1;
}
ASSERT(TDB_BTREE_PAGE_IS_ROOT(pBtc->pPage));
pBtc->iPage = 0;
pBtc->idx = 0;
if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) > 0) {
pBtc->idx = 0;
} else {
// no any data, point to an invalid position
ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
pBtc->idx = -1;
return 0;
}
} else {
// move from a position
ASSERT(0);
int iPage = 0;
for (; iPage < pBtc->iPage; iPage++) {
ASSERT(pBtc->idxStack[iPage] >= 0);
if (pBtc->idxStack[iPage]) break;
}
// move upward
for (;;) {
if (pBtc->iPage == 0) {
pBtc->idx = 0;
break;
}
if (pBtc->iPage < iPage) break;
tdbBtcMoveUpward(pBtc);
}
}
// move downward
for (;;) {
flags = TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage);
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) break;
if (TDB_BTREE_PAGE_IS_LEAF(flags)) break;
pCell = tdbPageGetCell(pBtc->pPage, 0);
pgno = *(SPgno *)pCell;
ret = tdbBtcMoveDownward(pBtc, pgno);
ret = tdbBtcMoveDownward(pBtc);
if (ret < 0) {
ASSERT(0);
return -1;
@ -1142,7 +1058,6 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
int ret;
SBTree *pBt;
SPager *pPager;
u8 flags;
SPgno pgno;
pBt = pBtc->pBt;
@ -1164,18 +1079,15 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
// move downward
for (;;) {
flags = TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage);
if (TDB_BTREE_PAGE_IS_LEAF(flags)) {
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
// TODO: handle empty case
ASSERT(TDB_PAGE_TOTAL_CELLS(pBtc->pPage) > 0);
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage) - 1;
break;
} else {
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
pgno = ((SIntHdr *)pBtc->pPage->pData)->pgno;
ret = tdbBtcMoveDownward(pBtc, pgno);
ret = tdbBtcMoveDownward(pBtc);
if (ret < 0) {
ASSERT(0);
return -1;
@ -1186,11 +1098,6 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
return 0;
}
int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen) {
// TODO
return 0;
}
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
SCell *pCell;
SCellDecoder cd;
@ -1233,11 +1140,9 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
static int tdbBtcMoveToNext(SBTC *pBtc) {
int nCells;
SPgno pgno;
SCell *pCell;
u8 flags;
ASSERT(TDB_BTREE_PAGE_IS_LEAF(TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage)));
ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
if (pBtc->idx < 0) return -1;
@ -1270,18 +1175,11 @@ static int tdbBtcMoveToNext(SBTC *pBtc) {
// Move downward
for (;;) {
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
if (pBtc->idx < nCells) {
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
pgno = *(SPgno *)pCell;
} else {
pgno = ((SIntHdr *)pBtc->pPage->pData)->pgno;
}
tdbBtcMoveDownward(pBtc, pgno);
tdbBtcMoveDownward(pBtc);
pBtc->idx = 0;
flags = TDB_BTREE_PAGE_GET_FLAGS(pBtc->pPage);
if (TDB_BTREE_PAGE_IS_LEAF(flags)) {
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
break;
}
}
@ -1289,13 +1187,20 @@ static int tdbBtcMoveToNext(SBTC *pBtc) {
return 0;
}
int tdbBtcClose(SBTC *pBtc) {
// TODO
return 0;
}
static int tdbBtcMoveDownward(SBTC *pBtc) {
int ret;
SPgno pgno;
SCell *pCell;
static int tdbBtcMoveDownward(SBTC *pBtc, SPgno pgno) {
int ret;
ASSERT(pBtc->idx >= 0);
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
if (pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
pgno = ((SPgno *)pCell)[0];
} else {
pgno = ((SIntHdr *)pBtc->pPage->pData)->pgno;
}
pBtc->pgStack[pBtc->iPage] = pBtc->pPage;
pBtc->idxStack[pBtc->iPage] = pBtc->idx;
@ -1306,6 +1211,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc, SPgno pgno) {
ret = tdbPagerFetchPage(pBtc->pBt->pPager, pgno, &pBtc->pPage, tdbBtreeInitPage, pBtc->pBt);
if (ret < 0) {
ASSERT(0);
return -1;
}
return 0;
@ -1314,7 +1220,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc, SPgno pgno) {
static int tdbBtcMoveUpward(SBTC *pBtc) {
if (pBtc->iPage == 0) return -1;
// tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage);
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage);
pBtc->iPage--;
pBtc->pPage = pBtc->pgStack[pBtc->iPage];
@ -1323,6 +1229,117 @@ static int tdbBtcMoveUpward(SBTC *pBtc) {
return 0;
}
static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
int ret;
SBTree *pBt;
SPager *pPager;
pBt = pBtc->pBt;
pPager = pBt->pPager;
if (pBtc->iPage < 0) {
ASSERT(pBtc->iPage == -1);
ASSERT(pBtc->idx == -1);
// Move from the root
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt);
if (ret < 0) {
ASSERT(0);
return -1;
}
pBtc->iPage = 0;
if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) {
// Current page is empty
// ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pBtc->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF));
return 0;
}
for (;;) {
int lidx, ridx, midx, c, nCells;
SCell *pCell;
SPage *pPage;
SCellDecoder cd = {0};
pPage = pBtc->pPage;
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
lidx = 0;
ridx = nCells - 1;
ASSERT(nCells > 0);
for (;;) {
if (lidx > ridx) break;
midx = (lidx + ridx) >> 1;
pCell = tdbPageGetCell(pPage, midx);
ret = tdbBtreeDecodeCell(pPage, pCell, &cd);
if (ret < 0) {
// TODO: handle error
ASSERT(0);
return -1;
}
// Compare the key values
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c < 0) {
/* input-key < cell-key */
ridx = midx - 1;
} else if (c > 0) {
/* input-key > cell-key */
lidx = midx + 1;
} else {
/* input-key == cell-key */
break;
}
}
// Move downward or break
u8 leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
if (leaf) {
pBtc->idx = midx;
*pCRst = c;
break;
} else {
if (c <= 0) {
pBtc->idx = midx;
} else {
pBtc->idx = midx + 1;
}
tdbBtcMoveDownward(pBtc);
}
}
} else {
// TODO: Move the cursor from a some position instead of a clear state
ASSERT(0);
}
return 0;
}
int tdbBtcClose(SBTC *pBtc) {
if (pBtc->iPage < 0) return 0;
for (;;) {
ASSERT(pBtc->pPage);
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage);
pBtc->iPage--;
if (pBtc->iPage < 0) break;
pBtc->pPage = pBtc->pgStack[pBtc->iPage];
pBtc->idx = pBtc->idxStack[pBtc->iPage];
}
return 0;
}
// TDB_BTREE_CURSOR
// TDB_BTREE_DEBUG =====================
#ifndef NODEBUG
typedef struct {
SPgno pgno;
@ -1336,17 +1353,14 @@ typedef struct {
SBtPageInfo btPageInfos[20];
void tdbBtPageInfo(SPage *pPage, int idx) {
u8 flags;
SBtPageInfo *pBtPageInfo;
pBtPageInfo = btPageInfos + idx;
pBtPageInfo->pgno = TDB_PAGE_PGNO(pPage);
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
pBtPageInfo->root = TDB_BTREE_PAGE_IS_ROOT(flags);
pBtPageInfo->leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
pBtPageInfo->root = TDB_BTREE_PAGE_IS_ROOT(pPage);
pBtPageInfo->leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
pBtPageInfo->rChild = 0;
if (!pBtPageInfo->leaf) {
@ -1356,4 +1370,5 @@ void tdbBtPageInfo(SPage *pPage, int idx) {
pBtPageInfo->nCells = TDB_PAGE_TOTAL_CELLS(pPage) - pPage->nOverflow;
pBtPageInfo->nOvfl = pPage->nOverflow;
}
#endif
#endif
// TDB_BTREE_DEBUG

View File

@ -35,7 +35,6 @@ struct SBTC {
int idx;
int idxStack[BTREE_MAX_DEPTH + 1];
SPage *pgStack[BTREE_MAX_DEPTH + 1];
void *pBuf;
};
// SBTree
@ -49,7 +48,6 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
int tdbBtcOpen(SBTC *pCur, SBTree *pBt);
int tdbBtcMoveToFirst(SBTC *pBtc);
int tdbBtcMoveToLast(SBTC *pBtc);
int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen);
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbBtcClose(SBTC *pBtc);

View File

@ -64,7 +64,7 @@ typedef TdThreadSpinlock tdb_spinlock_t;
#define tdbSpinlockDestroy taosThreadSpinDestroy
#define tdbSpinlockLock taosThreadSpinLock
#define tdbSpinlockUnlock taosThreadSpinUnlock
#define tdbSpinlockTrylock pthread_spin_trylock
#define tdbSpinlockTrylock taosThreadSpinTrylock
/* mutex lock */
typedef TdThreadMutex tdb_mutex_t;