fix: flush pages to get buffer ready for fetching
This commit is contained in:
parent
9f9803dd64
commit
420be42269
|
@ -169,7 +169,7 @@ int tdbPCacheAlter(SPCache *pCache, int32_t nPage) {
|
|||
|
||||
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
|
||||
SPage *pPage;
|
||||
i32 nRef;
|
||||
i32 nRef = 0;
|
||||
|
||||
tdbPCacheLock(pCache);
|
||||
|
||||
|
@ -178,14 +178,17 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
|
|||
nRef = tdbRefPage(pPage);
|
||||
}
|
||||
|
||||
ASSERT(pPage);
|
||||
|
||||
tdbPCacheUnlock(pCache);
|
||||
|
||||
// printf("thread %" PRId64 " fetch page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id,
|
||||
// TDB_PAGE_PGNO(pPage), pPage, nRef);
|
||||
|
||||
if (pPage) {
|
||||
tdbDebug("pcache/fetch page %p/%d/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef);
|
||||
} else {
|
||||
tdbDebug("pcache/fetch page %p", pPage);
|
||||
}
|
||||
|
||||
return pPage;
|
||||
}
|
||||
|
||||
|
@ -266,7 +269,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
|
|||
}
|
||||
|
||||
// 4. Try to create a new page
|
||||
if (!pPage) {
|
||||
if (!pPage && pTxn->xMalloc != NULL) {
|
||||
ret = tdbPageCreate(pCache->szPage, &pPage, pTxn->xMalloc, pTxn->xArg);
|
||||
if (ret < 0 || pPage == NULL) {
|
||||
// TODO
|
||||
|
|
|
@ -27,6 +27,116 @@ typedef struct {
|
|||
|
||||
TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");
|
||||
|
||||
// Open-addressing hash set of non-zero size_t values.
// A slot in `items` that holds 0 is treated as empty by every probe loop in this file.
struct hashset_st {
|
||||
size_t nbits;  // log2 of `capacity`; capacity is always computed as 1 << nbits
|
||||
size_t mask;  // capacity - 1; ANDed with a hash/probe index to wrap it into the table
|
||||
size_t capacity;  // number of slots allocated in `items`
|
||||
size_t *items;  // slot array; 0 means "empty", any other value is a stored member
|
||||
size_t nitems;  // number of members currently stored
|
||||
double load_factor;  // hashset_add grows the table once nitems >= capacity * load_factor
|
||||
};
|
||||
|
||||
// Multiplier applied to a value to pick its first (home) slot: mask & (prime * value).
// NOTE(review): despite the name, 39 is not a prime number (3 * 13).
static const unsigned int prime = 39;
|
||||
// Step added to the slot index on every collision: mask & (h + prime2).
// It is odd, so with a power-of-two capacity the probe sequence visits every slot.
static const unsigned int prime2 = 5009;
|
||||
|
||||
hashset_t hashset_create(void) {
|
||||
hashset_t set = tdbOsCalloc(1, sizeof(struct hashset_st));
|
||||
if (!set) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
set->nbits = 4;
|
||||
set->capacity = (size_t)(1 << set->nbits);
|
||||
set->items = tdbOsCalloc(set->capacity, sizeof(size_t));
|
||||
if (!set->items) {
|
||||
tdbOsFree(set);
|
||||
return NULL;
|
||||
}
|
||||
set->mask = set->capacity - 1;
|
||||
set->nitems = 0;
|
||||
|
||||
set->load_factor = 0.75;
|
||||
|
||||
return set;
|
||||
}
|
||||
|
||||
/*
 * Release a set created by hashset_create(): first the slot array, then the
 * header itself. Passing NULL is a no-op.
 */
void hashset_destroy(hashset_t set) {
  if (set == NULL) {
    return;
  }

  tdbOsFree(set->items);
  tdbOsFree(set);
}
|
||||
|
||||
/*
 * Insert `item` (used as an integer key) without ever growing the table.
 *
 * Returns  1 if the key was stored,
 *          0 if it was already present,
 *         -1 if the key is 0, which is reserved as the empty-slot marker.
 *
 * The probe loop only stops at an empty slot or a match, so the table must
 * contain at least one empty slot when this is called.
 */
int hashset_add_member(hashset_t set, void *item) {
  const size_t key = (size_t)item;

  if (key == 0) {
    return -1;
  }

  size_t slot = set->mask & (prime * key);
  while (set->items[slot] != 0) {
    if (set->items[slot] == key) {
      return 0;
    }
    slot = set->mask & (slot + prime2);
  }

  set->items[slot] = key;
  set->nitems++;
  return 1;
}
|
||||
|
||||
/*
 * Insert `item` into the set, doubling the table when the load factor is reached.
 *
 * Returns the result of hashset_add_member() (1 = inserted, 0 = already
 * present, -1 = item is 0), or -1 if the table needed to grow and the larger
 * slot array could not be allocated.
 *
 * On a failed growth the set is left exactly as it was after the insertion:
 * the old slot array, capacity and mask stay in place, so the set remains
 * usable and nothing is leaked. (Previously the set was left with
 * items == NULL and an enlarged capacity, and the old array was lost.)
 */
int hashset_add(hashset_t set, void *item) {
  int ret = hashset_add_member(set, item);

  size_t old_capacity = set->capacity;
  if (set->nitems < (double)old_capacity * set->load_factor) {
    return ret;
  }

  // Work out the new geometry in locals; commit it only once allocation succeeded.
  size_t new_nbits = set->nbits + 1;
  if (new_nbits >= sizeof(size_t) * 8) {
    return -1;  // shifting by the full width of size_t would be undefined
  }
  size_t new_capacity = (size_t)1 << new_nbits;  // shift in size_t, not int

  size_t *new_items = tdbOsCalloc(new_capacity, sizeof(size_t));
  if (new_items == NULL) {
    return -1;
  }

  size_t *old_items = set->items;
  set->nbits = new_nbits;
  set->capacity = new_capacity;
  set->mask = new_capacity - 1;
  set->items = new_items;
  set->nitems = 0;

  // Re-hash every stored member into the larger table; empty slots are skipped.
  for (size_t i = 0; i < old_capacity; ++i) {
    if (old_items[i] != 0) {
      hashset_add_member(set, (void *)old_items[i]);
    }
  }
  tdbOsFree(old_items);

  return ret;
}
|
||||
|
||||
/*
 * Remove `item` (used as an integer key) from the set.
 *
 * Returns 1 if the key was present and has been removed, 0 otherwise.
 *
 * The table uses open addressing with 0 as the empty marker, so simply
 * zeroing the slot would cut the probe run in two: any key that had collided
 * and been stored further along the run would no longer be reachable by
 * hashset_contains()/hashset_remove(), and could be inserted a second time.
 * To keep lookups correct, every key that follows the removed slot in the
 * same run is lifted out and re-seated from its own home slot.
 */
int hashset_remove(hashset_t set, void *item) {
  size_t value = (size_t)item;

  if (value == 0) {
    return 0;  // 0 is the empty marker and is never stored
  }

  for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
    if (set->items[h] != value) {
      continue;
    }

    set->items[h] = 0;
    --set->nitems;

    // Re-seat the remainder of the probe run so that no key is stranded behind the hole.
    for (size_t next = set->mask & (h + prime2); set->items[next] != 0; next = set->mask & (next + prime2)) {
      size_t moved = set->items[next];
      set->items[next] = 0;

      size_t slot = set->mask & (prime * moved);
      while (set->items[slot] != 0) {
        slot = set->mask & (slot + prime2);
      }
      set->items[slot] = moved;
    }

    return 1;
  }

  return 0;
}
|
||||
|
||||
/*
 * Test whether `item` (used as an integer key) is stored in the set.
 *
 * Probing starts at the key's home slot and advances by prime2 until either
 * the key or an empty slot (0) is found. Returns 1 if found, 0 otherwise.
 */
int hashset_contains(hashset_t set, void *item) {
  const size_t key = (size_t)item;
  size_t       slot = set->mask & (prime * key);

  while (set->items[slot] != 0) {
    if (set->items[slot] == key) {
      return 1;
    }
    slot = set->mask & (slot + prime2);
  }

  return 0;
}
|
||||
|
||||
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)
|
||||
|
||||
static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
|
||||
|
@ -209,12 +319,16 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
|
|||
tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);
|
||||
|
||||
// Write page to journal if necessary
|
||||
if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) {
|
||||
if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize && (pPager->jPageSet == NULL || !hashset_contains(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))))) {
|
||||
ret = tdbPagerWritePageToJournal(pPager, pPage);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to write page to journal since %s", tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pPager->jPageSet) {
|
||||
hashset_add(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage)));
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -233,6 +347,7 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
pPager->jPageSet = hashset_create();
|
||||
// TODO: write the size of the file
|
||||
|
||||
pPager->inTran = 1;
|
||||
|
@ -275,6 +390,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
|||
pPage->isDirty = 0;
|
||||
|
||||
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
|
||||
if (pPager->jPageSet) {
|
||||
hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage)));
|
||||
}
|
||||
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
||||
}
|
||||
|
||||
|
@ -304,6 +422,9 @@ int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pPager->jPageSet) {
|
||||
hashset_destroy(pPager->jPageSet);
|
||||
}
|
||||
pPager->inTran = 0;
|
||||
|
||||
return 0;
|
||||
|
@ -375,36 +496,61 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755);
|
||||
if (jfd == NULL) {
|
||||
return -1;
|
||||
}
|
||||
tdb_fd_t jfd = pPager->jfd;
|
||||
|
||||
ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// 1, read pages from journal file
|
||||
// 2, write original pages to buffered ones
|
||||
u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
|
||||
if (pageBuf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* TODO: reset the buffered pages instead of releasing them
|
||||
// loop to reset the dirty pages from file
|
||||
for (pgIdx = 0, pPage = pPager->pDirty; pPage != NULL && pgIndex < journalSize; pPage = pPage->pDirtyNext, ++pgIdx) {
|
||||
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
|
||||
// read pgno & the page from journal
|
||||
SPgno pgno;
|
||||
|
||||
int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pageBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pageBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
i64 offset = pPager->pageSize * (pgno - 1);
|
||||
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
|
||||
tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tdbOsFree(pageBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName,
|
||||
pPager->pageSize);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tdbOsFree(pageBuf);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
if (tdbOsFSync(pPager->fd) < 0) {
|
||||
tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tdbOsFree(pageBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdbOsFree(pageBuf);
|
||||
|
||||
// 3, release the dirty pages
|
||||
SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1);
|
||||
SRBTreeNode *pNode = NULL;
|
||||
|
@ -413,17 +559,55 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
|||
|
||||
pPage->isDirty = 0;
|
||||
|
||||
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
|
||||
hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage)));
|
||||
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
||||
}
|
||||
|
||||
tRBTreeCreate(&pPager->rbt, pageCmpFn);
|
||||
|
||||
// 4, remove the journal file
|
||||
tdbOsClose(pPager->jfd);
|
||||
(void)tdbOsRemove(pPager->jFileName);
|
||||
hashset_destroy(pPager->jPageSet);
|
||||
|
||||
pPager->inTran = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
 * Write every page currently held in pPager->rbt to the database file, then
 * drop those pages from the tree and hand them back to the page cache.
 *
 * Returns 0 on success, or -1 as soon as writing one page to the DB fails
 * (in that case no page has been dropped or released yet).
 *
 * Side effects on success: dbOrigSize is set to dbFileSize, the tree is
 * re-created empty, the journal fd is closed, the journal file is removed and
 * inTran is cleared.
 *
 * NOTE(review): the journal is torn down and inTran reset here although the
 * visible caller invokes this from tdbPagerFetchPage when the cache returns
 * no page; pPager->jPageSet is not touched. Confirm this is intended.
 */
int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
  // Pass 1: push each page in the tree out to the database file.
  SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1);
  for (SRBTreeNode *pNode = tRBTreeIterNext(&iter); pNode != NULL; pNode = tRBTreeIterNext(&iter)) {
    if (tdbPagerWritePageToDB(pPager, (SPage *)pNode) < 0) {
      tdbError("failed to write page to db since %s", tstrerror(terrno));
      return -1;
    }
  }

  tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
  pPager->dbOrigSize = pPager->dbFileSize;

  // Pass 2: mark each page clean, unlink it from the tree and release it to the cache.
  iter = tRBTreeIterCreate(&pPager->rbt, 1);
  for (SRBTreeNode *pNode = tRBTreeIterNext(&iter); pNode != NULL; pNode = tRBTreeIterNext(&iter)) {
    SPage *pFlushed = (SPage *)pNode;

    pFlushed->isDirty = 0;
    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pFlushed);
    tdbPCacheRelease(pPager->pCache, pFlushed, pTxn);
  }

  // Start over with an empty tree.
  tRBTreeCreate(&pPager->rbt, pageCmpFn);

  // Close and delete the journal file.
  tdbOsClose(pPager->jfd);
  (void)tdbOsRemove(pPager->jFileName);

  pPager->inTran = 0;
  return 0;
}
|
||||
|
||||
|
@ -453,10 +637,8 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
|
|||
// fetch a page container
|
||||
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
|
||||
pgid.pgno = pgno;
|
||||
pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
|
||||
if (pPage == NULL) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
while ((pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn)) == NULL) {
|
||||
tdbPagerFlushPage(pPager, pTxn);
|
||||
}
|
||||
|
||||
tdbTrace("tdbttl fetch pager:%p", pPage->pPager);
|
||||
|
|
|
@ -384,6 +384,8 @@ struct STDB {
|
|||
#endif
|
||||
};
|
||||
|
||||
typedef struct hashset_st *hashset_t;
|
||||
|
||||
struct SPager {
|
||||
char *dbFileName;
|
||||
char *jFileName;
|
||||
|
@ -394,7 +396,8 @@ struct SPager {
|
|||
SPCache *pCache;
|
||||
SPgno dbFileSize;
|
||||
SPgno dbOrigSize;
|
||||
SPage *pDirty;
|
||||
//SPage *pDirty;
|
||||
hashset_t jPageSet;
|
||||
SRBTree rbt;
|
||||
u8 inTran;
|
||||
SPager *pNext; // used by TDB
|
||||
|
|
Loading…
Reference in New Issue