Merge pull request #17932 from taosdata/fix/tdb-prep-async-commit
enh(tdb): new interface of meta prep async commit
This commit is contained in:
commit
1f91eef038
|
@ -102,6 +102,7 @@ int metaClose(SMeta* pMeta);
|
|||
int metaBegin(SMeta* pMeta, int8_t fromSys);
|
||||
int metaCommit(SMeta* pMeta);
|
||||
int metaFinishCommit(SMeta* pMeta);
|
||||
int metaPrepareAsyncCommit(SMeta* pMeta);
|
||||
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
|
||||
|
|
|
@ -35,6 +35,7 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) {
|
|||
// commit the meta txn
|
||||
int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); }
|
||||
int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, &pMeta->txn); }
|
||||
int metaPrepareAsyncCommit(SMeta *pMeta) { return tdbPrepareAsyncCommit(pMeta->pEnv, &pMeta->txn); }
|
||||
|
||||
// abort the meta txn
|
||||
int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); }
|
||||
|
|
|
@ -125,6 +125,10 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -147,6 +151,10 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -226,6 +234,10 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
taosMemoryFree(buf);
|
||||
return 0;
|
||||
|
@ -250,6 +262,10 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
|||
} else {
|
||||
code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
|
||||
if (code) goto _err;
|
||||
code = tdbPostCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
taosMemoryFree(pWriter);
|
||||
|
|
|
@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
|||
} else {
|
||||
code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
|
||||
if (code) goto _err;
|
||||
code = tdbPostCommit(pWriter->pTq->pMetaDB, &pWriter->txn);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
taosMemoryFree(pWriter);
|
||||
|
|
|
@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) {
|
|||
} else {
|
||||
code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
|
||||
if (code) goto _err;
|
||||
code = tdbPostCommit(pWriter->pTq->pMetaStore, &pWriter->txn);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
taosMemoryFree(pWriter);
|
||||
|
|
|
@ -70,6 +70,7 @@ _err:
|
|||
|
||||
void streamMetaClose(SStreamMeta* pMeta) {
|
||||
tdbCommit(pMeta->db, &pMeta->txn);
|
||||
tdbPostCommit(pMeta->db, &pMeta->txn);
|
||||
tdbTbClose(pMeta->pTaskDb);
|
||||
tdbTbClose(pMeta->pCheckpointDb);
|
||||
tdbClose(pMeta->db);
|
||||
|
|
|
@ -142,6 +142,7 @@ _err:
|
|||
|
||||
void streamStateClose(SStreamState* pState) {
|
||||
tdbCommit(pState->db, &pState->txn);
|
||||
tdbPostCommit(pState->db, &pState->txn);
|
||||
tdbTbClose(pState->pStateDb);
|
||||
tdbTbClose(pState->pFuncStateDb);
|
||||
tdbTbClose(pState->pFillStateDb);
|
||||
|
@ -168,6 +169,9 @@ int32_t streamStateCommit(SStreamState* pState) {
|
|||
if (tdbCommit(pState->db, &pState->txn) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tdbPostCommit(pState->db, &pState->txn) < 0) {
|
||||
return -1;
|
||||
}
|
||||
memset(&pState->txn, 0, sizeof(TXN));
|
||||
if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
|
||||
0) {
|
||||
|
|
|
@ -36,6 +36,7 @@ int32_t tdbClose(TDB *pDb);
|
|||
int32_t tdbBegin(TDB *pDb, TXN *pTxn);
|
||||
int32_t tdbCommit(TDB *pDb, TXN *pTxn);
|
||||
int32_t tdbPostCommit(TDB *pDb, TXN *pTxn);
|
||||
int32_t tdbPrepareAsyncCommit(TDB *pDb, TXN *pTxn);
|
||||
int32_t tdbAbort(TDB *pDb, TXN *pTxn);
|
||||
int32_t tdbAlter(TDB *pDb, int pages);
|
||||
|
||||
|
|
|
@ -140,6 +140,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
|
|||
// tdbUnrefPage(pPage);
|
||||
tdbPCacheRelease(pPager->pCache, pPage, &txn);
|
||||
tdbCommit(pPager->pEnv, &txn);
|
||||
tdbPostCommit(pPager->pEnv, &txn);
|
||||
tdbTxnClose(&txn);
|
||||
}
|
||||
|
||||
|
|
|
@ -138,7 +138,24 @@ int32_t tdbPostCommit(TDB *pDb, TXN *pTxn) {
|
|||
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
|
||||
ret = tdbPagerPostCommit(pPager, pTxn);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to commit pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName, pTxn->txnId);
|
||||
tdbError("failed to commit pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName,
|
||||
pTxn->txnId);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tdbPrepareAsyncCommit(TDB *pDb, TXN *pTxn) {
|
||||
SPager *pPager;
|
||||
int ret;
|
||||
|
||||
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
|
||||
ret = tdbPagerPrepareAsyncCommit(pPager, pTxn);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to commit pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName,
|
||||
pTxn->txnId);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -287,6 +287,10 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
|
||||
// remove the journal file
|
||||
if (tdbOsClose(pPager->jfd) < 0) {
|
||||
tdbError("failed to close jfd due to %s. file:%s", strerror(errno), pPager->jFileName);
|
||||
|
@ -305,15 +309,54 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
|
||||
if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) {
|
||||
tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName);
|
||||
int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) {
|
||||
SPage *pPage;
|
||||
int ret;
|
||||
|
||||
// sync the journal file
|
||||
ret = tdbOsFSync(pPager->jfd);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pPager->inTran = 0;
|
||||
// loop to write the dirty pages to file
|
||||
SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1);
|
||||
SRBTreeNode *pNode = NULL;
|
||||
while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
|
||||
pPage = (SPage *)pNode;
|
||||
if (pPage->isLocal) continue;
|
||||
ret = tdbPagerWritePageToDB(pPager, pPage);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to write page to db since %s", tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
|
||||
pPager->dbOrigSize = pPager->dbFileSize;
|
||||
|
||||
// release the page
|
||||
iter = tRBTreeIterCreate(&pPager->rbt, 1);
|
||||
while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
|
||||
pPage = (SPage *)pNode;
|
||||
if (pPage->isLocal) continue;
|
||||
pPage->isDirty = 0;
|
||||
|
||||
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
|
||||
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
||||
}
|
||||
/*
|
||||
tRBTreeCreate(&pPager->rbt, pageCmpFn);
|
||||
|
||||
// sync the db file
|
||||
if (tdbOsFSync(pPager->fd) < 0) {
|
||||
tdbError("failed to fsync fd due to %s. file:%s", strerror(errno), pPager->dbFileName);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
*/
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -191,6 +191,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage);
|
|||
int tdbPagerBegin(SPager *pPager, TXN *pTxn);
|
||||
int tdbPagerCommit(SPager *pPager, TXN *pTxn);
|
||||
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn);
|
||||
int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn);
|
||||
int tdbPagerAbort(SPager *pPager, TXN *pTxn);
|
||||
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
|
||||
TXN *pTxn);
|
||||
|
|
Loading…
Reference in New Issue