fix: dirty new allocated non-dirty page (root leaf without writings)

This commit is contained in:
Minglei Jin 2022-07-05 14:28:10 +08:00
parent 6af95b3b65
commit 06af04675a
3 changed files with 268 additions and 251 deletions

View File

@ -40,8 +40,8 @@ struct SBTree {
#define TDB_BTREE_PAGE_IS_ROOT(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_ROOT) #define TDB_BTREE_PAGE_IS_ROOT(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_ROOT)
#define TDB_BTREE_PAGE_IS_LEAF(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_LEAF) #define TDB_BTREE_PAGE_IS_LEAF(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_LEAF)
#define TDB_BTREE_PAGE_IS_OVFL(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_OVFL) #define TDB_BTREE_PAGE_IS_OVFL(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_OVFL)
#define TDB_BTREE_ASSERT_FLAG(flags) \ #define TDB_BTREE_ASSERT_FLAG(flags) \
ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \ ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \
TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0) || \ TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0) || \
TDB_FLAG_IS(flags, TDB_BTREE_OVFL)) TDB_FLAG_IS(flags, TDB_BTREE_OVFL))
@ -58,7 +58,7 @@ typedef struct {
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2); static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2);
static int tdbBtreeOpenImpl(SBTree *pBt); static int tdbBtreeOpenImpl(SBTree *pBt);
//static int tdbBtreeInitPage(SPage *pPage, void *arg, int init); // static int tdbBtreeInitPage(SPage *pPage, void *arg, int init);
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell, static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
int *szCell, TXN *pTxn, SBTree *pBt); int *szCell, TXN *pTxn, SBTree *pBt);
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt); static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt);
@ -321,7 +321,7 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
{ {
// 1. TODO: Search the main DB to check if the DB exists // 1. TODO: Search the main DB to check if the DB exists
ret = tdbPagerOpenDB(pBt->pPager, &pgno, true); ret = tdbPagerOpenDB(pBt->pPager, &pgno, true, pBt);
ASSERT(ret == 0); ASSERT(ret == 0);
} }
@ -721,7 +721,8 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
int szNewCell; int szNewCell;
SPgno pgno; SPgno pgno;
pgno = TDB_PAGE_PGNO(pNews[iNew]); pgno = TDB_PAGE_PGNO(pNews[iNew]);
tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell, pTxn, pBt); tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell, pTxn,
pBt);
tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0); tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0);
tdbOsFree(pNewCell); tdbOsFree(pNewCell);
} }
@ -916,10 +917,10 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno)); int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno));
int nLocal = surplus <= maxLocal ? surplus : minLocal; int nLocal = surplus <= maxLocal ? surplus : minLocal;
//int ofpCap = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr)); // int ofpCap = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr));
// fetch a new ofp and make it dirty // fetch a new ofp and make it dirty
SPgno pgno = 0; SPgno pgno = 0;
SPage *ofp, *nextOfp; SPage *ofp, *nextOfp;
ret = tdbFetchOvflPage(&pgno, &ofp, pTxn, pBt); ret = tdbFetchOvflPage(&pgno, &ofp, pTxn, pBt);
@ -942,8 +943,8 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
nLeft -= kLen; nLeft -= kLen;
// pack partial val to local if any space left // pack partial val to local if any space left
if (nLocal > kLen + 4) { if (nLocal > kLen + 4) {
memcpy(pCell + nHeader + kLen, pVal, nLocal - kLen - sizeof(SPgno)); memcpy(pCell + nHeader + kLen, pVal, nLocal - kLen - sizeof(SPgno));
nLeft -= nLocal - kLen - sizeof(SPgno); nLeft -= nLocal - kLen - sizeof(SPgno);
} }
// pack nextPgno // pack nextPgno
@ -951,132 +952,132 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
// pack left val data to ovpages // pack left val data to ovpages
do { do {
lastPage = 0; lastPage = 0;
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft; bytes = nLeft;
lastPage = 1; lastPage = 1;
} else { } else {
bytes = ofp->maxLocal - sizeof(SPgno); bytes = ofp->maxLocal - sizeof(SPgno);
} }
// fetch next ofp if not last page // fetch next ofp if not last page
if (!lastPage) { if (!lastPage) {
// fetch a new ofp and make it dirty // fetch a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt); ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
tdbFree(pBuf); tdbFree(pBuf);
return -1; return -1;
} }
} else { } else {
pgno = 0; pgno = 0;
} }
memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes); memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes);
memcpy(pBuf + bytes, &pgno, sizeof(pgno)); memcpy(pBuf + bytes, &pgno, sizeof(pgno));
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0); ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) { if (ret < 0) {
tdbFree(pBuf); tdbFree(pBuf);
return -1; return -1;
} }
ofp = nextOfp; ofp = nextOfp;
nLeft -= bytes; nLeft -= bytes;
} while (nLeft > 0); } while (nLeft > 0);
} else { } else {
int nLeftKey = kLen; int nLeftKey = kLen;
// pack partial key and nextPgno // pack partial key and nextPgno
memcpy(pCell + nHeader, pKey, nLocal - 4); memcpy(pCell + nHeader, pKey, nLocal - 4);
nLeft -= nLocal - 4; nLeft -= nLocal - 4;
nLeftKey -= nLocal -4; nLeftKey -= nLocal - 4;
memcpy(pCell + nHeader + nLocal - 4, &pgno, sizeof(pgno)); memcpy(pCell + nHeader + nLocal - 4, &pgno, sizeof(pgno));
int lastKeyPageSpace = 0; int lastKeyPageSpace = 0;
// pack left key & val to ovpages // pack left key & val to ovpages
do { do {
// cal key to cpy // cal key to cpy
int lastKeyPage = 0; int lastKeyPage = 0;
if (nLeftKey <= ofp->maxLocal - sizeof(SPgno)) { if (nLeftKey <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeftKey; bytes = nLeftKey;
lastKeyPage = 1; lastKeyPage = 1;
lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey; lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey;
} else { } else {
bytes = ofp->maxLocal - sizeof(SPgno); bytes = ofp->maxLocal - sizeof(SPgno);
} }
// cpy key // cpy key
memcpy(pBuf, ((SCell *)pKey) + kLen - nLeftKey, bytes); memcpy(pBuf, ((SCell *)pKey) + kLen - nLeftKey, bytes);
if (lastKeyPage) { if (lastKeyPage) {
if (lastKeyPageSpace >= vLen) { if (lastKeyPageSpace >= vLen) {
memcpy(pBuf + kLen -nLeftKey, pVal, vLen); memcpy(pBuf + kLen - nLeftKey, pVal, vLen);
nLeft -= vLen; nLeft -= vLen;
pgno = 0; pgno = 0;
} else { } else {
memcpy(pBuf + kLen -nLeftKey, pVal, lastKeyPageSpace); memcpy(pBuf + kLen - nLeftKey, pVal, lastKeyPageSpace);
nLeft -= lastKeyPageSpace; nLeft -= lastKeyPageSpace;
// fetch next ofp, a new ofp and make it dirty // fetch next ofp, a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt); ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
} }
} else { } else {
// fetch next ofp, a new ofp and make it dirty // fetch next ofp, a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt); ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
} }
memcpy(pBuf + kLen - nLeft, &pgno, sizeof(pgno)); memcpy(pBuf + kLen - nLeft, &pgno, sizeof(pgno));
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0); ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
ofp = nextOfp; ofp = nextOfp;
nLeftKey -= bytes; nLeftKey -= bytes;
nLeft -= bytes; nLeft -= bytes;
} while (nLeftKey > 0); } while (nLeftKey > 0);
while (nLeft > 0) { while (nLeft > 0) {
// pack left val data to ovpages // pack left val data to ovpages
lastPage = 0; lastPage = 0;
if (nLeft <= maxLocal - sizeof(SPgno)) { if (nLeft <= maxLocal - sizeof(SPgno)) {
bytes = nLeft; bytes = nLeft;
lastPage = 1; lastPage = 1;
} else { } else {
bytes = maxLocal - sizeof(SPgno); bytes = maxLocal - sizeof(SPgno);
} }
// fetch next ofp if not last page // fetch next ofp if not last page
if (!lastPage) { if (!lastPage) {
// fetch a new ofp and make it dirty // fetch a new ofp and make it dirty
ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt); ret = tdbFetchOvflPage(&pgno, &nextOfp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
tdbFree(pBuf); tdbFree(pBuf);
return -1; return -1;
} }
} else { } else {
pgno = 0; pgno = 0;
} }
memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes); memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes);
memcpy(pBuf + bytes, &pgno, sizeof(pgno)); memcpy(pBuf + bytes, &pgno, sizeof(pgno));
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0); ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) { if (ret < 0) {
tdbFree(pBuf); tdbFree(pBuf);
return -1; return -1;
} }
ofp = nextOfp; ofp = nextOfp;
nLeft -= bytes; nLeft -= bytes;
} }
} }
@ -1142,7 +1143,8 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
return 0; return 0;
} }
static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt) { static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, SCellDecoder *pDecoder, TXN *pTxn,
SBTree *pBt) {
int ret = 0; int ret = 0;
int nPayload; int nPayload;
int maxLocal = pPage->maxLocal; int maxLocal = pPage->maxLocal;
@ -1171,149 +1173,149 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno)); int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno));
int nLocal = surplus <= maxLocal ? surplus : minLocal; int nLocal = surplus <= maxLocal ? surplus : minLocal;
int nLeft = nPayload; int nLeft = nPayload;
SPgno pgno = 0; SPgno pgno = 0;
SPage *ofp; SPage *ofp;
SCell *ofpCell; SCell *ofpCell;
int bytes; int bytes;
int lastPage = 0; int lastPage = 0;
if (nLocal >= pDecoder->kLen + 4) { if (nLocal >= pDecoder->kLen + 4) {
pDecoder->pKey = (SCell *)pCell + nHeader; pDecoder->pKey = (SCell *)pCell + nHeader;
nLeft -= kLen; nLeft -= kLen;
if (nLocal > kLen + 4) { if (nLocal > kLen + 4) {
// read partial val to local // read partial val to local
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen); pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) { if (pDecoder->pVal == NULL) {
return -1; return -1;
} }
TDB_CELLDECODER_SET_FREE_VAL(pDecoder); TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
memcpy(pDecoder->pVal, pCell + nHeader + kLen, nLocal - kLen - sizeof(SPgno)); memcpy(pDecoder->pVal, pCell + nHeader + kLen, nLocal - kLen - sizeof(SPgno));
nLeft -= nLocal - kLen - sizeof(SPgno); nLeft -= nLocal - kLen - sizeof(SPgno);
} }
memcpy(&pgno, pCell + nHeader + nPayload - nLeft, sizeof(pgno)); memcpy(&pgno, pCell + nHeader + nPayload - nLeft, sizeof(pgno));
// unpack left val data from ovpages // unpack left val data from ovpages
while (pgno != 0) { while (pgno != 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt); ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
ofpCell = tdbPageGetCell(ofp, 0); ofpCell = tdbPageGetCell(ofp, 0);
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft; bytes = nLeft;
lastPage = 1; lastPage = 1;
} else { } else {
bytes = ofp->maxLocal - sizeof(SPgno); bytes = ofp->maxLocal - sizeof(SPgno);
} }
memcpy(pDecoder->pVal + vLen - nLeft, ofpCell, bytes); memcpy(pDecoder->pVal + vLen - nLeft, ofpCell, bytes);
nLeft -= bytes; nLeft -= bytes;
memcpy(&pgno, ofpCell + bytes, sizeof(pgno)); memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
} }
} else { } else {
int nLeftKey = kLen; int nLeftKey = kLen;
// load partial key and nextPgno // load partial key and nextPgno
pDecoder->pKey = tdbRealloc(pDecoder->pKey, kLen); pDecoder->pKey = tdbRealloc(pDecoder->pKey, kLen);
if (pDecoder->pKey == NULL) { if (pDecoder->pKey == NULL) {
return -1; return -1;
} }
TDB_CELLDECODER_SET_FREE_KEY(pDecoder); TDB_CELLDECODER_SET_FREE_KEY(pDecoder);
memcpy(pDecoder->pKey, pCell + nHeader, nLocal - 4); memcpy(pDecoder->pKey, pCell + nHeader, nLocal - 4);
nLeft -= nLocal - 4; nLeft -= nLocal - 4;
nLeftKey -= nLocal -4; nLeftKey -= nLocal - 4;
memcpy(&pgno, pCell + nHeader + nLocal - 4, sizeof(pgno)); memcpy(&pgno, pCell + nHeader + nLocal - 4, sizeof(pgno));
int lastKeyPageSpace = 0; int lastKeyPageSpace = 0;
// load left key & val to ovpages // load left key & val to ovpages
while (pgno != 0) { while (pgno != 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt); ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
ofpCell = tdbPageGetCell(ofp, 0); ofpCell = tdbPageGetCell(ofp, 0);
int lastKeyPage = 0; int lastKeyPage = 0;
if (nLeftKey <= maxLocal - sizeof(SPgno)) { if (nLeftKey <= maxLocal - sizeof(SPgno)) {
bytes = nLeftKey; bytes = nLeftKey;
lastKeyPage = 1; lastKeyPage = 1;
lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey; lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey;
} else { } else {
bytes = ofp->maxLocal - sizeof(SPgno); bytes = ofp->maxLocal - sizeof(SPgno);
} }
// cpy key // cpy key
memcpy(pDecoder->pKey + kLen - nLeftKey, ofpCell, bytes); memcpy(pDecoder->pKey + kLen - nLeftKey, ofpCell, bytes);
if (lastKeyPage) { if (lastKeyPage) {
if (lastKeyPageSpace >= vLen) { if (lastKeyPageSpace >= vLen) {
pDecoder->pVal = ofpCell + kLen -nLeftKey; pDecoder->pVal = ofpCell + kLen - nLeftKey;
nLeft -= vLen; nLeft -= vLen;
pgno = 0; pgno = 0;
} else { } else {
// read partial val to local // read partial val to local
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen); pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) { if (pDecoder->pVal == NULL) {
return -1; return -1;
} }
TDB_CELLDECODER_SET_FREE_VAL(pDecoder); TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
memcpy(pDecoder->pVal, ofpCell + kLen -nLeftKey, lastKeyPageSpace); memcpy(pDecoder->pVal, ofpCell + kLen - nLeftKey, lastKeyPageSpace);
nLeft -= lastKeyPageSpace; nLeft -= lastKeyPageSpace;
} }
} }
memcpy(&pgno, ofpCell + bytes, sizeof(pgno)); memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
nLeftKey -= bytes; nLeftKey -= bytes;
nLeft -= bytes; nLeft -= bytes;
} }
while (nLeft > 0) { while (nLeft > 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt); ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
ofpCell = tdbPageGetCell(ofp, 0); ofpCell = tdbPageGetCell(ofp, 0);
// load left val data to ovpages // load left val data to ovpages
lastPage = 0; lastPage = 0;
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft; bytes = nLeft;
lastPage = 1; lastPage = 1;
} else { } else {
bytes = ofp->maxLocal - sizeof(SPgno); bytes = ofp->maxLocal - sizeof(SPgno);
} }
if (lastPage) { if (lastPage) {
pgno = 0; pgno = 0;
} }
if (!pDecoder->pVal) { if (!pDecoder->pVal) {
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen); pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) { if (pDecoder->pVal == NULL) {
return -1; return -1;
} }
TDB_CELLDECODER_SET_FREE_VAL(pDecoder); TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
} }
memcpy(pDecoder->pVal, ofpCell + vLen - nLeft, bytes); memcpy(pDecoder->pVal, ofpCell + vLen - nLeft, bytes);
nLeft -= bytes; nLeft -= bytes;
memcpy(&pgno, ofpCell + vLen - nLeft + bytes, sizeof(pgno)); memcpy(&pgno, ofpCell + vLen - nLeft + bytes, sizeof(pgno));
nLeft -= bytes; nLeft -= bytes;
} }
} }
} }
@ -1404,31 +1406,31 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
// free ofp pages' cells // free ofp pages' cells
if (dropOfp) { if (dropOfp) {
int ret = 0; int ret = 0;
SPgno pgno = *(SPgno *) (pCell + nHeader + nLocal - sizeof(SPgno)); SPgno pgno = *(SPgno *)(pCell + nHeader + nLocal - sizeof(SPgno));
int nLeft = nPayload - nLocal + sizeof(SPgno); int nLeft = nPayload - nLocal + sizeof(SPgno);
SPage *ofp; SPage *ofp;
int bytes; int bytes;
while (pgno != 0) { while (pgno != 0) {
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt); ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
SCell *ofpCell = tdbPageGetCell(ofp, 0); SCell *ofpCell = tdbPageGetCell(ofp, 0);
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) { if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft; bytes = nLeft;
} else { } else {
bytes = ofp->maxLocal - sizeof(SPgno); bytes = ofp->maxLocal - sizeof(SPgno);
} }
memcpy(&pgno, ofpCell + bytes, sizeof(pgno)); memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
tdbPagerReturnPage(pPage->pPager, ofp, pTxn); tdbPagerReturnPage(pPage->pPager, ofp, pTxn);
nLeft -= bytes; nLeft -= bytes;
} }
} }
@ -1932,7 +1934,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
// alloc space // alloc space
szBuf = kLen + nData + 14; szBuf = kLen + nData + 14;
pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize); pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize);
if (pBuf == NULL) { if (pBuf == NULL) {
ASSERT(0); ASSERT(0);
return -1; return -1;

View File

@ -98,7 +98,7 @@ int tdbPagerClose(SPager *pPager) {
return 0; return 0;
} }
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) { int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt) {
SPgno pgno; SPgno pgno;
SPage *pPage; SPage *pPage;
int ret; int ret;
@ -110,25 +110,41 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
} }
{ {
// TODO: try to search the main DB to get the page number // TODO: try to search the main DB to get the page number
// pgno = 0; // pgno = 0;
} }
// if (pgno == 0 && toCreate) { if (pgno == 0 && toCreate) {
// ret = tdbPagerAllocPage(pPager, &pPage, &pgno); // allocate a new child page
// if (ret < 0) { TXN txn;
// return -1; tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
// }
// // TODO: Need to zero the page pPager->inTran = 1;
// ret = tdbPagerWrite(pPager, pPage); SBtreeInitPageArg zArg;
// if (ret < 0) { zArg.flags = 0x1 | 0x2; // root leaf node;
// return -1; zArg.pBt = pBt;
// } ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, &txn);
// } if (ret < 0) {
return -1;
}
*ppgno = pgno; // ret = tdbPagerAllocPage(pPager, &pPage, &pgno);
// if (ret < 0) {
// return -1;
//}
// TODO: Need to zero the page
ret = tdbPagerWrite(pPager, pPage);
if (ret < 0) {
return -1;
}
tdbTxnClose(&txn);
}
*ppgno = pgno;
return 0; return 0;
} }
@ -427,9 +443,9 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
} }
int tdbPagerRestore(SPager *pPager, SBTree *pBt) { int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
int ret = 0; int ret = 0;
SPgno journalSize = 0; SPgno journalSize = 0;
u8 *pageBuf = NULL; u8 *pageBuf = NULL;
tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755); tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755);
if (jfd == NULL) { if (jfd == NULL) {
@ -454,7 +470,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) { for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
// read pgno & the page from journal // read pgno & the page from journal
SPgno pgno; SPgno pgno;
SPage *pPage; SPage *pPage;
int ret = tdbOsRead(jfd, &pgno, sizeof(pgno)); int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));

View File

@ -128,13 +128,13 @@ typedef struct SBtInfo {
#define TDB_CELLDECODER_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_VAL) #define TDB_CELLDECODER_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_VAL)
typedef struct { typedef struct {
int kLen; int kLen;
u8 *pKey; u8 *pKey;
int vLen; int vLen;
u8 *pVal; u8 *pVal;
SPgno pgno; SPgno pgno;
u8 *pBuf; u8 *pBuf;
u8 freeKV; u8 freeKV;
} SCellDecoder; } SCellDecoder;
struct SBTC { struct SBTC {
@ -184,7 +184,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager); int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
int tdbPagerClose(SPager *pPager); int tdbPagerClose(SPager *pPager);
int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate); int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt);
int tdbPagerWrite(SPager *pPager, SPage *pPage); int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager, TXN *pTxn); int tdbPagerBegin(SPager *pPager, TXN *pTxn);
int tdbPagerCommit(SPager *pPager, TXN *pTxn); int tdbPagerCommit(SPager *pPager, TXN *pTxn);
@ -192,7 +192,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initP
TXN *pTxn); TXN *pTxn);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn); void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno); int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
int tdbPagerRestore(SPager *pPager, SBTree *pBt); int tdbPagerRestore(SPager *pPager, SBTree *pBt);
// tdbPCache.c ==================================== // tdbPCache.c ====================================
#define TDB_PCACHE_PAGE \ #define TDB_PCACHE_PAGE \
@ -314,19 +314,18 @@ static inline int tdbTryLockPage(tdb_spinlock_t *pLock) {
#define TDB_TRY_LOCK_PAGE(pPage) tdbTryLockPage(&((pPage)->lock)) #define TDB_TRY_LOCK_PAGE(pPage) tdbTryLockPage(&((pPage)->lock))
// APIs // APIs
#define TDB_PAGE_TOTAL_CELLS(pPage) ((pPage)->nOverflow + (pPage)->pPageMethods->getCellNum(pPage)) #define TDB_PAGE_TOTAL_CELLS(pPage) ((pPage)->nOverflow + (pPage)->pPageMethods->getCellNum(pPage))
#define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx) #define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx)
#define TDB_PAGE_FREE_SIZE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage) #define TDB_PAGE_FREE_SIZE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
#define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno) #define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno)
#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell, 0, NULL, NULL) + (pPage)->pPageMethods->szOffset) #define TDB_BYTES_CELL_TAKEN(pPage, pCell) \
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset) ((*(pPage)->xCellSize)(pPage, pCell, 0, NULL, NULL) + (pPage)->pPageMethods->szOffset)
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg); int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg);
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg); int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg);
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt));
TXN *, SBTree *pBt)); void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt));
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int,
TXN *, SBTree *pBt));
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl); int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl);
int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt); int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt);
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell, TXN *pTxn, SBTree *pBt); int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell, TXN *pTxn, SBTree *pBt);