fix(tdb/abort): invalidate flushed pages in cache not on dirty tree
This commit is contained in:
parent
6b8a603d27
commit
25e3f653ee
|
@ -145,7 +145,7 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr
|
||||||
pWriter->sver = sver;
|
pWriter->sver = sver;
|
||||||
pWriter->ever = ever;
|
pWriter->ever = ever;
|
||||||
|
|
||||||
metaBegin(pMeta, META_BEGIN_HEAP_OS);
|
metaBegin(pMeta, META_BEGIN_HEAP_NIL);
|
||||||
|
|
||||||
*ppWriter = pWriter;
|
*ppWriter = pWriter;
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -710,8 +710,8 @@ int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
||||||
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) {
|
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
metaDebug("vgId:%d, start to save ctime:%" PRId64 " uid:%" PRId64 " ct:%" PRId64, TD_VID(pMeta->pVnode), pME->version,
|
metaTrace("vgId:%d, start to save version:%" PRId64 " uid:%" PRId64 " ctime:%" PRId64, TD_VID(pMeta->pVnode),
|
||||||
pME->uid, ctimeKey.ctime);
|
pME->version, pME->uid, ctimeKey.ctime);
|
||||||
|
|
||||||
return tdbTbInsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, pMeta->txn);
|
return tdbTbInsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, pMeta->txn);
|
||||||
}
|
}
|
||||||
|
|
|
@ -205,15 +205,34 @@ static void tdbPCacheFreePage(SPCache *pCache, SPage *pPage) {
|
||||||
pCache->pFree = pPage;
|
pCache->pFree = pPage;
|
||||||
pPage->isFree = 0;
|
pPage->isFree = 0;
|
||||||
++pCache->nFree;
|
++pCache->nFree;
|
||||||
tdbTrace("pcache/free page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);
|
tdbTrace("pcache/free page %p/%d, pgno:%d, ", pPage, pPage->id, TDB_PAGE_PGNO(pPage));
|
||||||
} else {
|
} else {
|
||||||
tdbTrace("pcache destroy page: %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);
|
tdbTrace("pcache/free2 page: %p/%d, pgno:%d, ", pPage, pPage->id, TDB_PAGE_PGNO(pPage));
|
||||||
|
|
||||||
tdbPCacheRemovePageFromHash(pCache, pPage);
|
tdbPCacheRemovePageFromHash(pCache, pPage);
|
||||||
tdbPageDestroy(pPage, tdbDefaultFree, NULL);
|
tdbPageDestroy(pPage, tdbDefaultFree, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tdbPCacheInvalidatePage(SPCache *pCache, SPager *pPager, SPgno pgno) {
|
||||||
|
SPgid pgid;
|
||||||
|
const SPgid *pPgid = &pgid;
|
||||||
|
SPage *pPage = NULL;
|
||||||
|
|
||||||
|
memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
|
||||||
|
pgid.pgno = pgno;
|
||||||
|
|
||||||
|
pPage = pCache->pgHash[tdbPCachePageHash(pPgid) % pCache->nHash];
|
||||||
|
while (pPage) {
|
||||||
|
if (pPage->pgid.pgno == pPgid->pgno && memcmp(pPage->pgid.fileid, pPgid->fileid, TDB_FILE_ID_LEN) == 0) break;
|
||||||
|
pPage = pPage->pHashNext;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pPage) {
|
||||||
|
tdbPCacheRemovePageFromHash(pCache, pPage);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
|
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
|
||||||
i32 nRef;
|
i32 nRef;
|
||||||
|
|
||||||
|
@ -359,7 +378,7 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
|
||||||
|
|
||||||
pCache->nRecyclable--;
|
pCache->nRecyclable--;
|
||||||
|
|
||||||
tdbTrace("pcache/pin page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);
|
tdbTrace("pcache/pin page %p/%d, pgno:%d, ", pPage, pPage->id, TDB_PAGE_PGNO(pPage));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,7 +391,8 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
|
||||||
|
|
||||||
ASSERT(pPage->pLruNext == NULL);
|
ASSERT(pPage->pLruNext == NULL);
|
||||||
|
|
||||||
tdbTrace("pCache:%p unpin page %p/%d/%d, nPages:%d", pCache, pPage, TDB_PAGE_PGNO(pPage), pPage->id, pCache->nPages);
|
tdbTrace("pCache:%p unpin page %p/%d, nPages:%d, pgno:%d, ", pCache, pPage, pPage->id, pCache->nPages,
|
||||||
|
TDB_PAGE_PGNO(pPage));
|
||||||
if (pPage->id < pCache->nPages) {
|
if (pPage->id < pCache->nPages) {
|
||||||
pPage->pLruPrev = &(pCache->lru);
|
pPage->pLruPrev = &(pCache->lru);
|
||||||
pPage->pLruNext = pCache->lru.pLruNext;
|
pPage->pLruNext = pCache->lru.pLruNext;
|
||||||
|
@ -404,7 +424,7 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
|
||||||
// printf("rmv page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
|
// printf("rmv page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbTrace("pcache/remove page %p/%d/%d from hash %" PRIu32, pPage, TDB_PAGE_PGNO(pPage), pPage->id, h);
|
tdbTrace("pcache/remove page %p/%d from hash %" PRIu32 " pgno:%d, ", pPage, pPage->id, h, TDB_PAGE_PGNO(pPage));
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
|
static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
|
||||||
|
@ -415,7 +435,7 @@ static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
|
||||||
|
|
||||||
pCache->nPage++;
|
pCache->nPage++;
|
||||||
|
|
||||||
tdbTrace("pcache/add page %p/%d/%d to hash %" PRIu32, pPage, TDB_PAGE_PGNO(pPage), pPage->id, h);
|
tdbTrace("pcache/add page %p/%d to hash %" PRIu32 " pgno:%d, ", pPage, pPage->id, h, TDB_PAGE_PGNO(pPage));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tdbPCacheOpenImpl(SPCache *pCache) {
|
static int tdbPCacheOpenImpl(SPCache *pCache) {
|
||||||
|
|
|
@ -299,6 +299,9 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
|
||||||
pTxn->jPageSet = hashset_create();
|
pTxn->jPageSet = hashset_create();
|
||||||
|
|
||||||
pPager->pActiveTxn = pTxn;
|
pPager->pActiveTxn = pTxn;
|
||||||
|
|
||||||
|
tdbDebug("pager/begin: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
|
||||||
|
|
||||||
// TODO: write the size of the file
|
// TODO: write the size of the file
|
||||||
/*
|
/*
|
||||||
pPager->inTran = 1;
|
pPager->inTran = 1;
|
||||||
|
@ -332,7 +335,8 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
|
tdbDebug("pager/commit: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
|
||||||
|
|
||||||
pPager->dbOrigSize = pPager->dbFileSize;
|
pPager->dbOrigSize = pPager->dbFileSize;
|
||||||
|
|
||||||
// release the page
|
// release the page
|
||||||
|
@ -381,6 +385,8 @@ int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
|
||||||
|
|
||||||
// pPager->inTran = 0;
|
// pPager->inTran = 0;
|
||||||
|
|
||||||
|
tdbDebug("pager/post-commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -477,7 +483,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbDebug("tdb/abort: pager:%p,", pPager);
|
tdbDebug("pager/abort: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
|
||||||
|
|
||||||
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
|
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
|
||||||
// read pgno & the page from journal
|
// read pgno & the page from journal
|
||||||
|
@ -489,7 +495,9 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbTrace("tdb/abort: pgno:%d,", pgno);
|
tdbTrace("pager/abort: restore pgno:%d,", pgno);
|
||||||
|
|
||||||
|
tdbPCacheInvalidatePage(pPager->pCache, pPager, pgno);
|
||||||
|
|
||||||
ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
|
ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
@ -529,6 +537,9 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
||||||
SRBTreeNode *pNode = NULL;
|
SRBTreeNode *pNode = NULL;
|
||||||
while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
|
while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
|
||||||
pPage = (SPage *)pNode;
|
pPage = (SPage *)pNode;
|
||||||
|
SPgno pgno = TDB_PAGE_PGNO(pPage);
|
||||||
|
|
||||||
|
tdbTrace("pager/abort: drop dirty pgno:%d,", pgno);
|
||||||
|
|
||||||
pPage->isDirty = 0;
|
pPage->isDirty = 0;
|
||||||
|
|
||||||
|
@ -538,7 +549,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
||||||
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
||||||
}
|
}
|
||||||
|
|
||||||
tdbTrace("reset dirty tree: %p", &pPager->rbt);
|
tdbTrace("pager/abort: reset dirty tree: %p", &pPager->rbt);
|
||||||
tRBTreeCreate(&pPager->rbt, pageCmpFn);
|
tRBTreeCreate(&pPager->rbt, pageCmpFn);
|
||||||
|
|
||||||
// 4, remove the journal file
|
// 4, remove the journal file
|
||||||
|
@ -599,6 +610,9 @@ int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tdbDebug("pager/flush: %p, %d/%d, txnId:%" PRId64, pPager, pPager->dbOrigSize, pPager->dbFileSize, pTxn->txnId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
tdbTrace("tdb/flush:%p, %d/%d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
|
tdbTrace("tdb/flush:%p, %d/%d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
|
||||||
pPager->dbOrigSize = maxPgno;
|
pPager->dbOrigSize = maxPgno;
|
||||||
|
|
|
@ -224,6 +224,7 @@ int tdbPCacheAlter(SPCache *pCache, int32_t nPage);
|
||||||
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
|
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
|
||||||
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn);
|
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn);
|
||||||
void tdbPCacheMarkFree(SPCache *pCache, SPage *pPage);
|
void tdbPCacheMarkFree(SPCache *pCache, SPage *pPage);
|
||||||
|
void tdbPCacheInvalidatePage(SPCache *pCache, SPager *pPager, SPgno pgno);
|
||||||
int tdbPCacheGetPageSize(SPCache *pCache);
|
int tdbPCacheGetPageSize(SPCache *pCache);
|
||||||
|
|
||||||
// tdbPage.c ====================================
|
// tdbPage.c ====================================
|
||||||
|
|
Loading…
Reference in New Issue