refact TDB to support meta

This commit is contained in:
Hongze Cheng 2022-04-25 03:43:42 +00:00
parent 98fc272ac8
commit 13740e8988
6 changed files with 150 additions and 154 deletions

View File

@ -42,8 +42,7 @@ struct SBTree {
ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \ ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \
TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0)) TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0))
#pragma pack(push, 1)
#pragma pack(push,1)
typedef struct { typedef struct {
TDB_BTREE_PAGE_COMMON_HDR TDB_BTREE_PAGE_COMMON_HDR
} SLeafHdr; } SLeafHdr;
@ -71,8 +70,7 @@ typedef struct {
static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst); static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst);
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2); static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2);
static int tdbBtreeOpenImpl(SBTree *pBt); static int tdbBtreeOpenImpl(SBTree *pBt);
static int tdbBtreeZeroPage(SPage *pPage, void *arg); static int tdbBtreeInitPage(SPage *pPage, void *arg, int init);
static int tdbBtreeInitPage(SPage *pPage, void *arg);
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell, static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
int *szCell); int *szCell);
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder); static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder);
@ -312,75 +310,57 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
} }
// Try to create a new database // Try to create a new database
SBtreeInitPageArg zArg = {.flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF, .pBt = pBt}; ret = tdbPagerAllocPage(pBt->pPager, &pgno);
ret = tdbPagerNewPage(pBt->pPager, &pgno, &pPage, tdbBtreeZeroPage, &zArg, NULL);
if (ret < 0) { if (ret < 0) {
ASSERT(0);
return -1; return -1;
} }
// TODO: here still has problem
tdbPagerReturnPage(pBt->pPager, pPage, NULL);
ASSERT(pgno != 0); ASSERT(pgno != 0);
pBt->root = pgno; pBt->root = pgno;
return 0; return 0;
} }
static int tdbBtreeInitPage(SPage *pPage, void *arg) { static int tdbBtreeInitPage(SPage *pPage, void *arg, int init) {
SBTree *pBt; SBTree *pBt;
u8 flags; u8 flags;
u8 isLeaf;
pBt = (SBTree *)arg;
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
isLeaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
ASSERT(flags == TDB_BTREE_PAGE_GET_FLAGS(pPage));
tdbPageInit(pPage, isLeaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize);
TDB_BTREE_ASSERT_FLAG(flags);
if (isLeaf) {
pPage->kLen = pBt->keyLen;
pPage->vLen = pBt->valLen;
pPage->maxLocal = pBt->maxLeaf;
pPage->minLocal = pBt->minLeaf;
} else {
pPage->kLen = pBt->keyLen;
pPage->vLen = sizeof(SPgno);
pPage->maxLocal = pBt->maxLocal;
pPage->minLocal = pBt->minLocal;
}
return 0;
}
static int tdbBtreeZeroPage(SPage *pPage, void *arg) {
u8 flags;
SBTree *pBt;
u8 leaf; u8 leaf;
flags = ((SBtreeInitPageArg *)arg)->flags;
pBt = ((SBtreeInitPageArg *)arg)->pBt; pBt = ((SBtreeInitPageArg *)arg)->pBt;
leaf = flags & TDB_BTREE_LEAF;
tdbPageZero(pPage, leaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize); if (init) {
// init page
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
TDB_BTREE_ASSERT_FLAG(flags);
tdbPageInit(pPage, leaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize);
} else {
// zero page
flags = ((SBtreeInitPageArg *)arg)->flags;
leaf = flags & TDB_BTREE_LEAF;
TDB_BTREE_ASSERT_FLAG(flags);
tdbPageZero(pPage, leaf ? sizeof(SLeafHdr) : sizeof(SIntHdr), tdbBtreeCellSize);
if (leaf) {
SLeafHdr *pLeafHdr = (SLeafHdr *)(pPage->pData);
pLeafHdr->flags = flags;
} else {
SIntHdr *pIntHdr = (SIntHdr *)(pPage->pData);
pIntHdr->flags = flags;
pIntHdr->pgno = 0;
}
}
if (leaf) { if (leaf) {
SLeafHdr *pLeafHdr = (SLeafHdr *)(pPage->pData);
pLeafHdr->flags = flags;
pPage->kLen = pBt->keyLen; pPage->kLen = pBt->keyLen;
pPage->vLen = pBt->valLen; pPage->vLen = pBt->valLen;
pPage->maxLocal = pBt->maxLeaf; pPage->maxLocal = pBt->maxLeaf;
pPage->minLocal = pBt->minLeaf; pPage->minLocal = pBt->minLeaf;
} else { } else {
SIntHdr *pIntHdr = (SIntHdr *)(pPage->pData);
pIntHdr->flags = flags;
pIntHdr->pgno = 0;
pPage->kLen = pBt->keyLen; pPage->kLen = pBt->keyLen;
pPage->vLen = sizeof(SPgno); pPage->vLen = sizeof(SPgno);
pPage->maxLocal = pBt->maxLocal; pPage->maxLocal = pBt->maxLocal;
@ -405,10 +385,11 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN
flags = TDB_BTREE_PAGE_GET_FLAGS(pRoot); flags = TDB_BTREE_PAGE_GET_FLAGS(pRoot);
leaf = TDB_BTREE_PAGE_IS_LEAF(pRoot); leaf = TDB_BTREE_PAGE_IS_LEAF(pRoot);
// Allocate a new child page // allocate a new child page
pgnoChild = 0;
zArg.flags = TDB_FLAG_REMOVE(flags, TDB_BTREE_ROOT); zArg.flags = TDB_FLAG_REMOVE(flags, TDB_BTREE_ROOT);
zArg.pBt = pBt; zArg.pBt = pBt;
ret = tdbPagerNewPage(pPager, &pgnoChild, &pChild, tdbBtreeZeroPage, &zArg, pTxn); ret = tdbPagerFetchPage(pPager, &pgnoChild, &pChild, tdbBtreeInitPage, &zArg, pTxn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -430,7 +411,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild, TXN
// Reinitialize the root page // Reinitialize the root page
zArg.flags = TDB_BTREE_ROOT; zArg.flags = TDB_BTREE_ROOT;
zArg.pBt = pBt; zArg.pBt = pBt;
ret = tdbBtreeZeroPage(pRoot, &zArg); ret = tdbBtreeInitPage(pRoot, &zArg, 0);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -483,7 +464,8 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
pgno = *(SPgno *)pCell; pgno = *(SPgno *)pCell;
} }
ret = tdbPagerFetchPage(pBt->pPager, pgno, pOlds + i, tdbBtreeInitPage, pBt, pTxn); ret = tdbPagerFetchPage(pBt->pPager, &pgno, pOlds + i, tdbBtreeInitPage,
&((SBtreeInitPageArg){.pBt = pBt, .flags = 0}), pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -644,9 +626,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
if (iNew < nOlds) { if (iNew < nOlds) {
pNews[iNew] = pOlds[iNew]; pNews[iNew] = pOlds[iNew];
} else { } else {
pgno = 0;
iarg.pBt = pBt; iarg.pBt = pBt;
iarg.flags = flags; iarg.flags = flags;
ret = tdbPagerNewPage(pBt->pPager, &pgno, pNews + iNew, tdbBtreeZeroPage, &iarg, pTxn); ret = tdbPagerFetchPage(pBt->pPager, &pgno, pNews + iNew, tdbBtreeInitPage, &iarg, pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
} }
@ -674,13 +657,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
iarg.pBt = pBt; iarg.pBt = pBt;
iarg.flags = TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]); iarg.flags = TDB_BTREE_PAGE_GET_FLAGS(pOlds[0]);
for (int i = 0; i < nOlds; i++) { for (int i = 0; i < nOlds; i++) {
tdbPageCreate(pOlds[0]->pageSize, &pOldsCopy[i], NULL, NULL); tdbPageCreate(pOlds[0]->pageSize, &pOldsCopy[i], tdbDefaultMalloc, NULL);
tdbBtreeZeroPage(pOldsCopy[i], &iarg); tdbBtreeInitPage(pOldsCopy[i], &iarg, 0);
tdbPageCopy(pOlds[i], pOldsCopy[i]); tdbPageCopy(pOlds[i], pOldsCopy[i]);
} }
iNew = 0; iNew = 0;
nNewCells = 0; nNewCells = 0;
tdbBtreeZeroPage(pNews[iNew], &iarg); tdbBtreeInitPage(pNews[iNew], &iarg, 0);
for (int iOld = 0; iOld < nOlds; iOld++) { for (int iOld = 0; iOld < nOlds; iOld++) {
SPage *pPage; SPage *pPage;
@ -721,7 +704,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
iNew++; iNew++;
nNewCells = 0; nNewCells = 0;
if (iNew < nNews) { if (iNew < nNews) {
tdbBtreeZeroPage(pNews[iNew], &iarg); tdbBtreeInitPage(pNews[iNew], &iarg, 0);
} }
} }
} else { } else {
@ -740,7 +723,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
iNew++; iNew++;
nNewCells = 0; nNewCells = 0;
if (iNew < nNews) { if (iNew < nNews) {
tdbBtreeZeroPage(pNews[iNew], &iarg); tdbBtreeInitPage(pNews[iNew], &iarg, 0);
} }
} }
} }
@ -760,7 +743,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
} }
for (int i = 0; i < nOlds; i++) { for (int i = 0; i < nOlds; i++) {
tdbPageDestroy(pOldsCopy[i], NULL, NULL); tdbPageDestroy(pOldsCopy[i], tdbDefaultFree, NULL);
} }
} }
@ -1035,7 +1018,13 @@ int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn) {
pBtc->iPage = -1; pBtc->iPage = -1;
pBtc->pPage = NULL; pBtc->pPage = NULL;
pBtc->idx = -1; pBtc->idx = -1;
pBtc->pTxn = pTxn;
if (pTxn == NULL) {
pBtc->pTxn = &pBtc->txn;
tdbTxnOpen(pBtc->pTxn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
} else {
pBtc->pTxn = pTxn;
}
return 0; return 0;
} }
@ -1052,7 +1041,8 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move a clean cursor // move a clean cursor
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt, pBtc->pTxn); ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage,
&((SBtreeInitPageArg){.pBt = pBt, .flags = 0}), pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -1117,7 +1107,8 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move a clean cursor // move a clean cursor
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt, pBtc->pTxn); ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage,
&((SBtreeInitPageArg){.pBt = pBt, .flags = 0}), pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -1286,13 +1277,16 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
pgno = ((SIntHdr *)pBtc->pPage->pData)->pgno; pgno = ((SIntHdr *)pBtc->pPage->pData)->pgno;
} }
ASSERT(pgno);
pBtc->pgStack[pBtc->iPage] = pBtc->pPage; pBtc->pgStack[pBtc->iPage] = pBtc->pPage;
pBtc->idxStack[pBtc->iPage] = pBtc->idx; pBtc->idxStack[pBtc->iPage] = pBtc->idx;
pBtc->iPage++; pBtc->iPage++;
pBtc->pPage = NULL; pBtc->pPage = NULL;
pBtc->idx = -1; pBtc->idx = -1;
ret = tdbPagerFetchPage(pBtc->pBt->pPager, pgno, &pBtc->pPage, tdbBtreeInitPage, pBtc->pBt, pBtc->pTxn); ret = tdbPagerFetchPage(pBtc->pBt->pPager, &pgno, &pBtc->pPage, tdbBtreeInitPage,
&((SBtreeInitPageArg){.pBt = pBtc->pBt, .flags = 0}), pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
@ -1327,7 +1321,8 @@ static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move from a clear cursor // move from a clear cursor
ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt, pBtc->pTxn); ret = tdbPagerFetchPage(pPager, &pBt->root, &(pBtc->pPage), tdbBtreeInitPage,
&((SBtreeInitPageArg){.pBt = pBt, .flags = TDB_BTREE_ROOT | TDB_BTREE_LEAF}), pBtc->pTxn);
if (ret < 0) { if (ret < 0) {
// TODO // TODO
ASSERT(0); ASSERT(0);

View File

@ -95,6 +95,8 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) { void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
i32 nRef; i32 nRef;
ASSERT(pTxn);
nRef = TDB_UNREF_PAGE(pPage); nRef = TDB_UNREF_PAGE(pPage);
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
@ -108,8 +110,10 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
if (pPage->isLocal) { if (pPage->isLocal) {
tdbPCacheUnpinPage(pCache, pPage); tdbPCacheUnpinPage(pCache, pPage);
} else { } else {
// remove from hash if (TDB_TXN_IS_WRITE(pTxn)) {
tdbPCacheRemovePageFromHash(pCache, pPage); // remove from hash
tdbPCacheRemovePageFromHash(pCache, pPage);
}
// free the page // free the page
if (pTxn && pTxn->xFree) { if (pTxn && pTxn->xFree) {
@ -125,8 +129,11 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
int tdbPCacheGetPageSize(SPCache *pCache) { return pCache->pageSize; } int tdbPCacheGetPageSize(SPCache *pCache) { return pCache->pageSize; }
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) { static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
int ret; int ret = 0;
SPage *pPage; SPage *pPage = NULL;
SPage *pPageH = NULL;
ASSERT(pTxn);
// 1. Search the hash table // 1. Search the hash table
pPage = pCache->pgHash[tdbPCachePageHash(pPgid) % pCache->nHash]; pPage = pCache->pgHash[tdbPCachePageHash(pPgid) % pCache->nHash];
@ -136,12 +143,17 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
} }
if (pPage) { if (pPage) {
// TODO: the page need to be copied and if (pPage->isLocal || TDB_TXN_IS_WRITE(pTxn)) {
// replaced the page in hash table tdbPCachePinPage(pCache, pPage);
tdbPCachePinPage(pCache, pPage); return pPage;
return pPage; }
} }
// 1. pPage == NULL
// 2. pPage && pPage->isLocal == 0 && !TDB_TXN_IS_WRITE(pTxn)
pPageH = pPage;
pPage = NULL;
// 2. Try to allocate a new page from the free list // 2. Try to allocate a new page from the free list
if (pCache->pFree) { if (pCache->pFree) {
pPage = pCache->pFree; pPage = pCache->pFree;
@ -158,7 +170,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
} }
// 4. Try a create new page // 4. Try a create new page
if (!pPage && pTxn && pTxn->xMalloc) { if (!pPage) {
ret = tdbPageCreate(pCache->pageSize, &pPage, pTxn->xMalloc, pTxn->xArg); ret = tdbPageCreate(pCache->pageSize, &pPage, pTxn->xMalloc, pTxn->xArg);
if (ret < 0) { if (ret < 0) {
// TODO // TODO
@ -176,12 +188,22 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
// or by recycling or allocated streesly, // or by recycling or allocated streesly,
// need to initialize it // need to initialize it
if (pPage) { if (pPage) {
memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid)); if (pPageH) {
pPage->pLruNext = NULL; // copy the page content
pPage->pPager = NULL; memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid));
pPage->pLruNext = NULL;
pPage->pPager = pPageH->pPager;
// TODO: allocated page may not add to hash memcpy(pPage->pData, pPageH->pData, pPage->pageSize);
tdbPCacheAddPageToHash(pCache, pPage); } else {
memcpy(&(pPage->pgid), pPgid, sizeof(*pPgid));
pPage->pLruNext = NULL;
pPage->pPager = NULL;
if (pPage->isLocal || TDB_TXN_IS_WRITE(pTxn)) {
tdbPCacheAddPageToHash(pCache, pPage);
}
}
} }
return pPage; return pPage;
@ -249,7 +271,7 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
pCache->nFree = 0; pCache->nFree = 0;
pCache->pFree = NULL; pCache->pFree = NULL;
for (int i = 0; i < pCache->cacheSize; i++) { for (int i = 0; i < pCache->cacheSize; i++) {
ret = tdbPageCreate(pCache->pageSize, &pPage, NULL, NULL); ret = tdbPageCreate(pCache->pageSize, &pPage, tdbDefaultMalloc, NULL);
if (ret < 0) { if (ret < 0) {
// TODO: handle error // TODO: handle error
return -1; return -1;

View File

@ -43,15 +43,14 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t)
u8 *ptr; u8 *ptr;
int size; int size;
ASSERT(xMalloc);
ASSERT(TDB_IS_PGSIZE_VLD(pageSize)); ASSERT(TDB_IS_PGSIZE_VLD(pageSize));
*ppPage = NULL; *ppPage = NULL;
size = pageSize + sizeof(*pPage); size = pageSize + sizeof(*pPage);
if (xMalloc == NULL) {
xMalloc = tdbDefaultMalloc;
}
ptr = (u8 *)((*xMalloc)(arg, size)); ptr = (u8 *)(xMalloc(arg, size));
if (ptr == NULL) { if (ptr == NULL) {
return -1; return -1;
} }
@ -75,12 +74,10 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t)
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) { int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) {
u8 *ptr; u8 *ptr;
if (!xFree) { ASSERT(xFree);
xFree = tdbDefaultFree;
}
ptr = pPage->pData; ptr = pPage->pData;
(*xFree)(arg, ptr); xFree(arg, ptr);
return 0; return 0;
} }
@ -436,7 +433,7 @@ static int tdbPageDefragment(SPage *pPage) {
/* ---------------------------------------------------------------------------------------------------------- */ /* ---------------------------------------------------------------------------------------------------------- */
#pragma pack(push,1) #pragma pack(push, 1)
typedef struct { typedef struct {
u16 cellNum; u16 cellNum;
u16 cellBody; u16 cellBody;
@ -520,7 +517,7 @@ SPageMethods pageMethods = {
setPageFreeCellInfo // setFreeCellInfo setPageFreeCellInfo // setFreeCellInfo
}; };
#pragma pack(push,1) #pragma pack(push, 1)
typedef struct { typedef struct {
u8 cellNum[3]; u8 cellNum[3];
u8 cellBody[3]; u8 cellBody[3];

View File

@ -15,7 +15,7 @@
#include "tdbInt.h" #include "tdbInt.h"
#pragma pack(push,1) #pragma pack(push, 1)
typedef struct { typedef struct {
u8 hdrString[16]; u8 hdrString[16];
u16 pageSize; u16 pageSize;
@ -29,7 +29,8 @@ TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct")
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL) #define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage); static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
u8 loadPage);
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage); static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage); static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
@ -228,13 +229,30 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
return 0; return 0;
} }
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg, int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
TXN *pTxn) { TXN *pTxn) {
SPage *pPage; SPage *pPage;
SPgid pgid; SPgid pgid;
int ret; int ret;
SPgno pgno;
u8 loadPage;
// Fetch a page container from the page cache pgno = *ppgno;
loadPage = 1;
// alloc new page
if (pgno == 0) {
loadPage = 0;
ret = tdbPagerAllocPage(pPager, &pgno);
if (ret < 0) {
ASSERT(0);
return -1;
}
}
ASSERT(pgno > 0);
// fetch a page container
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN); memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
pgid.pgno = pgno; pgid.pgno = pgno;
pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn); pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
@ -242,9 +260,9 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage
return -1; return -1;
} }
// Initialize the page if need // init page if need
if (!TDB_PAGE_INITIALIZED(pPage)) { if (!TDB_PAGE_INITIALIZED(pPage)) {
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 1); ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -253,46 +271,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage
ASSERT(TDB_PAGE_INITIALIZED(pPage)); ASSERT(TDB_PAGE_INITIALIZED(pPage));
ASSERT(pPage->pPager == pPager); ASSERT(pPage->pPager == pPager);
*ppPage = pPage; *ppgno = pgno;
return 0;
}
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
TXN *pTxn) {
int ret;
SPage *pPage;
SPgid pgid;
// Allocate a page number
ret = tdbPagerAllocPage(pPager, ppgno);
if (ret < 0) {
ASSERT(0);
return -1;
}
ASSERT(*ppgno != 0);
// Fetch a page container from the page cache
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
pgid.pgno = *ppgno;
pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
if (pPage == NULL) {
ASSERT(0);
return -1;
}
ASSERT(!TDB_PAGE_INITIALIZED(pPage));
// Initialize the page if need
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, 0);
if (ret < 0) {
ASSERT(0);
return -1;
}
ASSERT(TDB_PAGE_INITIALIZED(pPage));
ASSERT(pPage->pPager == pPager);
*ppPage = pPage; *ppPage = pPage;
return 0; return 0;
} }
@ -333,11 +312,14 @@ int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
return 0; return 0;
} }
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *), void *arg, u8 loadPage) { static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
int ret; u8 loadPage) {
int lcode; int ret;
int nLoops; int lcode;
i64 nRead; int nLoops;
i64 nRead;
SPgno pgno;
int init = 0;
lcode = TDB_TRY_LOCK_PAGE(pPage); lcode = TDB_TRY_LOCK_PAGE(pPage);
if (lcode == P_LOCK_SUCC) { if (lcode == P_LOCK_SUCC) {
@ -346,20 +328,21 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
return 0; return 0;
} }
if (loadPage) { pgno = TDB_PAGE_PGNO(pPage);
nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * TDB_PAGE_PGNO(pPage));
if (nRead < 0) { if (loadPage && pgno <= pPager->dbOrigSize) {
// TODO init = 1;
ASSERT(0);
return -1; nRead = tdbOsPRead(pPager->fd, pPage->pData, pPage->pageSize, ((i64)pPage->pageSize) * pgno);
} else if (nRead < pPage->pageSize) { if (nRead < pPage->pageSize) {
// TODO
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
} else {
init = 0;
} }
ret = (*initPage)(pPage, arg); ret = (*initPage)(pPage, arg, init);
if (ret < 0) { if (ret < 0) {
TDB_UNLOCK_PAGE(pPage); TDB_UNLOCK_PAGE(pPage);
return -1; return -1;

View File

@ -36,6 +36,7 @@ struct SBTC {
int idxStack[BTREE_MAX_DEPTH + 1]; int idxStack[BTREE_MAX_DEPTH + 1];
SPage *pgStack[BTREE_MAX_DEPTH + 1]; SPage *pgStack[BTREE_MAX_DEPTH + 1];
TXN *pTxn; TXN *pTxn;
TXN txn;
}; };
// SBTree // SBTree

View File

@ -42,10 +42,8 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate);
int tdbPagerWrite(SPager *pPager, SPage *pPage); int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager, TXN *pTxn); int tdbPagerBegin(SPager *pPager, TXN *pTxn);
int tdbPagerCommit(SPager *pPager, TXN *pTxn); int tdbPagerCommit(SPager *pPager, TXN *pTxn);
int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg, int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
TXN *pTxn); TXN *pTxn);
int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *), void *arg,
TXN *pTxn);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn); void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno); int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);