From c17f8244be3372dcb0066d78003f04f5892eb805 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 7 Nov 2022 16:45:29 +0800 Subject: [PATCH 1/2] enh(tdb): new interface of meta prep async commit --- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/meta/metaCommit.c | 1 + source/libs/tdb/inc/tdb.h | 1 + source/libs/tdb/src/db/tdbDb.c | 19 ++++++++- source/libs/tdb/src/db/tdbPager.c | 51 ++++++++++++++++++++++-- source/libs/tdb/src/inc/tdbInt.h | 1 + 6 files changed, 69 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 44ecf64419..ac9fabf052 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -102,6 +102,7 @@ int metaClose(SMeta* pMeta); int metaBegin(SMeta* pMeta, int8_t fromSys); int metaCommit(SMeta* pMeta); int metaFinishCommit(SMeta* pMeta); +int metaPrepareAsyncCommit(SMeta* pMeta); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index 01ad833d20..0be0c3e407 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -35,6 +35,7 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) { // commit the meta txn int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); } int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, &pMeta->txn); } +int metaPrepareAsyncCommit(SMeta *pMeta) { return tdbPrepareAsyncCommit(pMeta->pEnv, &pMeta->txn); } // abort the meta txn int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); } diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 3d92d164b4..7ab2bc3995 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -36,6 +36,7 @@ int32_t tdbClose(TDB *pDb); int32_t tdbBegin(TDB *pDb, TXN *pTxn); int32_t tdbCommit(TDB *pDb, TXN *pTxn); int32_t tdbPostCommit(TDB *pDb, TXN *pTxn); +int32_t tdbPrepareAsyncCommit(TDB *pDb, TXN *pTxn); int32_t tdbAbort(TDB *pDb, TXN *pTxn); int32_t tdbAlter(TDB *pDb, int pages); diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index 855e3510f2..5aff5b7bb2 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -138,7 +138,24 @@ int32_t tdbPostCommit(TDB *pDb, TXN *pTxn) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { ret = tdbPagerPostCommit(pPager, pTxn); if (ret < 0) { - tdbError("failed to commit pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName, pTxn->txnId); + tdbError("failed to commit pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName, + pTxn->txnId); + return -1; + } + } + + return 0; +} + +int32_t tdbPrepareAsyncCommit(TDB *pDb, TXN *pTxn) { + SPager *pPager; + int ret; + + for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { + ret = tdbPagerPrepareAsyncCommit(pPager, pTxn); + if (ret < 0) { + tdbError("failed to commit pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName, + pTxn->txnId); return -1; } } diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index cee54c1a73..c3ae1dc739 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -287,6 +287,10 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { return -1; } + return 0; +} + +int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) { // remove the journal file if (tdbOsClose(pPager->jfd) < 0) { tdbError("failed to close jfd due to %s. file:%s", strerror(errno), pPager->jFileName); @@ -305,15 +309,54 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { return 0; } -int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) { - if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { - tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName); +int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { + SPage *pPage; + int ret; + + // sync the journal file + ret = tdbOsFSync(pPager->jfd); + if (ret < 0) { + tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pPager->inTran = 0; + // loop to write the dirty pages to file + SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1); + SRBTreeNode *pNode = NULL; + while ((pNode = tRBTreeIterNext(&iter)) != NULL) { + pPage = (SPage *)pNode; + if (pPage->isLocal) continue; + ret = tdbPagerWritePageToDB(pPager, pPage); + if (ret < 0) { + tdbError("failed to write page to db since %s", tstrerror(terrno)); + return -1; + } + } + tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize); + pPager->dbOrigSize = pPager->dbFileSize; + + // release the page + iter = tRBTreeIterCreate(&pPager->rbt, 1); + while ((pNode = tRBTreeIterNext(&iter)) != NULL) { + pPage = (SPage *)pNode; + if (pPage->isLocal) continue; + pPage->isDirty = 0; + + tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); + tdbPCacheRelease(pPager->pCache, pPage, pTxn); + } + /* + tRBTreeCreate(&pPager->rbt, pageCmpFn); + + // sync the db file + if (tdbOsFSync(pPager->fd) < 0) { + tdbError("failed to fsync fd due to %s. file:%s", strerror(errno), pPager->dbFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + */ return 0; } diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index b45747c972..e5ece98b28 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -191,6 +191,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage); int tdbPagerBegin(SPager *pPager, TXN *pTxn); int tdbPagerCommit(SPager *pPager, TXN *pTxn); int tdbPagerPostCommit(SPager *pPager, TXN *pTxn); +int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn); int tdbPagerAbort(SPager *pPager, TXN *pTxn); int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg, TXN *pTxn); From 1a4cf6964b1ed2c1ca0f874cd1e536af73e07cd8 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 7 Nov 2022 18:28:45 +0800 Subject: [PATCH 2/2] fix: add post commit to tq & stream --- source/dnode/vnode/src/tq/tqMeta.c | 16 ++++++++++++++++ source/dnode/vnode/src/tq/tqSnapshot.c | 2 ++ source/dnode/vnode/src/tq/tqStreamStateSnap.c | 2 ++ source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 2 ++ source/libs/stream/src/streamMeta.c | 1 + source/libs/stream/src/streamState.c | 4 ++++ source/libs/tdb/src/db/tdbBtree.c | 1 + 7 files changed, 28 insertions(+) diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index b852acb936..27c491c86b 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -125,6 +125,10 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_ return -1; } + if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) { + return -1; + } + return 0; } @@ -147,6 +151,10 @@ int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) { ASSERT(0); } + if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + return 0; } @@ -226,6 +234,10 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { ASSERT(0); } + if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + tEncoderClear(&encoder); taosMemoryFree(buf); return 0; @@ -250,6 +262,10 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { ASSERT(0); } + if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + return 0; } diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index c52e0e2c09..b68763867e 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { } else { code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn); if (code) goto _err; + code = tdbPostCommit(pWriter->pTq->pMetaDB, &pWriter->txn); + if (code) goto _err; } taosMemoryFree(pWriter); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 92e5f8df7a..08d5931bc3 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { } else { code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn); if (code) goto _err; + code = tdbPostCommit(pWriter->pTq->pMetaDB, &pWriter->txn); + if (code) goto _err; } taosMemoryFree(pWriter); diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index b4a7ce7737..31e44a5b6d 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -169,6 +169,8 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { } else { code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); if (code) goto _err; + code = tdbPostCommit(pWriter->pTq->pMetaStore, &pWriter->txn); + if (code) goto _err; } taosMemoryFree(pWriter); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 98e4e77cb0..682bfbaf8d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -70,6 +70,7 @@ _err: void streamMetaClose(SStreamMeta* pMeta) { tdbCommit(pMeta->db, &pMeta->txn); + tdbPostCommit(pMeta->db, &pMeta->txn); tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pCheckpointDb); tdbClose(pMeta->db); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 9829955baf..cf2ead364b 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -142,6 +142,7 @@ _err: void streamStateClose(SStreamState* pState) { tdbCommit(pState->db, &pState->txn); + tdbPostCommit(pState->db, &pState->txn); tdbTbClose(pState->pStateDb); tdbTbClose(pState->pFuncStateDb); tdbTbClose(pState->pFillStateDb); @@ -168,6 +169,9 @@ int32_t streamStateCommit(SStreamState* pState) { if (tdbCommit(pState->db, &pState->txn) < 0) { return -1; } + if (tdbPostCommit(pState->db, &pState->txn) < 0) { + return -1; + } memset(&pState->txn, 0, sizeof(TXN)); if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index e4d3cc3488..e3860f85c6 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -140,6 +140,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg // tdbUnrefPage(pPage); tdbPCacheRelease(pPager->pCache, pPage, &txn); tdbCommit(pPager->pEnv, &txn); + tdbPostCommit(pPager->pEnv, &txn); tdbTxnClose(&txn); }