Merge pull request #15675 from taosdata/fix/TD-18080
fix: new API tdbAbort for tdb txn aborting
This commit is contained in:
commit
52e2b71de5
|
@ -18,6 +18,7 @@
|
||||||
static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) { return vnodeBufPoolMalloc((SVBufPool *)pPool, size); }
|
static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) { return vnodeBufPoolMalloc((SVBufPool *)pPool, size); }
|
||||||
static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); }
|
static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); }
|
||||||
|
|
||||||
|
// begin a meta txn
|
||||||
int metaBegin(SMeta *pMeta) {
|
int metaBegin(SMeta *pMeta) {
|
||||||
tdbTxnOpen(&pMeta->txn, 0, metaMalloc, metaFree, pMeta->pVnode->inUse, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
tdbTxnOpen(&pMeta->txn, 0, metaMalloc, metaFree, pMeta->pVnode->inUse, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
|
||||||
|
@ -28,4 +29,8 @@ int metaBegin(SMeta *pMeta) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// commit the meta txn
|
||||||
int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); }
|
int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); }
|
||||||
|
|
||||||
|
// abort the meta txn
|
||||||
|
int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); }
|
||||||
|
|
|
@ -35,6 +35,7 @@ int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb);
|
||||||
int32_t tdbClose(TDB *pDb);
|
int32_t tdbClose(TDB *pDb);
|
||||||
int32_t tdbBegin(TDB *pDb, TXN *pTxn);
|
int32_t tdbBegin(TDB *pDb, TXN *pTxn);
|
||||||
int32_t tdbCommit(TDB *pDb, TXN *pTxn);
|
int32_t tdbCommit(TDB *pDb, TXN *pTxn);
|
||||||
|
int32_t tdbAbort(TDB *pDb, TXN *pTxn);
|
||||||
|
|
||||||
// TTB
|
// TTB
|
||||||
int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb);
|
int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb);
|
||||||
|
@ -87,4 +88,4 @@ enum { TDB_CODE_SUCCESS = 0, TDB_CODE_MAX };
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_TDB_H_*/
|
#endif /*_TD_TDB_H_*/
|
||||||
|
|
|
@ -30,6 +30,8 @@ struct SBTree {
|
||||||
int minLocal;
|
int minLocal;
|
||||||
int maxLeaf;
|
int maxLeaf;
|
||||||
int minLeaf;
|
int minLeaf;
|
||||||
|
SBtInfo info;
|
||||||
|
char *tbname;
|
||||||
void *pBuf;
|
void *pBuf;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -123,7 +125,12 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strcmp(TDB_MAINDB_NAME, tbname)) {
|
if (strcmp(TDB_MAINDB_NAME, tbname)) {
|
||||||
ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pgno, sizeof(SPgno), &txn);
|
pBt->info.root = pgno;
|
||||||
|
pBt->info.nLevel = 1;
|
||||||
|
pBt->info.nData = 0;
|
||||||
|
pBt->tbname = (char *)tbname;
|
||||||
|
// ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pgno, sizeof(SPgno), &txn);
|
||||||
|
ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pBt->info, sizeof(pBt->info), &txn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,7 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb) {
|
||||||
|
|
||||||
#ifdef USE_MAINDB
|
#ifdef USE_MAINDB
|
||||||
// open main db
|
// open main db
|
||||||
ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SPgno), NULL, pDb, &pDb->pMainDb);
|
ret = tdbTbOpen(TDB_MAINDB_NAME, -1, sizeof(SBtInfo), NULL, pDb, &pDb->pMainDb);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -97,7 +97,7 @@ int tdbClose(TDB *pDb) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tdbBegin(TDB *pDb, TXN *pTxn) {
|
int32_t tdbBegin(TDB *pDb, TXN *pTxn) {
|
||||||
SPager *pPager;
|
SPager *pPager;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
|
@ -112,7 +112,7 @@ int tdbBegin(TDB *pDb, TXN *pTxn) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tdbCommit(TDB *pDb, TXN *pTxn) {
|
int32_t tdbCommit(TDB *pDb, TXN *pTxn) {
|
||||||
SPager *pPager;
|
SPager *pPager;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
|
@ -127,6 +127,21 @@ int tdbCommit(TDB *pDb, TXN *pTxn) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tdbAbort(TDB *pDb, TXN *pTxn) {
|
||||||
|
SPager *pPager;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
|
||||||
|
ret = tdbPagerAbort(pPager, pTxn);
|
||||||
|
if (ret < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SPager *tdbEnvGetPager(TDB *pDb, const char *fname) {
|
SPager *tdbEnvGetPager(TDB *pDb, const char *fname) {
|
||||||
u32 hash;
|
u32 hash;
|
||||||
SPager **ppPager;
|
SPager **ppPager;
|
||||||
|
|
|
@ -253,7 +253,70 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
||||||
// sync the db file
|
// sync the db file
|
||||||
tdbOsFSync(pPager->fd);
|
tdbOsFSync(pPager->fd);
|
||||||
|
|
||||||
// remote the journal file
|
// remove the journal file
|
||||||
|
tdbOsClose(pPager->jfd);
|
||||||
|
tdbOsRemove(pPager->jFileName);
|
||||||
|
pPager->inTran = 0;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// recovery dirty pages
|
||||||
|
int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
||||||
|
SPage *pPage;
|
||||||
|
int pgIdx;
|
||||||
|
SPgno journalSize = 0;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
// 0, sync the journal file
|
||||||
|
ret = tdbOsFSync(pPager->jfd);
|
||||||
|
if (ret < 0) {
|
||||||
|
// TODO
|
||||||
|
ASSERT(0);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755);
|
||||||
|
if (jfd == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
|
||||||
|
if (ret < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1, read pages from jounal file
|
||||||
|
// 2, write original pages to buffered ones
|
||||||
|
|
||||||
|
/* TODO: reset the buffered pages instead of releasing them
|
||||||
|
// loop to reset the dirty pages from file
|
||||||
|
for (pgIdx = 0, pPage = pPager->pDirty; pPage != NULL && pgIndex < journalSize; pPage = pPage->pDirtyNext, ++pgIdx) {
|
||||||
|
// read pgno & the page from journal
|
||||||
|
SPgno pgno;
|
||||||
|
|
||||||
|
int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
|
||||||
|
if (ret < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
|
||||||
|
if (ret < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
// 3, release the dirty pages
|
||||||
|
for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) {
|
||||||
|
pPager->pDirty = pPage->pDirtyNext;
|
||||||
|
pPage->pDirtyNext = NULL;
|
||||||
|
|
||||||
|
pPage->isDirty = 0;
|
||||||
|
|
||||||
|
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4, remove the journal file
|
||||||
tdbOsClose(pPager->jfd);
|
tdbOsClose(pPager->jfd);
|
||||||
tdbOsRemove(pPager->jFileName);
|
tdbOsRemove(pPager->jFileName);
|
||||||
pPager->inTran = 0;
|
pPager->inTran = 0;
|
||||||
|
@ -475,8 +538,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
|
||||||
|
|
||||||
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
|
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
|
||||||
// read pgno & the page from journal
|
// read pgno & the page from journal
|
||||||
SPgno pgno;
|
SPgno pgno;
|
||||||
SPage *pPage;
|
|
||||||
|
|
||||||
int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
|
int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
|
|
@ -189,6 +189,7 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt);
|
||||||
int tdbPagerWrite(SPager *pPager, SPage *pPage);
|
int tdbPagerWrite(SPager *pPager, SPage *pPage);
|
||||||
int tdbPagerBegin(SPager *pPager, TXN *pTxn);
|
int tdbPagerBegin(SPager *pPager, TXN *pTxn);
|
||||||
int tdbPagerCommit(SPager *pPager, TXN *pTxn);
|
int tdbPagerCommit(SPager *pPager, TXN *pTxn);
|
||||||
|
int tdbPagerAbort(SPager *pPager, TXN *pTxn);
|
||||||
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
|
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
|
||||||
TXN *pTxn);
|
TXN *pTxn);
|
||||||
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
|
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
|
||||||
|
|
Loading…
Reference in New Issue