Merge pull request #19010 from taosdata/fix/TD-21344-main
fix(tdb): rollback in-memory pages
This commit is contained in:
commit
f316cc12bf
|
@ -161,7 +161,10 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
|
|||
SMetaSnapWriter* pWriter = *ppWriter;
|
||||
|
||||
if (rollback) {
|
||||
metaInfo("vgId:%d, meta snapshot writer close and rollback start ", TD_VID(pWriter->pMeta->pVnode));
|
||||
code = metaAbort(pWriter->pMeta);
|
||||
metaInfo("vgId:%d, meta snapshot writer close and rollback finished, code:0x%x", TD_VID(pWriter->pMeta->pVnode),
|
||||
code);
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
|
||||
|
|
|
@ -710,6 +710,9 @@ int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
|
|||
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) {
|
||||
return 0;
|
||||
}
|
||||
metaDebug("vgId:%d, start to save ctime:%" PRId64 " uid:%" PRId64 " ct:%" PRId64, TD_VID(pMeta->pVnode), pME->version,
|
||||
pME->uid, ctimeKey.ctime);
|
||||
|
||||
return tdbTbInsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, pMeta->txn);
|
||||
}
|
||||
|
||||
|
|
|
@ -192,6 +192,28 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
|
|||
return pPage;
|
||||
}
|
||||
|
||||
void tdbPCacheMarkFree(SPCache *pCache, SPage *pPage) {
|
||||
tdbPCacheLock(pCache);
|
||||
tdbPCacheRemovePageFromHash(pCache, pPage);
|
||||
pPage->isFree = 1;
|
||||
tdbPCacheUnlock(pCache);
|
||||
}
|
||||
|
||||
static void tdbPCacheFreePage(SPCache *pCache, SPage *pPage) {
|
||||
if (pPage->id < pCache->nPages) {
|
||||
pPage->pFreeNext = pCache->pFree;
|
||||
pCache->pFree = pPage;
|
||||
pPage->isFree = 0;
|
||||
++pCache->nFree;
|
||||
tdbTrace("pcache/free page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);
|
||||
} else {
|
||||
tdbTrace("pcache destroy page: %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);
|
||||
|
||||
tdbPCacheRemovePageFromHash(pCache, pPage);
|
||||
tdbPageDestroy(pPage, tdbDefaultFree, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
|
||||
i32 nRef;
|
||||
|
||||
|
@ -209,7 +231,11 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
|
|||
// nRef = tdbGetPageRef(pPage);
|
||||
// if (nRef == 0) {
|
||||
if (pPage->isLocal) {
|
||||
tdbPCacheUnpinPage(pCache, pPage);
|
||||
if (!pPage->isFree) {
|
||||
tdbPCacheUnpinPage(pCache, pPage);
|
||||
} else {
|
||||
tdbPCacheFreePage(pCache, pPage);
|
||||
}
|
||||
} else {
|
||||
if (TDB_TXN_IS_WRITE(pTxn)) {
|
||||
// remove from hash
|
||||
|
|
|
@ -111,6 +111,9 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell
|
|||
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int, TXN *, SBTree *pBt)) {
|
||||
tdbTrace("page/init: %p %" PRIu8 " %p", pPage, szAmHdr, xCellSize);
|
||||
pPage->pPageHdr = pPage->pData + szAmHdr;
|
||||
if (TDB_PAGE_NCELLS(pPage) == 0) {
|
||||
return tdbPageZero(pPage, szAmHdr, xCellSize);
|
||||
}
|
||||
pPage->pCellIdx = pPage->pPageHdr + TDB_PAGE_HDR_SIZE(pPage);
|
||||
pPage->pFreeStart = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * TDB_PAGE_NCELLS(pPage);
|
||||
pPage->pFreeEnd = pPage->pData + TDB_PAGE_CCELLS(pPage);
|
||||
|
|
|
@ -466,11 +466,19 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (tdbOsLSeek(jfd, 0L, SEEK_SET) < 0) {
|
||||
tdbError("failed to lseek jfd due to %s. file:%s, offset:0", strerror(errno), pPager->dbFileName);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
|
||||
if (pageBuf == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdbDebug("tdb/abort: pager:%p,", pPager);
|
||||
|
||||
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
|
||||
// read pgno & the page from journal
|
||||
SPgno pgno;
|
||||
|
@ -481,6 +489,8 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
tdbTrace("tdb/abort: pgno:%d,", pgno);
|
||||
|
||||
ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pageBuf);
|
||||
|
@ -524,6 +534,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
|||
|
||||
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
|
||||
hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
|
||||
tdbPCacheMarkFree(pPager->pCache, pPage);
|
||||
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
||||
}
|
||||
|
||||
|
@ -577,12 +588,12 @@ int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
tdbTrace("tdb/flush:%p, %d/%d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
|
||||
tdbTrace("tdb/flush:%p, pgno:%d, %d/%d/%d", pPager, pgno, pPager->dbOrigSize, pPager->dbFileSize, maxPgno);
|
||||
pPager->dbOrigSize = maxPgno;
|
||||
|
||||
pPage->isDirty = 0;
|
||||
|
||||
tdbTrace("pager/flush drop page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
|
||||
tdbTrace("pager/flush drop page: %p, pgno:%d, from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
|
||||
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
|
||||
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
||||
|
||||
|
@ -829,7 +840,7 @@ static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tdbPagerRestore(SPager *pPager, SBTree *pBt, const char *jFileName) {
|
||||
static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
|
||||
int ret = 0;
|
||||
SPgno journalSize = 0;
|
||||
u8 *pageBuf = NULL;
|
||||
|
@ -907,7 +918,7 @@ static int tdbPagerRestore(SPager *pPager, SBTree *pBt, const char *jFileName) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tdbPagerRestoreJournals(SPager *pPager, SBTree *pBt) {
|
||||
int tdbPagerRestoreJournals(SPager *pPager) {
|
||||
tdbDirEntryPtr pDirEntry;
|
||||
tdbDirPtr pDir = taosOpenDir(pPager->pEnv->dbName);
|
||||
if (pDir == NULL) {
|
||||
|
@ -918,7 +929,7 @@ int tdbPagerRestoreJournals(SPager *pPager, SBTree *pBt) {
|
|||
while ((pDirEntry = tdbReadDir(pDir)) != NULL) {
|
||||
char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry));
|
||||
if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) {
|
||||
if (tdbPagerRestore(pPager, pBt, name) < 0) {
|
||||
if (tdbPagerRestore(pPager, name) < 0) {
|
||||
tdbCloseDir(&pDir);
|
||||
|
||||
tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), name);
|
||||
|
|
|
@ -107,6 +107,16 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
|
|||
|
||||
ASSERT(pPager != NULL);
|
||||
|
||||
if (rollback) {
|
||||
tdbPagerRollback(pPager);
|
||||
} else {
|
||||
ret = tdbPagerRestoreJournals(pPager);
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pTb);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// pTb->pBt
|
||||
ret = tdbBtreeOpen(keyLen, valLen, pPager, tbname, pgno, keyCmprFn, pEnv, &(pTb->pBt));
|
||||
if (ret < 0) {
|
||||
|
@ -114,16 +124,6 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (rollback) {
|
||||
tdbPagerRollback(pPager);
|
||||
} else {
|
||||
ret = tdbPagerRestoreJournals(pPager, pTb->pBt);
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pTb);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
*ppTb = pTb;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -197,7 +197,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initP
|
|||
TXN *pTxn);
|
||||
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
|
||||
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
|
||||
int tdbPagerRestoreJournals(SPager *pPager, SBTree *pBt);
|
||||
int tdbPagerRestoreJournals(SPager *pPager);
|
||||
int tdbPagerRollback(SPager *pPager);
|
||||
|
||||
// tdbPCache.c ====================================
|
||||
|
@ -205,6 +205,7 @@ int tdbPagerRollback(SPager *pPager);
|
|||
u8 isAnchor; \
|
||||
u8 isLocal; \
|
||||
u8 isDirty; \
|
||||
u8 isFree; \
|
||||
volatile i32 nRef; \
|
||||
i32 id; \
|
||||
SPage *pFreeNext; \
|
||||
|
@ -222,6 +223,7 @@ int tdbPCacheClose(SPCache *pCache);
|
|||
int tdbPCacheAlter(SPCache *pCache, int32_t nPage);
|
||||
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
|
||||
void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn);
|
||||
void tdbPCacheMarkFree(SPCache *pCache, SPage *pPage);
|
||||
int tdbPCacheGetPageSize(SPCache *pCache);
|
||||
|
||||
// tdbPage.c ====================================
|
||||
|
|
Loading…
Reference in New Issue