Merge pull request #11185 from taosdata/feature/tdb

Feature/tdb
This commit is contained in:
Hongze Cheng 2022-04-01 17:10:12 +08:00 committed by GitHub
commit 5021882e38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 573 additions and 347 deletions

View File

@ -29,7 +29,7 @@ struct SBTree {
int minLocal; int minLocal;
int maxLeaf; int maxLeaf;
int minLeaf; int minLeaf;
u8 *pTmp; void *pBuf;
}; };
#define TDB_BTREE_PAGE_COMMON_HDR u8 flags; #define TDB_BTREE_PAGE_COMMON_HDR u8 flags;
@ -101,7 +101,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S
// pBt->kcmpr // pBt->kcmpr
pBt->kcmpr = kcmpr ? kcmpr : tdbDefaultKeyCmprFn; pBt->kcmpr = kcmpr ? kcmpr : tdbDefaultKeyCmprFn;
// pBt->pageSize // pBt->pageSize
pBt->pageSize = tdbPagerGetPageSize(pPager); pBt->pageSize = pPager->pageSize;
// pBt->maxLocal // pBt->maxLocal
pBt->maxLocal = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr)) / 4; pBt->maxLocal = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr)) / 4;
// pBt->minLocal: Should not be allowed smaller than 15, which is [nPayload][nKey][nData] // pBt->minLocal: Should not be allowed smaller than 15, which is [nPayload][nKey][nData]
@ -127,132 +127,144 @@ int tdbBtreeClose(SBTree *pBt) {
return 0; return 0;
} }
int tdbBtCursorInsert(SBTC *pBtc, const void *pKey, int kLen, const void *pVal, int vLen) { int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen) {
SBTC btc;
SCell *pCell;
void *pBuf;
int szCell;
int szBuf;
int ret; int ret;
int idx; int idx;
SPager *pPager; int c;
SCell *pCell;
int szCell;
int cret;
SBTree *pBt;
ret = tdbBtcMoveTo(pBtc, pKey, kLen, &cret); tdbBtcOpen(&btc, pBt);
// move to the position to insert
ret = tdbBtcMoveTo(&btc, pKey, kLen, &c);
if (ret < 0) { if (ret < 0) {
// TODO: handle error tdbBtcClose(&btc);
ASSERT(0);
return -1; return -1;
} }
if (pBtc->idx == -1) { if (btc.idx == -1) {
ASSERT(TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0);
idx = 0; idx = 0;
} else { } else {
if (cret > 0) { if (c > 0) {
idx = pBtc->idx + 1; idx = btc.idx + 1;
} else if (cret < 0) { } else if (c < 0) {
idx = pBtc->idx; idx = btc.idx;
} else { } else {
/* TODO */ // TDB does NOT allow same key
tdbBtcClose(&btc);
ASSERT(0); ASSERT(0);
}
}
// TODO: refact code here
pBt = pBtc->pBt;
if (!pBt->pTmp) {
pBt->pTmp = (u8 *)tdbOsMalloc(pBt->pageSize);
if (pBt->pTmp == NULL) {
return -1; return -1;
} }
} }
pCell = pBt->pTmp; // make sure enough space to hold the cell
szBuf = kLen + vLen + 14;
pBuf = TDB_REALLOC(pBt->pBuf, pBt->pageSize > szBuf ? szBuf : pBt->pageSize);
if (pBuf == NULL) {
tdbBtcClose(&btc);
ASSERT(0);
return -1;
}
pBt->pBuf = pBuf;
pCell = (SCell *)pBt->pBuf;
// Encode the cell // encode cell
ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pVal, vLen, pCell, &szCell); ret = tdbBtreeEncodeCell(btc.pPage, pKey, kLen, pVal, vLen, pCell, &szCell);
if (ret < 0) { if (ret < 0) {
tdbBtcClose(&btc);
ASSERT(0);
return -1; return -1;
} }
// Insert the cell to the index // mark the page dirty
ret = tdbPageInsertCell(pBtc->pPage, idx, pCell, szCell, 0); ret = tdbPagerWrite(pBt->pPager, btc.pPage);
if (ret < 0) { if (ret < 0) {
tdbBtcClose(&btc);
ASSERT(0);
return -1; return -1;
} }
// If page is overflow, balance the tree // insert the cell
if (pBtc->pPage->nOverflow > 0) { ret = tdbPageInsertCell(btc.pPage, idx, pCell, szCell, 0);
ret = tdbBtreeBalance(pBtc);
if (ret < 0) { if (ret < 0) {
tdbBtcClose(&btc);
ASSERT(0);
return -1;
}
// check if need balance
if (btc.pPage->nOverflow > 0) {
ret = tdbBtreeBalance(&btc);
if (ret < 0) {
tdbBtcClose(&btc);
ASSERT(0);
return -1; return -1;
} }
} }
tdbBtcClose(&btc);
return 0; return 0;
} }
int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen) { int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen) {
SBTC btc; return tdbBtreePGet(pBt, pKey, kLen, NULL, NULL, ppVal, vLen);
SCell *pCell;
int cret;
void *pVal;
SCellDecoder cd;
tdbBtcOpen(&btc, pBt);
tdbBtcMoveTo(&btc, pKey, kLen, &cret);
if (cret) {
return cret;
}
pCell = tdbPageGetCell(btc.pPage, btc.idx);
tdbBtreeDecodeCell(btc.pPage, pCell, &cd);
*vLen = cd.vLen;
pVal = TDB_REALLOC(*ppVal, *vLen);
if (pVal == NULL) {
return -1;
}
*ppVal = pVal;
memcpy(*ppVal, cd.pVal, cd.vLen);
return 0;
} }
int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) { int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) {
SBTC btc; SBTC btc;
SCell *pCell; SCell *pCell;
int cret; int cret;
void *pTKey; int ret;
void *pTVal; void *pTKey = NULL;
void *pTVal = NULL;
SCellDecoder cd; SCellDecoder cd;
tdbBtcOpen(&btc, pBt); tdbBtcOpen(&btc, pBt);
tdbBtcMoveTo(&btc, pKey, kLen, &cret); ret = tdbBtcMoveTo(&btc, pKey, kLen, &cret);
if (ret < 0) {
tdbBtcClose(&btc);
ASSERT(0);
}
if (cret) { if (cret) {
return cret; tdbBtcClose(&btc);
return -1;
} }
pCell = tdbPageGetCell(btc.pPage, btc.idx); pCell = tdbPageGetCell(btc.pPage, btc.idx);
tdbBtreeDecodeCell(btc.pPage, pCell, &cd); tdbBtreeDecodeCell(btc.pPage, pCell, &cd);
if (ppKey) {
pTKey = TDB_REALLOC(*ppKey, cd.kLen); pTKey = TDB_REALLOC(*ppKey, cd.kLen);
pTVal = TDB_REALLOC(*ppVal, cd.vLen); if (pTKey == NULL) {
tdbBtcClose(&btc);
if (pTKey == NULL || pTVal == NULL) { ASSERT(0);
TDB_FREE(pTKey); return -1;
TDB_FREE(pTVal); }
*ppKey = pTKey;
*pkLen = cd.kLen;
memcpy(*ppKey, cd.pKey, cd.kLen);
} }
*ppKey = pTKey; pTVal = TDB_REALLOC(*ppVal, cd.vLen);
if (pTVal == NULL) {
tdbBtcClose(&btc);
ASSERT(0);
return -1;
}
*ppVal = pTVal; *ppVal = pTVal;
*pkLen = cd.kLen;
*vLen = cd.vLen; *vLen = cd.vLen;
memcpy(*ppKey, cd.pKey, cd.kLen);
memcpy(*ppVal, cd.pVal, cd.vLen); memcpy(*ppVal, cd.pVal, cd.vLen);
tdbBtcClose(&btc);
return 0; return 0;
} }
@ -300,7 +312,8 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
return -1; return -1;
} }
// TODO: Unref the page // TODO: here still has problem
tdbPagerReturnPage(pBt->pPager, pPage);
ASSERT(pgno != 0); ASSERT(pgno != 0);
pBt->root = pgno; pBt->root = pgno;
@ -371,17 +384,7 @@ static int tdbBtreeZeroPage(SPage *pPage, void *arg) {
return 0; return 0;
} }
#ifndef TDB_BTREE_BALANCE // TDB_BTREE_BALANCE =====================
typedef struct {
SBTree *pBt;
SPage *pParent;
int idx;
i8 nOld;
SPage *pOldPages[3];
i8 nNewPages;
SPage *pNewPages[5];
} SBtreeBalanceHelper;
static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) { static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
SPager *pPager; SPager *pPager;
SPage *pChild; SPage *pChild;
@ -408,6 +411,13 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
((SIntHdr *)pChild->pData)->pgno = ((SIntHdr *)(pRoot->pData))->pgno; ((SIntHdr *)pChild->pData)->pgno = ((SIntHdr *)(pRoot->pData))->pgno;
} }
ret = tdbPagerWrite(pPager, pChild);
if (ret < 0) {
// TODO
ASSERT(0);
return 0;
}
// Copy the root page content to the child page // Copy the root page content to the child page
tdbPageCopy(pRoot, pChild); tdbPageCopy(pRoot, pChild);
@ -472,6 +482,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
ret = tdbPagerWrite(pBt->pPager, pOlds[i]);
if (ret < 0) {
// TODO
ASSERT(0);
return -1;
}
} }
// copy the parent key out if child pages are not leaf page // copy the parent key out if child pages are not leaf page
childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(pOlds[0]); childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(pOlds[0]);
@ -492,6 +509,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
} }
rPgno = ((SIntHdr *)pOlds[nOlds - 1]->pData)->pgno; rPgno = ((SIntHdr *)pOlds[nOlds - 1]->pData)->pgno;
} }
ret = tdbPagerWrite(pBt->pPager, pParent);
if (ret < 0) {
// TODO
ASSERT(0);
return -1;
}
// drop the cells on parent page // drop the cells on parent page
for (int i = 0; i < nOlds; i++) { for (int i = 0; i < nOlds; i++) {
nCells = TDB_PAGE_TOTAL_CELLS(pParent); nCells = TDB_PAGE_TOTAL_CELLS(pParent);
@ -619,6 +644,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
} }
ret = tdbPagerWrite(pBt->pPager, pNews[iNew]);
if (ret < 0) {
// TODO
ASSERT(0);
return -1;
}
} }
} }
@ -732,14 +764,24 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
} }
} }
// TODO: here is not corrent for drop case
for (int i = 0; i < nNews; i++) {
if (i < nOlds) {
tdbPagerReturnPage(pBt->pPager, pOlds[i]);
} else {
tdbPagerReturnPage(pBt->pPager, pNews[i]);
}
}
return 0; return 0;
} }
static int tdbBtreeBalance(SBTC *pBtc) { static int tdbBtreeBalance(SBTC *pBtc) {
int iPage; int iPage;
int ret;
int nFree;
SPage *pParent; SPage *pParent;
SPage *pPage; SPage *pPage;
int ret;
u8 flags; u8 flags;
u8 leaf; u8 leaf;
u8 root; u8 root;
@ -750,10 +792,11 @@ static int tdbBtreeBalance(SBTC *pBtc) {
pPage = pBtc->pPage; pPage = pBtc->pPage;
leaf = TDB_BTREE_PAGE_IS_LEAF(pPage); leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
root = TDB_BTREE_PAGE_IS_ROOT(pPage); root = TDB_BTREE_PAGE_IS_ROOT(pPage);
nFree = TDB_PAGE_FREE_SIZE(pPage);
// when the page is not overflow and not too empty, the balance work // when the page is not overflow and not too empty, the balance work
// is finished. Just break out the balance loop. // is finished. Just break out the balance loop.
if (pPage->nOverflow == 0 /* TODO: && pPage->nFree <= */) { if (pPage->nOverflow == 0 && nFree < TDB_PAGE_USABLE_SIZE(pPage) * 2 / 3) {
break; break;
} }
@ -781,6 +824,8 @@ static int tdbBtreeBalance(SBTC *pBtc) {
return -1; return -1;
} }
tdbPagerReturnPage(pBtc->pBt->pPager, pBtc->pPage);
pBtc->iPage--; pBtc->iPage--;
pBtc->pPage = pBtc->pgStack[pBtc->iPage]; pBtc->pPage = pBtc->pgStack[pBtc->iPage];
} }
@ -788,7 +833,7 @@ static int tdbBtreeBalance(SBTC *pBtc) {
return 0; return 0;
} }
#endif // TDB_BTREE_BALANCE
// TDB_BTREE_CELL ===================== // TDB_BTREE_CELL =====================
static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const void *pKey, int kLen, const void *pVal, static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const void *pKey, int kLen, const void *pVal,
@ -1028,12 +1073,11 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
// move upward // move upward
for (;;) { for (;;) {
if (pBtc->iPage == 0) { if (pBtc->iPage == iPage) {
pBtc->idx = 0; pBtc->idx = 0;
break; break;
} }
if (pBtc->iPage < iPage) break;
tdbBtcMoveUpward(pBtc); tdbBtcMoveUpward(pBtc);
} }
} }
@ -1056,6 +1100,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
int tdbBtcMoveToLast(SBTC *pBtc) { int tdbBtcMoveToLast(SBTC *pBtc) {
int ret; int ret;
int nCells;
SBTree *pBt; SBTree *pBt;
SPager *pPager; SPager *pPager;
SPgno pgno; SPgno pgno;
@ -1071,27 +1116,56 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
return -1; return -1;
} }
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
pBtc->iPage = 0; pBtc->iPage = 0;
if (nCells > 0) {
pBtc->idx = TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage) ? nCells - 1 : nCells;
} else { } else {
// move from a position // no data at all, point to an invalid position
ASSERT(0); ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
pBtc->idx = -1;
return 0;
}
} else {
int iPage = 0;
// downward search
for (; iPage < pBtc->iPage; iPage++) {
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pBtc->pgStack[iPage]));
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pgStack[iPage]);
if (pBtc->idxStack[iPage] != nCells) break;
}
// move upward
for (;;) {
if (pBtc->iPage == iPage) {
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage) - 1;
} else {
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
}
break;
}
tdbBtcMoveUpward(pBtc);
}
} }
// move downward // move downward
for (;;) { for (;;) {
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) { if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) break;
// TODO: handle empty case
ASSERT(TDB_PAGE_TOTAL_CELLS(pBtc->pPage) > 0);
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage) - 1;
break;
} else {
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
ret = tdbBtcMoveDownward(pBtc); ret = tdbBtcMoveDownward(pBtc);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
pBtc->idx = nCells - 1;
} else {
pBtc->idx = nCells;
} }
} }
@ -1104,6 +1178,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
void *pKey, *pVal; void *pKey, *pVal;
int ret; int ret;
// current cursor points to an invalid position
if (pBtc->idx < 0) { if (pBtc->idx < 0) {
return -1; return -1;
} }
@ -1134,12 +1209,17 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
memcpy(pVal, cd.pVal, cd.vLen); memcpy(pVal, cd.pVal, cd.vLen);
ret = tdbBtcMoveToNext(pBtc); ret = tdbBtcMoveToNext(pBtc);
if (ret < 0) {
ASSERT(0);
return -1;
}
return 0; return 0;
} }
static int tdbBtcMoveToNext(SBTC *pBtc) { static int tdbBtcMoveToNext(SBTC *pBtc) {
int nCells; int nCells;
int ret;
SCell *pCell; SCell *pCell;
ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)); ASSERT(TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
@ -1151,37 +1231,33 @@ static int tdbBtcMoveToNext(SBTC *pBtc) {
return 0; return 0;
} }
// move upward
for (;;) {
if (pBtc->iPage == 0) { if (pBtc->iPage == 0) {
pBtc->idx = -1; pBtc->idx = -1;
return 0; return 0;
} }
// Move upward
for (;;) {
tdbBtcMoveUpward(pBtc); tdbBtcMoveUpward(pBtc);
pBtc->idx++; pBtc->idx++;
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage); ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage));
if (pBtc->idx <= nCells) { if (pBtc->idx <= TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
break; break;
} }
if (pBtc->iPage == 0) {
pBtc->idx = -1;
return 0;
}
} }
// Move downward // move downward
for (;;) { for (;;) {
nCells = TDB_PAGE_TOTAL_CELLS(pBtc->pPage); if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) break;
tdbBtcMoveDownward(pBtc); ret = tdbBtcMoveDownward(pBtc);
pBtc->idx = 0; if (ret < 0) {
ASSERT(0);
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) { return -1;
break;
} }
pBtc->idx = 0;
} }
return 0; return 0;
@ -1231,36 +1307,70 @@ static int tdbBtcMoveUpward(SBTC *pBtc) {
static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
int ret; int ret;
int nCells;
int c;
SBTree *pBt; SBTree *pBt;
SCell *pCell;
SPager *pPager; SPager *pPager;
SCellDecoder cd = {0};
pBt = pBtc->pBt; pBt = pBtc->pBt;
pPager = pBt->pPager; pPager = pBt->pPager;
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
ASSERT(pBtc->iPage == -1); // move from a clear cursor
ASSERT(pBtc->idx == -1);
// Move from the root
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt); ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt);
if (ret < 0) { if (ret < 0) {
// TODO
ASSERT(0); ASSERT(0);
return -1;
}
pBtc->iPage = 0;
if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) {
// Current page is empty
// ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pBtc->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF));
return 0; return 0;
} }
for (;;) { pBtc->iPage = 0;
int lidx, ridx, midx, c, nCells; pBtc->idx = -1;
SCell *pCell; // for empty tree, just return with an invalid position
if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) return 0;
} else {
SPage *pPage;
int idx;
int iPage = 0;
// downward search
for (; iPage < pBtc->iPage; iPage++) {
pPage = pBtc->pgStack[iPage];
idx = pBtc->idxStack[iPage];
nCells = TDB_PAGE_TOTAL_CELLS(pPage);
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pPage));
// check if key <= current position
if (idx < nCells) {
pCell = tdbPageGetCell(pPage, idx);
tdbBtreeDecodeCell(pPage, pCell, &cd);
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c > 0) break;
}
// check if key > current - 1 position
if (idx > 0) {
pCell = tdbPageGetCell(pPage, idx - 1);
tdbBtreeDecodeCell(pPage, pCell, &cd);
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c <= 0) break;
}
}
// move upward
for (;;) {
if (pBtc->iPage == iPage) break;
tdbBtcMoveUpward(pBtc);
}
}
// search downward to the leaf
for (;;) {
int lidx, ridx, midx;
SPage *pPage; SPage *pPage;
SCellDecoder cd = {0};
pPage = pBtc->pPage; pPage = pBtc->pPage;
nCells = TDB_PAGE_TOTAL_CELLS(pPage); nCells = TDB_PAGE_TOTAL_CELLS(pPage);
@ -1268,7 +1378,33 @@ static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
ridx = nCells - 1; ridx = nCells - 1;
ASSERT(nCells > 0); ASSERT(nCells > 0);
ASSERT(pBtc->idx == -1);
// compare first cell
midx = lidx;
pCell = tdbPageGetCell(pPage, midx);
tdbBtreeDecodeCell(pPage, pCell, &cd);
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c <= 0) {
ridx = lidx - 1;
} else {
lidx = lidx + 1;
}
// compare last cell
if (lidx <= ridx) {
midx = ridx;
pCell = tdbPageGetCell(pPage, midx);
tdbBtreeDecodeCell(pPage, pCell, &cd);
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c >= 0) {
lidx = ridx + 1;
} else {
ridx = ridx - 1;
}
}
// binary search
for (;;) { for (;;) {
if (lidx > ridx) break; if (lidx > ridx) break;
@ -1285,20 +1421,19 @@ static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
// Compare the key values // Compare the key values
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen); c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c < 0) { if (c < 0) {
/* input-key < cell-key */ // pKey < cd.pKey
ridx = midx - 1; ridx = midx - 1;
} else if (c > 0) { } else if (c > 0) {
/* input-key > cell-key */ // pKey > cd.pKey
lidx = midx + 1; lidx = midx + 1;
} else { } else {
/* input-key == cell-key */ // pKey == cd.pKey
break; break;
} }
} }
// Move downward or break // keep search downward or break
u8 leaf = TDB_BTREE_PAGE_IS_LEAF(pPage); if (TDB_BTREE_PAGE_IS_LEAF(pPage)) {
if (leaf) {
pBtc->idx = midx; pBtc->idx = midx;
*pCRst = c; *pCRst = c;
break; break;
@ -1312,11 +1447,6 @@ static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
} }
} }
} else {
// TODO: Move the cursor from a some position instead of a clear state
ASSERT(0);
}
return 0; return 0;
} }

View File

@ -49,6 +49,8 @@ int tdbDbOpen(const char *fname, int keyLen, int valLen, FKeyComparator keyCmprF
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
tdbEnvAddPager(pEnv, pPager);
} }
ASSERT(pPager != NULL); ASSERT(pPager != NULL);
@ -74,22 +76,7 @@ int tdbDbDrop(TDB *pDb) {
} }
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen) { int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen) {
SBTC btc; return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen);
SBTC *pCur;
int ret;
pCur = &btc;
ret = tdbBtcOpen(pCur, pDb->pBt);
if (ret < 0) {
return -1;
}
ret = tdbBtCursorInsert(pCur, pKey, keyLen, pVal, valLen);
if (ret < 0) {
return -1;
}
return 0;
} }
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) { int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {

View File

@ -19,6 +19,7 @@ int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv) {
TENV *pEnv; TENV *pEnv;
int dsize; int dsize;
int zsize; int zsize;
int tsize;
u8 *pPtr; u8 *pPtr;
int ret; int ret;
@ -53,6 +54,14 @@ int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv) {
return -1; return -1;
} }
pEnv->nPgrHash = 8;
tsize = sizeof(SPager *) * pEnv->nPgrHash;
pEnv->pgrHash = TDB_REALLOC(pEnv->pgrHash, tsize);
if (pEnv->pgrHash == NULL) {
return -1;
}
memset(pEnv->pgrHash, 0, tsize);
mkdir(rootDir, 0755); mkdir(rootDir, 0755);
*ppEnv = pEnv; *ppEnv = pEnv;
@ -64,7 +73,99 @@ int tdbEnvClose(TENV *pEnv) {
return 0; return 0;
} }
SPager *tdbEnvGetPager(TENV *pEnv, const char *fname) { int tdbBegin(TENV *pEnv) {
// TODO SPager *pPager;
return NULL; int ret;
for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerBegin(pPager);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
return 0;
}
int tdbCommit(TENV *pEnv) {
SPager *pPager;
int ret;
for (pPager = pEnv->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerCommit(pPager);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
return 0;
}
int tdbRollback(TENV *pEnv) {
ASSERT(0);
return 0;
}
SPager *tdbEnvGetPager(TENV *pEnv, const char *fname) {
u32 hash;
SPager **ppPager;
hash = tdbCstringHash(fname);
ppPager = &pEnv->pgrHash[hash % pEnv->nPgrHash];
for (; *ppPager && (strcmp(fname, (*ppPager)->dbFileName) != 0); ppPager = &((*ppPager)->pHashNext)) {
}
return *ppPager;
}
void tdbEnvAddPager(TENV *pEnv, SPager *pPager) {
u32 hash;
SPager **ppPager;
// rehash if neccessary
if (pEnv->nPager + 1 > pEnv->nPgrHash) {
// TODO
}
// add to list
pPager->pNext = pEnv->pgrList;
pEnv->pgrList = pPager;
// add to hash
hash = tdbCstringHash(pPager->dbFileName);
ppPager = &pEnv->pgrHash[hash % pEnv->nPgrHash];
pPager->pHashNext = *ppPager;
*ppPager = pPager;
// increase the counter
pEnv->nPager++;
}
void tdbEnvRemovePager(TENV *pEnv, SPager *pPager) {
u32 hash;
SPager **ppPager;
// remove from the list
for (ppPager = &pEnv->pgrList; *ppPager && (*ppPager != pPager); ppPager = &((*ppPager)->pNext)) {
}
ASSERT(*ppPager == pPager);
*ppPager = pPager->pNext;
// remove from hash
hash = tdbCstringHash(pPager->dbFileName);
ppPager = &pEnv->pgrHash[hash % pEnv->nPgrHash];
for (; *ppPager && *ppPager != pPager; ppPager = &((*ppPager)->pHashNext)) {
}
ASSERT(*ppPager == pPager);
*ppPager = pPager->pNext;
// decrease the counter
pEnv->nPager--;
// rehash if necessary
if (pEnv->nPgrHash > 8 && pEnv->nPager < pEnv->nPgrHash / 2) {
// TODO
}
} }

View File

@ -34,18 +34,6 @@ struct SPCache {
}) })
#define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL) #define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL)
// For page ref
#define TDB_INIT_PAGE_REF(pPage) ((pPage)->nRef = 0)
#if 0
#define TDB_REF_PAGE(pPage) (++(pPage)->nRef)
#define TDB_UNREF_PAGE(pPage) (--(pPage)->nRef)
#define TDB_GET_PAGE_REF(pPage) ((pPage)->nRef)
#else
#define TDB_REF_PAGE(pPage) atomic_add_fetch_32(&((pPage)->nRef), 1)
#define TDB_UNREF_PAGE(pPage) atomic_sub_fetch_32(&((pPage)->nRef), 1)
#define TDB_GET_PAGE_REF(pPage) atomic_load_32(&((pPage)->nRef))
#endif
static int tdbPCacheOpenImpl(SPCache *pCache); static int tdbPCacheOpenImpl(SPCache *pCache);
static void tdbPCacheInitLock(SPCache *pCache); static void tdbPCacheInitLock(SPCache *pCache);
static void tdbPCacheClearLock(SPCache *pCache); static void tdbPCacheClearLock(SPCache *pCache);
@ -107,12 +95,7 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage) {
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
if (nRef == 0) { if (nRef == 0) {
if (1 /*TODO: page still clean*/) {
tdbPCacheUnpinPage(pCache, pPage); tdbPCacheUnpinPage(pCache, pPage);
} else {
// TODO
ASSERT(0);
}
} }
} }
@ -192,6 +175,8 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
tdbPCacheLock(pCache); tdbPCacheLock(pCache);
ASSERT(!pPage->isDirty);
nRef = TDB_GET_PAGE_REF(pPage); nRef = TDB_GET_PAGE_REF(pPage);
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
if (nRef == 0) { if (nRef == 0) {

View File

@ -15,20 +15,6 @@
#include "tdbInt.h" #include "tdbInt.h"
struct SPager {
char *dbFileName;
char *jFileName;
int pageSize;
uint8_t fid[TDB_FILE_ID_LEN];
tdb_fd_t fd;
tdb_fd_t jfd;
SPCache *pCache;
SPgno dbFileSize;
SPgno dbOrigSize;
SPage *pDirty;
u8 inTran;
};
typedef struct __attribute__((__packed__)) { typedef struct __attribute__((__packed__)) {
u8 hdrString[16]; u8 hdrString[16];
u16 pageSize; u16 pageSize;
@ -41,9 +27,8 @@ TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct")
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL) #define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)
static int tdbPagerReadPage(SPager *pPager, SPage *pPage);
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno); static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg); static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage);
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage); static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage); static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
@ -132,25 +117,35 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
int tdbPagerWrite(SPager *pPager, SPage *pPage) { int tdbPagerWrite(SPager *pPager, SPage *pPage) {
int ret; int ret;
SPage **ppPage;
ASSERT(pPager->inTran);
#if 0
if (pPager->inTran == 0) { if (pPager->inTran == 0) {
ret = tdbPagerBegin(pPager); ret = tdbPagerBegin(pPager);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
} }
#endif
if (pPage->isDirty) return 0; if (pPage->isDirty) return 0;
// ref page one more time so the page will not be release
TDB_REF_PAGE(pPage);
// Set page as dirty // Set page as dirty
pPage->isDirty = 1; pPage->isDirty = 1;
// Add page to dirty list // Add page to dirty list(TODO: NOT use O(n^2) algorithm)
// TODO: sort the list according to the page number for (ppPage = &pPager->pDirty; (*ppPage) && TDB_PAGE_PGNO(*ppPage) < TDB_PAGE_PGNO(pPage);
pPage->pDirtyNext = pPager->pDirty; ppPage = &((*ppPage)->pDirtyNext)) {
pPager->pDirty = pPage; }
ASSERT(*ppPage == NULL || TDB_PAGE_PGNO(*ppPage) > TDB_PAGE_PGNO(pPage));
pPage->pDirtyNext = *ppPage;
*ppPage = pPage;
// Write page to journal // Write page to journal if neccessary
if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) { if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) {
ret = tdbPagerWritePageToJournal(pPager, pPage); ret = tdbPagerWritePageToJournal(pPager, pPage);
if (ret < 0) { if (ret < 0) {
@ -184,54 +179,46 @@ int tdbPagerCommit(SPager *pPager) {
SPage *pPage; SPage *pPage;
int ret; int ret;
// Begin commit // sync the journal file
{ ret = tdbOsFSync(pPager->jfd);
// TODO: Sync the journal file (Here or when write ?) if (ret < 0) {
// TODO
ASSERT(0);
return 0;
} }
for (;;) { // loop to write the dirty pages to file
pPage = pPager->pDirty; for (pPage = pPager->pDirty; pPage; pPage = pPage->pDirtyNext) {
// TODO: update the page footer
if (pPage == NULL) break;
ret = tdbPagerWritePageToDB(pPager, pPage); ret = tdbPagerWritePageToDB(pPager, pPage);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
}
// release the page
for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) {
pPager->pDirty = pPage->pDirtyNext; pPager->pDirty = pPage->pDirtyNext;
pPage->pDirtyNext = NULL; pPage->pDirtyNext = NULL;
// TODO: release the page pPage->isDirty = 0;
tdbPCacheRelease(pPager->pCache, pPage);
} }
// sync the db file
tdbOsFSync(pPager->fd); tdbOsFSync(pPager->fd);
// remote the journal file
tdbOsClose(pPager->jfd); tdbOsClose(pPager->jfd);
tdbOsRemove(pPager->jFileName); tdbOsRemove(pPager->jFileName);
// pPager->jfd = -1; pPager->dbOrigSize = pPager->dbFileSize;
pPager->inTran = 0;
return 0; return 0;
} }
static int tdbPagerReadPage(SPager *pPager, SPage *pPage) {
i64 offset;
int ret;
ASSERT(memcmp(pPager->fid, pPage->pgid.fileid, TDB_FILE_ID_LEN) == 0);
offset = (pPage->pgid.pgno - 1) * (i64)(pPager->pageSize);
ret = tdbOsPRead(pPager->fd, pPage->pData, pPager->pageSize, offset);
if (ret < 0) {
// TODO: handle error
return -1;
}
return 0;
}
int tdbPagerGetPageSize(SPager *pPager) { return pPager->pageSize; }
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg) { int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg) {
SPage *pPage; SPage *pPage;
SPgid pgid; SPgid pgid;
@ -247,7 +234,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage
// Initialize the page if need // Initialize the page if need
if (!TDB_PAGE_INITIALIZED(pPage)) { if (!TDB_PAGE_INITIALIZED(pPage)) {
ret = tdbPagerInitPage(pPager, pPage, initPage, arg); ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 1);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -284,7 +271,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
ASSERT(!TDB_PAGE_INITIALIZED(pPage)); ASSERT(!TDB_PAGE_INITIALIZED(pPage));
// Initialize the page if need // Initialize the page if need
ret = tdbPagerInitPage(pPager, pPage, initPage, arg); ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 0);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -332,10 +319,11 @@ static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
return 0; return 0;
} }
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg) { static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage) {
int ret; int ret;
int lcode; int lcode;
int nLoops; int nLoops;
i64 nRead;
lcode = TDB_TRY_LOCK_PAGE(pPage); lcode = TDB_TRY_LOCK_PAGE(pPage);
if (lcode == P_LOCK_SUCC) { if (lcode == P_LOCK_SUCC) {
@ -344,6 +332,19 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
return 0; return 0;
} }
if (loadPage) {
nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * TDB_PAGE_PGNO(pPage));
if (nRead < 0) {
// TODO
ASSERT(0);
return -1;
} else if (nRead < pPage->pageSize) {
// TODO
ASSERT(0);
return -1;
}
}
ret = (*initPage)(pPage, arg); ret = (*initPage)(pPage, arg);
if (ret < 0) { if (ret < 0) {
TDB_UNLOCK_PAGE(pPage); TDB_UNLOCK_PAGE(pPage);

View File

@ -15,17 +15,29 @@
#include "tdbInt.h" #include "tdbInt.h"
int tdbTxnBegin(TENV *pEnv) { // int tdbTxnBegin(TENV *pEnv) {
// TODO // // TODO
return 0; // return 0;
} // }
int tdbTxnCommit(TENV *pEnv) { // int tdbTxnCommit(TENV *pEnv) {
// TODO // SPager *pPager = NULL;
return 0; // int ret;
}
int tdbTxnRollback(TENV *pEnv) { // for (;;) {
// TODO // break;
return 0; // ret = tdbPagerCommit(pPager);
} // if (ret < 0) {
// ASSERT(0);
// return -1;
// }
// }
// // TODO
// return 0;
// }
// int tdbTxnRollback(TENV *pEnv) {
// // TODO
// return 0;
// }

View File

@ -40,7 +40,7 @@ struct SBTC {
// SBTree // SBTree
int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, FKeyComparator kcmpr, SBTree **ppBt); int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, FKeyComparator kcmpr, SBTree **ppBt);
int tdbBtreeClose(SBTree *pBt); int tdbBtreeClose(SBTree *pBt);
int tdbBtCursorInsert(SBTC *pCur, const void *pKey, int kLen, const void *pVal, int vLen); int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen);
int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen); int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);

View File

@ -25,11 +25,20 @@ typedef struct STEnv {
char *jfname; char *jfname;
int jfd; int jfd;
SPCache *pCache; SPCache *pCache;
SPager *pgrList;
int nPager;
int nPgrHash;
SPager **pgrHash;
} TENV; } TENV;
int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv); int tdbEnvOpen(const char *rootDir, int pageSize, int cacheSize, TENV **ppEnv);
int tdbEnvClose(TENV *pEnv); int tdbEnvClose(TENV *pEnv);
int tdbBegin(TENV *pEnv);
int tdbCommit(TENV *pEnv);
int tdbRollback(TENV *pEnv);
void tdbEnvAddPager(TENV *pEnv, SPager *pPager);
void tdbEnvRemovePager(TENV *pEnv, SPager *pPager);
SPager *tdbEnvGetPager(TENV *pEnv, const char *fname); SPager *tdbEnvGetPager(TENV *pEnv, const char *fname);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -16,8 +16,6 @@
#ifndef _TD_TDB_INTERNAL_H_ #ifndef _TD_TDB_INTERNAL_H_
#define _TD_TDB_INTERNAL_H_ #define _TD_TDB_INTERNAL_H_
#include "os.h"
#include "tdb.h" #include "tdb.h"
#ifdef __cplusplus #ifdef __cplusplus
@ -91,23 +89,6 @@ static FORCE_INLINE int tdbCmprPgId(const void *p1, const void *p2) {
// dbname // dbname
#define TDB_MAX_DBNAME_LEN 24 #define TDB_MAX_DBNAME_LEN 24
// tdb_log
#define tdbError(var)
#define TERR_A(val, op, flag) \
do { \
if (((val) = (op)) != 0) { \
goto flag; \
} \
} while (0)
#define TERR_B(val, op, flag) \
do { \
if (((val) = (op)) == NULL) { \
goto flag; \
} \
} while (0)
#define TDB_VARIANT_LEN ((int)-1) #define TDB_VARIANT_LEN ((int)-1)
typedef int (*FKeyComparator)(const void *pKey1, int kLen1, const void *pKey2, int kLen2); typedef int (*FKeyComparator)(const void *pKey1, int kLen1, const void *pKey2, int kLen2);

View File

@ -24,6 +24,8 @@ extern "C" {
#define TDB_FOR_TDENGINE #define TDB_FOR_TDENGINE
#ifdef TDB_FOR_TDENGINE #ifdef TDB_FOR_TDENGINE
#include "os.h"
#include "thash.h"
// For memory ----------------- // For memory -----------------
#define tdbOsMalloc taosMemoryMalloc #define tdbOsMalloc taosMemoryMalloc
@ -95,7 +97,11 @@ typedef int tdb_fd_t;
#define tdbOsOpen(PATH, OPTION, MODE) open((PATH), (OPTION), (MODE)) #define tdbOsOpen(PATH, OPTION, MODE) open((PATH), (OPTION), (MODE))
#define tdbOsClose close #define tdbOsClose(FD) \
do { \
close(FD); \
(FD) = -1; \
} while (0)
i64 tdbOsRead(tdb_fd_t fd, void *pData, i64 nBytes); i64 tdbOsRead(tdb_fd_t fd, void *pData, i64 nBytes);
i64 tdbOsPRead(tdb_fd_t fd, void *pData, i64 nBytes, i64 offset); i64 tdbOsPRead(tdb_fd_t fd, void *pData, i64 nBytes, i64 offset);

View File

@ -33,6 +33,18 @@ extern "C" {
SPager *pPager; \ SPager *pPager; \
SPgid pgid; SPgid pgid;
// For page ref
#define TDB_INIT_PAGE_REF(pPage) ((pPage)->nRef = 0)
#if 0
#define TDB_REF_PAGE(pPage) (++(pPage)->nRef)
#define TDB_UNREF_PAGE(pPage) (--(pPage)->nRef)
#define TDB_GET_PAGE_REF(pPage) ((pPage)->nRef)
#else
#define TDB_REF_PAGE(pPage) atomic_add_fetch_32(&((pPage)->nRef), 1)
#define TDB_UNREF_PAGE(pPage) atomic_sub_fetch_32(&((pPage)->nRef), 1)
#define TDB_GET_PAGE_REF(pPage) atomic_load_32(&((pPage)->nRef))
#endif
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache); int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache);
int tdbPCacheClose(SPCache *pCache); int tdbPCacheClose(SPCache *pCache);
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, bool alcNewPage); SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, bool alcNewPage);

View File

@ -100,6 +100,7 @@ struct SPage {
// APIs // APIs
#define TDB_PAGE_TOTAL_CELLS(pPage) ((pPage)->nOverflow + (pPage)->pPageMethods->getCellNum(pPage)) #define TDB_PAGE_TOTAL_CELLS(pPage) ((pPage)->nOverflow + (pPage)->pPageMethods->getCellNum(pPage))
#define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx) #define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx)
#define TDB_PAGE_FREE_SIZE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
#define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno) #define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno)
#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell) + (pPage)->pPageMethods->szOffset) #define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell) + (pPage)->pPageMethods->szOffset)
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset) #define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)

View File

@ -20,13 +20,28 @@
extern "C" { extern "C" {
#endif #endif
struct SPager {
char *dbFileName;
char *jFileName;
int pageSize;
uint8_t fid[TDB_FILE_ID_LEN];
tdb_fd_t fd;
tdb_fd_t jfd;
SPCache *pCache;
SPgno dbFileSize;
SPgno dbOrigSize;
SPage *pDirty;
u8 inTran;
SPager *pNext; // used by TENV
SPager *pHashNext; // used by TENV
};
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager); int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
int tdbPagerClose(SPager *pPager); int tdbPagerClose(SPager *pPager);
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate); int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate);
int tdbPagerWrite(SPager *pPager, SPage *pPage); int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager); int tdbPagerBegin(SPager *pPager);
int tdbPagerCommit(SPager *pPager); int tdbPagerCommit(SPager *pPager);
int tdbPagerGetPageSize(SPager *pPager);
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg); int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg); int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage); void tdbPagerReturnPage(SPager *pPager, SPage *pPage);

View File

@ -28,10 +28,6 @@ struct STxn {
void *xArg; void *xArg;
}; };
int tdbTxnBegin(TENV *pEnv);
int tdbTxnCommit(TENV *pEnv);
int tdbTxnRollback(TENV *pEnv);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -101,6 +101,8 @@ static inline int tdbGetVarInt(const u8 *p, int *v) {
return n; return n;
} }
static inline u32 tdbCstringHash(const char *s) { return MurmurHash3_32(s, strlen(s)); }
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -118,10 +118,10 @@ TEST(tdb_test, simple_test) {
TENV *pEnv; TENV *pEnv;
TDB *pDb; TDB *pDb;
FKeyComparator compFunc; FKeyComparator compFunc;
int nData = 1000000; int nData = 50000000;
// Open Env // Open Env
ret = tdbEnvOpen("tdb", 4096, 256000, &pEnv); ret = tdbEnvOpen("tdb", 4096, 64, &pEnv);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// Create a database // Create a database
@ -134,35 +134,22 @@ TEST(tdb_test, simple_test) {
char val[64]; char val[64];
{ // Insert some data { // Insert some data
int i = 1; for (int i = 1; i <= nData;) {
SPoolMem *pPool; tdbBegin(pEnv);
int memPoolCapacity = 16 * 1024;
pPool = openPool();
tdbTxnBegin(pEnv);
for (;;) {
if (i > nData) break;
for (int k = 0; k < 2000; k++) {
sprintf(key, "key%d", i); sprintf(key, "key%d", i);
sprintf(val, "value%d", i); sprintf(val, "value%d", i);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val)); ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val));
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
if (pPool->size >= memPoolCapacity) {
tdbTxnCommit(pEnv);
clearPool(pPool);
tdbTxnBegin(pEnv);
}
i++; i++;
} }
closePool(pPool); tdbCommit(pEnv);
} }
}
tdbCommit(pEnv);
{ // Query the data { // Query the data
void *pVal = NULL; void *pVal = NULL;
@ -173,6 +160,7 @@ TEST(tdb_test, simple_test) {
sprintf(val, "value%d", i); sprintf(val, "value%d", i);
ret = tdbDbGet(pDb, key, strlen(key), &pVal, &vLen); ret = tdbDbGet(pDb, key, strlen(key), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, strlen(val)); GTEST_ASSERT_EQ(vLen, strlen(val));