From 1441684c03f5a6e6bab5749af706c8d300fe0024 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 17 Nov 2022 09:29:27 +0800 Subject: [PATCH 01/10] cleanup: remove unused pager open --- source/libs/tdb/src/db/tdbPager.c | 82 ++++++------------------------- 1 file changed, 16 insertions(+), 66 deletions(-) diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index abbad06515..c4346f62f4 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -28,12 +28,12 @@ typedef struct { TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct"); struct hashset_st { - size_t nbits; - size_t mask; - size_t capacity; + size_t nbits; + size_t mask; + size_t capacity; size_t *items; - size_t nitems; - double load_factor; + size_t nitems; + double load_factor; }; static const unsigned int prime = 39; @@ -68,11 +68,11 @@ void hashset_destroy(hashset_t set) { } int hashset_add_member(hashset_t set, void *item) { - size_t value = (size_t) item; + size_t value = (size_t)item; size_t h; if (value == 0) { - return -1; + return -1; } for (h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) { @@ -103,7 +103,7 @@ int hashset_add(hashset_t set, void *item) { set->nitems = 0; for (size_t i = 0; i < old_capacity; ++i) { - hashset_add_member(set, (void*)old_items[i]); + hashset_add_member(set, (void *)old_items[i]); } tdbOsFree(old_items); } @@ -112,7 +112,7 @@ int hashset_add(hashset_t set, void *item) { } int hashset_remove(hashset_t set, void *item) { - size_t value = (size_t) item; + size_t value = (size_t)item; for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) { if (set->items[h] == value) { @@ -126,7 +126,7 @@ int hashset_remove(hashset_t set, void *item) { } int hashset_contains(hashset_t set, void *item) { - size_t value = (size_t) item; + size_t value = (size_t)item; for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) { if (set->items[h] == value) { @@ -226,58 +226,7 @@ int tdbPagerClose(SPager *pPager) { } return 0; } -/* -int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate, SBTree *pBt) { - SPgno pgno; - SPage *pPage; - int ret; - if (pPager->dbOrigSize > 0) { - pgno = 1; - } else { - pgno = 0; - } - - { - // TODO: try to search the main DB to get the page number - // pgno = 0; - } - - if (pgno == 0 && toCreate) { - // allocate a new child page - TXN txn; - tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0); - - pPager->inTran = 1; - - SBtreeInitPageArg zArg; - zArg.flags = 0x1 | 0x2; // root leaf node; - zArg.pBt = pBt; - ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, &txn); - if (ret < 0) { - return -1; - } - - // ret = tdbPagerAllocPage(pPager, &pPage, &pgno); - // if (ret < 0) { - // return -1; - //} - - // TODO: Need to zero the page - - ret = tdbPagerWrite(pPager, pPage); - if (ret < 0) { - tdbError("failed to write page since %s", terrstr()); - return -1; - } - - tdbTxnClose(&txn); - } - - *ppgno = pgno; - return 0; -} -*/ int tdbPagerWrite(SPager *pPager, SPage *pPage) { int ret; SPage **ppPage; @@ -319,7 +268,8 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage); // Write page to journal if neccessary - if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize && (pPager->jPageSet == NULL || !hashset_contains(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))))) { + if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize && + (pPager->jPageSet == NULL || !hashset_contains(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))))) { ret = tdbPagerWritePageToJournal(pPager, pPage); if (ret < 0) { tdbError("failed to write page to journal since %s", tstrerror(terrno)); @@ -327,7 +277,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { } if (pPager->jPageSet) { - hashset_add(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))); + hashset_add(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); } } @@ -391,7 +341,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); if (pPager->jPageSet) { - hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))); + hashset_remove(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); } tdbPCacheRelease(pPager->pCache, pPage, pTxn); } @@ -503,7 +453,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { return -1; } - u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize); + u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize); if (pageBuf == NULL) { return -1; } @@ -560,7 +510,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { pPage->isDirty = 0; tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); - hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))); + hashset_remove(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); tdbPCacheRelease(pPager->pCache, pPage, pTxn); } From 78335f83d2381d10e200e420d22145572dc9184c Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 17 Nov 2022 11:54:07 +0800 Subject: [PATCH 02/10] tdb/api: migrate txn related api changes of meta, sma, tq, and stream meta/state --- include/libs/stream/streamState.h | 2 +- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/inc/meta.h | 4 +- source/dnode/vnode/src/meta/metaCommit.c | 21 ++-- source/dnode/vnode/src/meta/metaSma.c | 8 +- source/dnode/vnode/src/meta/metaTable.c | 66 ++++++------- source/dnode/vnode/src/tq/tqMeta.c | 61 +++++------- source/dnode/vnode/src/tq/tqSnapshot.c | 14 +-- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 13 ++- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 13 ++- source/libs/stream/src/streamMeta.c | 39 ++++---- source/libs/stream/src/streamState.c | 54 +++++------ source/libs/tdb/inc/tdb.h | 3 +- source/libs/tdb/src/db/tdbBtree.c | 47 ++++++--- source/libs/tdb/src/db/tdbDb.c | 21 +++- source/libs/tdb/src/db/tdbTable.c | 2 +- source/libs/tdb/src/db/tdbTxn.c | 8 +- source/libs/tdb/src/inc/tdbInt.h | 14 +-- source/libs/tdb/test/tdbExOVFLTest.cpp | 54 +++++------ source/libs/tdb/test/tdbTest.cpp | 96 ++++++++----------- 20 files changed, 274 insertions(+), 268 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 9443df5e14..13dc0be75c 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -35,7 +35,7 @@ typedef struct { TTB* pFuncStateDb; TTB* pFillStateDb; // todo refactor TTB* pSessionStateDb; - TXN txn; + TXN* txn; int32_t number; } SStreamState; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ecd1b6f916..7ca299d312 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -562,7 +562,7 @@ typedef struct SStreamMeta { SHashObj* pTasks; SHashObj* pRecoverStatus; void* ahandle; - TXN txn; + TXN* txn; FTaskExpand* expandFunc; int32_t vgId; SRWLatch lock; diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 2ceae91f7c..11efd63d10 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -70,7 +70,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid); int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo); int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid); int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); -void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t delta); +void metaUpdateStbStats(SMeta* pMeta, int64_t uid, int64_t delta); struct SMeta { TdThreadRwlock lock; @@ -78,7 +78,7 @@ struct SMeta { char* path; SVnode* pVnode; TDB* pEnv; - TXN txn; + TXN* txn; TTB* pTbDb; TTB* pSkmDb; TTB* pUidIdx; diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index 0be0c3e407..c46c219ac3 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -20,12 +20,19 @@ static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVB // begin a meta txn int metaBegin(SMeta *pMeta, int8_t fromSys) { + void *(*xMalloc)(void *, size_t); + void (*xFree)(void *, void *); + void *xArg = NULL; + if (fromSys) { - tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + xMalloc = tdbDefaultMalloc; + xFree = tdbDefaultFree; } else { - tdbTxnOpen(&pMeta->txn, 0, metaMalloc, metaFree, pMeta->pVnode->inUse, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + xMalloc = metaMalloc; + xFree = metaFree; + xArg = pMeta->pVnode->inUse; } - if (tdbBegin(pMeta->pEnv, &pMeta->txn) < 0) { + if (tdbBegin(pMeta->pEnv, &pMeta->txn, xMalloc, xFree, xArg, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } @@ -33,9 +40,9 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) { } // commit the meta txn -int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); } -int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, &pMeta->txn); } -int metaPrepareAsyncCommit(SMeta *pMeta) { return tdbPrepareAsyncCommit(pMeta->pEnv, &pMeta->txn); } +int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, pMeta->txn); } +int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, pMeta->txn); } +int metaPrepareAsyncCommit(SMeta *pMeta) { return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn); } // abort the meta txn -int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); } +int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, pMeta->txn); } diff --git a/source/dnode/vnode/src/meta/metaSma.c b/source/dnode/vnode/src/meta/metaSma.c index 3ada7d1814..2e3cbb97cf 100644 --- a/source/dnode/vnode/src/meta/metaSma.c +++ b/source/dnode/vnode/src/meta/metaSma.c @@ -117,7 +117,7 @@ static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) { tEncoderClear(&coder); // write to table.db - if (tdbTbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) { + if (tdbTbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, pMeta->txn) < 0) { goto _err; } @@ -131,17 +131,17 @@ _err: static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) { SUidIdxVal uidIdxVal = {.suid = pME->smaEntry.tsma->indexUid, .version = pME->version, .skmVer = 0}; - return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), &pMeta->txn); + return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), pMeta->txn); } static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) { - return tdbTbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn); + return tdbTbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), pMeta->txn); } static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) { SSmaIdxKey smaIdxKey = {.uid = pME->smaEntry.tsma->tableUid, .smaUid = pME->smaEntry.tsma->indexUid}; - return tdbTbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn); + return tdbTbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, pMeta->txn); } static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 5921adfbfa..16302350ba 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -261,7 +261,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb // drop all child tables TBC *pCtbIdxc = NULL; - tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn); + tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL); rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); if (rc < 0) { tdbTbcClose(pCtbIdxc); @@ -295,10 +295,10 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb _drop_super_table: tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData); tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = ((SUidIdxVal *)pData)[0].version, .uid = pReq->suid}, - sizeof(STbDbKey), &pMeta->txn); - tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn); - tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn); - tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn); + sizeof(STbDbKey), pMeta->txn); + tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, pMeta->txn); + tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), pMeta->txn); + tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), pMeta->txn); metaULock(pMeta); @@ -321,7 +321,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { int32_t ret; int32_t c = -2; - tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); + tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL); ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c); if (ret < 0 || c) { tdbTbcClose(pUidIdxc); @@ -340,7 +340,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { oversion = ((SUidIdxVal *)pData)[0].version; - tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); + tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL); ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c); ASSERT(ret == 0 && c == 0); @@ -589,7 +589,7 @@ static int metaDeleteTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { STtlIdxKey ttlKey = {0}; metaBuildTtlIdxKey(&ttlKey, pME); if (ttlKey.dtime == 0) return 0; - return tdbTbDelete(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), &pMeta->txn); + return tdbTbDelete(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), pMeta->txn); } static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { @@ -651,7 +651,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { if (metaCreateTagIdxKey(e.ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, uid, &pTagIdxKey, &nTagIdxKey) == 0) { - tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, &pMeta->txn); + tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn); } metaDestroyTagIdxKey(pTagIdxKey); } @@ -661,9 +661,9 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { } } - tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pMeta->txn); - tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, &pMeta->txn); - tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), &pMeta->txn); + tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), pMeta->txn); + tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, pMeta->txn); + tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), pMeta->txn); if (e.type == TSDB_CHILD_TABLE || e.type == TSDB_NORMAL_TABLE) metaDeleteCtimeIdx(pMeta, &e); if (e.type == TSDB_NORMAL_TABLE) metaDeleteNcolIdx(pMeta, &e); @@ -671,7 +671,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { if (e.type != TSDB_SUPER_TABLE) metaDeleteTtlIdx(pMeta, &e); if (e.type == TSDB_CHILD_TABLE) { - tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn); + tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn); --pMeta->pVnode->config.vndStats.numOfCTables; @@ -682,7 +682,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { --pMeta->pVnode->config.vndStats.numOfNTables; pMeta->pVnode->config.vndStats.numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1; } else if (e.type == TSDB_SUPER_TABLE) { - tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), &pMeta->txn); + tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn); // drop schema.db (todo) metaStatsCacheDrop(pMeta, uid); @@ -702,7 +702,7 @@ int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) { if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) { return 0; } - return tdbTbInsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, &pMeta->txn); + return tdbTbInsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, pMeta->txn); } int metaDeleteCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) { @@ -710,14 +710,14 @@ int metaDeleteCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) { if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) { return 0; } - return tdbTbDelete(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), &pMeta->txn); + return tdbTbDelete(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), pMeta->txn); } int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME) { SNcolIdxKey ncolKey = {0}; if (metaBuildNColIdxKey(&ncolKey, pME) < 0) { return 0; } - return tdbTbInsert(pMeta->pNcolIdx, &ncolKey, sizeof(ncolKey), NULL, 0, &pMeta->txn); + return tdbTbInsert(pMeta->pNcolIdx, &ncolKey, sizeof(ncolKey), NULL, 0, pMeta->txn); } int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME) { @@ -725,7 +725,7 @@ int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME) { if (metaBuildNColIdxKey(&ncolKey, pME) < 0) { return 0; } - return tdbTbDelete(pMeta->pNcolIdx, &ncolKey, sizeof(ncolKey), &pMeta->txn); + return tdbTbDelete(pMeta->pNcolIdx, &ncolKey, sizeof(ncolKey), pMeta->txn); } static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq, STableMetaRsp *pMetaRsp) { @@ -760,7 +760,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl // search uid index TBC *pUidIdxc = NULL; - tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); + tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL); tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c); ASSERT(c == 0); @@ -770,7 +770,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl // search table.db TBC *pTbDbc = NULL; - tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); + tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL); tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c); ASSERT(c == 0); tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData); @@ -951,7 +951,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA // search uid index TBC *pUidIdxc = NULL; - tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); + tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL); tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c); ASSERT(c == 0); @@ -964,7 +964,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA SDecoder dc2 = {0}; /* get ctbEntry */ - tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); + tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL); tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c); ASSERT(c == 0); tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData); @@ -1062,7 +1062,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ASSERT(ctbEntry.ctbEntry.pTags); SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid}; tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags, - ((STag *)(ctbEntry.ctbEntry.pTags))->len, &pMeta->txn); + ((STag *)(ctbEntry.ctbEntry.pTags))->len, pMeta->txn); metaULock(pMeta); @@ -1110,7 +1110,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p // search uid index TBC *pUidIdxc = NULL; - tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); + tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, NULL); tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c); ASSERT(c == 0); @@ -1120,7 +1120,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p // search table.db TBC *pTbDbc = NULL; - tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); + tdbTbcOpen(pMeta->pTbDb, &pTbDbc, NULL); tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c); ASSERT(c == 0); tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData); @@ -1227,7 +1227,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { tEncoderClear(&coder); // write to table.db - if (tdbTbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) { + if (tdbTbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, pMeta->txn) < 0) { goto _err; } @@ -1250,29 +1250,29 @@ static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) { SUidIdxVal uidIdxVal = {.suid = info.suid, .version = info.version, .skmVer = info.skmVer}; - return tdbTbUpsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), &pMeta->txn); + return tdbTbUpsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), pMeta->txn); } static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME) { - return tdbTbInsert(pMeta->pSuidIdx, &pME->uid, sizeof(tb_uid_t), NULL, 0, &pMeta->txn); + return tdbTbInsert(pMeta->pSuidIdx, &pME->uid, sizeof(tb_uid_t), NULL, 0, pMeta->txn); } static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) { - return tdbTbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn); + return tdbTbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), pMeta->txn); } static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { STtlIdxKey ttlKey = {0}; metaBuildTtlIdxKey(&ttlKey, pME); if (ttlKey.dtime == 0) return 0; - return tdbTbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn); + return tdbTbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, pMeta->txn); } static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) { SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid}; return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), pME->ctbEntry.pTags, - ((STag *)(pME->ctbEntry.pTags))->len, &pMeta->txn); + ((STag *)(pME->ctbEntry.pTags))->len, pMeta->txn); } int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_t nTagData, int8_t type, tb_uid_t uid, @@ -1364,7 +1364,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { ret = -1; goto end; } - tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn); + tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); } end: metaDestroyTagIdxKey(pTagIdxKey); @@ -1411,7 +1411,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { tEncoderInit(&coder, pVal, vLen); tEncodeSSchemaWrapper(&coder, pSW); - if (tdbTbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) { + if (tdbTbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, pMeta->txn) < 0) { rcode = -1; goto _exit; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index a15d19fbe1..c4098f68e6 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -108,24 +108,22 @@ int32_t tqMetaClose(STQ* pTq) { } int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) { - TXN txn; - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + TXN* txn; + + if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { return -1; } - if (tdbBegin(pTq->pMetaDB, &txn) < 0) { + if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, txn) < 0) { return -1; } - if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, &txn) < 0) { + if (tdbCommit(pTq->pMetaDB, txn) < 0) { return -1; } - if (tdbCommit(pTq->pMetaDB, &txn) < 0) { - return -1; - } - - if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) { + if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { return -1; } @@ -133,25 +131,22 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_ } int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) { - TXN txn; + TXN* txn; - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { ASSERT(0); } - if (tdbBegin(pTq->pMetaDB, &txn) < 0) { - ASSERT(0); - } - - if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn) < 0) { + if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), txn) < 0) { /*ASSERT(0);*/ } - if (tdbCommit(pTq->pMetaDB, &txn) < 0) { + if (tdbCommit(pTq->pMetaDB, txn) < 0) { ASSERT(0); } - if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) { + if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { ASSERT(0); } @@ -219,25 +214,22 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { ASSERT(0); } - TXN txn; + TXN* txn; - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { ASSERT(0); } - if (tdbBegin(pTq->pMetaDB, &txn) < 0) { + if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn) < 0) { ASSERT(0); } - if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) { + if (tdbCommit(pTq->pMetaDB, txn) < 0) { ASSERT(0); } - if (tdbCommit(pTq->pMetaDB, &txn) < 0) { - ASSERT(0); - } - - if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) { + if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { ASSERT(0); } @@ -247,25 +239,22 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { } int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { - TXN txn; + TXN* txn; - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { ASSERT(0); } - if (tdbBegin(pTq->pMetaDB, &txn) < 0) { - ASSERT(0); - } - - if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) { + if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), txn) < 0) { /*ASSERT(0);*/ } - if (tdbCommit(pTq->pMetaDB, &txn) < 0) { + if (tdbCommit(pTq->pMetaDB, txn) < 0) { ASSERT(0); } - if (tdbPostCommit(pTq->pMetaDB, &txn) < 0) { + if (tdbPostCommit(pTq->pMetaDB, txn) < 0) { ASSERT(0); } diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index b68763867e..d811d943ed 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -129,7 +129,7 @@ struct STqSnapWriter { STQ* pTq; int64_t sver; int64_t ever; - TXN txn; + TXN* txn; }; int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) { @@ -146,8 +146,10 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p pWriter->sver = sver; pWriter->ever = ever; - if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - ASSERT(0); + if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + code = -1; + taosMemoryFree(pWriter); + goto _err; } *ppWriter = pWriter; @@ -165,11 +167,11 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { STQ* pTq = pWriter->pTq; if (rollback) { - tdbAbort(pWriter->pTq->pMetaDB, &pWriter->txn); + tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); } else { - code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn); + code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); if (code) goto _err; - code = tdbPostCommit(pWriter->pTq->pMetaDB, &pWriter->txn); + code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); if (code) goto _err; } diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 08d5931bc3..b1f00bdf74 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -129,7 +129,7 @@ struct STqSnapWriter { STQ* pTq; int64_t sver; int64_t ever; - TXN txn; + TXN* txn; }; int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) { @@ -146,8 +146,10 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p pWriter->sver = sver; pWriter->ever = ever; - if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - ASSERT(0); + if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + code = -1; + taosMemoryFree(pWriter); + goto _err; } *ppWriter = pWriter; @@ -165,11 +167,12 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { STQ* pTq = pWriter->pTq; if (rollback) { + tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn); ASSERT(0); } else { - code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn); + code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn); if (code) goto _err; - code = tdbPostCommit(pWriter->pTq->pMetaDB, &pWriter->txn); + code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn); if (code) goto _err; } diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 31e44a5b6d..305378bc93 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -129,7 +129,7 @@ struct STqSnapWriter { STQ* pTq; int64_t sver; int64_t ever; - TXN txn; + TXN* txn; }; int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter) { @@ -146,8 +146,10 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p pWriter->sver = sver; pWriter->ever = ever; - if (tdbTxnOpen(&pWriter->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { - ASSERT(0); + if (tdbBegin(pTq->pMetaStore, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + code = -1; + taosMemoryFree(pWriter); + goto _err; } *ppWriter = pWriter; @@ -165,11 +167,12 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { STQ* pTq = pWriter->pTq; if (rollback) { + tdbAbort(pWriter->pTq->pMetaStore, pWriter->txn); ASSERT(0); } else { - code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); + code = tdbCommit(pWriter->pTq->pMetaStore, pWriter->txn); if (code) goto _err; - code = tdbPostCommit(pWriter->pTq->pMetaStore, &pWriter->txn); + code = tdbPostCommit(pWriter->pTq->pMetaStore, pWriter->txn); if (code) goto _err; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a864814a74..edc16b6062 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -69,8 +69,8 @@ _err: } void streamMetaClose(SStreamMeta* pMeta) { - tdbCommit(pMeta->db, &pMeta->txn); - tdbPostCommit(pMeta->db, &pMeta->txn); + tdbCommit(pMeta->db, pMeta->txn); + tdbPostCommit(pMeta->db, pMeta->txn); tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pCheckpointDb); tdbClose(pMeta->db); @@ -115,7 +115,7 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, goto FAIL; } - if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) { + if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) { taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t)); ASSERT(0); goto FAIL; @@ -152,7 +152,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { tEncodeSStreamTask(&encoder, pTask); tEncoderClear(&encoder); - if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, &pMeta->txn) < 0) { + if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) { ASSERT(0); return -1; } @@ -226,7 +226,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); - if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), &pMeta->txn) < 0) { + if (tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn) < 0) { /*return -1;*/ } @@ -249,42 +249,35 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { } int32_t streamMetaBegin(SStreamMeta* pMeta) { - if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { - return -1; - } - - if (tdbBegin(pMeta->db, &pMeta->txn) < 0) { + if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } return 0; } int32_t streamMetaCommit(SStreamMeta* pMeta) { - if (tdbCommit(pMeta->db, &pMeta->txn) < 0) { + if (tdbCommit(pMeta->db, pMeta->txn) < 0) { return -1; } - memset(&pMeta->txn, 0, sizeof(TXN)); - if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { + if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) { return -1; } - if (tdbBegin(pMeta->db, &pMeta->txn) < 0) { + + if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } return 0; } int32_t streamMetaAbort(SStreamMeta* pMeta) { - if (tdbAbort(pMeta->db, &pMeta->txn) < 0) { + if (tdbAbort(pMeta->db, pMeta->txn) < 0) { return -1; } - memset(&pMeta->txn, 0, sizeof(TXN)); - if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { - return -1; - } - if (tdbBegin(pMeta->db, &pMeta->txn) < 0) { + + if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } return 0; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index aefe30116b..a20efc68ab 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -163,8 +163,8 @@ _err: } void streamStateClose(SStreamState* pState) { - tdbCommit(pState->db, &pState->txn); - tdbPostCommit(pState->db, &pState->txn); + tdbCommit(pState->db, pState->txn); + tdbPostCommit(pState->db, pState->txn); tdbTbClose(pState->pStateDb); tdbTbClose(pState->pFuncStateDb); tdbTbClose(pState->pFillStateDb); @@ -175,71 +175,61 @@ void streamStateClose(SStreamState* pState) { } int32_t streamStateBegin(SStreamState* pState) { - if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { - return -1; - } - - if (tdbBegin(pState->db, &pState->txn) < 0) { - tdbTxnClose(&pState->txn); + if (tdbBegin(pState->db, &pState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + tdbAbort(pState->db, pState->txn); return -1; } return 0; } int32_t streamStateCommit(SStreamState* pState) { - if (tdbCommit(pState->db, &pState->txn) < 0) { + if (tdbCommit(pState->db, pState->txn) < 0) { return -1; } - if (tdbPostCommit(pState->db, &pState->txn) < 0) { + if (tdbPostCommit(pState->db, pState->txn) < 0) { return -1; } - memset(&pState->txn, 0, sizeof(TXN)); - if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { - return -1; - } - if (tdbBegin(pState->db, &pState->txn) < 0) { + + if (tdbBegin(pState->db, &pState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } return 0; } int32_t streamStateAbort(SStreamState* pState) { - if (tdbAbort(pState->db, &pState->txn) < 0) { + if (tdbAbort(pState->db, pState->txn) < 0) { return -1; } - memset(&pState->txn, 0, sizeof(TXN)); - if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < - 0) { - return -1; - } - if (tdbBegin(pState->db, &pState->txn) < 0) { + + if (tdbBegin(pState->db, &pState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } return 0; } int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { - return tdbTbUpsert(pState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->txn); } int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { return tdbTbGet(pState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); } int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { - return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), &pState->txn); + return tdbTbDelete(pState->pFuncStateDb, key, sizeof(STupleKey), pState->txn); } // todo refactor int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { SStateKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbUpsert(pState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->txn); } // todo refactor int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { - return tdbTbUpsert(pState->pFillStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->txn); } // todo refactor @@ -256,7 +246,7 @@ int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal // todo refactor int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { SStateKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbDelete(pState->pStateDb, &sKey, sizeof(SStateKey), &pState->txn); + return tdbTbDelete(pState->pStateDb, &sKey, sizeof(SStateKey), pState->txn); } int32_t streamStateClear(SStreamState* pState) { @@ -280,7 +270,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number // todo refactor int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) { - return tdbTbDelete(pState->pFillStateDb, key, sizeof(SWinKey), &pState->txn); + return tdbTbDelete(pState->pFillStateDb, key, sizeof(SWinKey), pState->txn); } int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { @@ -512,7 +502,7 @@ void streamFreeVal(void* val) { tdbFree(val); } int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, &pState->txn); + return tdbTbUpsert(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen, pState->txn); } int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { @@ -535,7 +525,7 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn); + return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), pState->txn); } SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) { diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 7ab2bc3995..69c8c3b087 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -33,7 +33,8 @@ typedef struct STxn TXN; // TDB int32_t tdbOpen(const char *dbname, int szPage, int pages, TDB **ppDb, int8_t rollback); int32_t tdbClose(TDB *pDb); -int32_t tdbBegin(TDB *pDb, TXN *pTxn); +int32_t tdbBegin(TDB *pDb, TXN **pTxn, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg, + int flags); int32_t tdbCommit(TDB *pDb, TXN *pTxn); int32_t tdbPostCommit(TDB *pDb, TXN *pTxn); int32_t tdbPrepareAsyncCommit(TDB *pDb, TXN *pTxn); diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index e3860f85c6..3710ccb106 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -69,7 +69,7 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN * static int tdbBtcMoveDownward(SBTC *pBtc); static int tdbBtcMoveUpward(SBTC *pBtc); -int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr, +int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr, TDB *pEnv, SBTree **ppBt) { SBTree *pBt; int ret; @@ -106,22 +106,26 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg if (pgno == 0) { // fetch page & insert into main db SPage *pPage; - TXN txn; - tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + TXN *txn; - pPager->inTran = 1; + ret = tdbBegin(pEnv, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + if (ret < 0) { + return -1; + } SBtreeInitPageArg zArg; zArg.flags = 0x1 | 0x2; // root leaf node; zArg.pBt = pBt; - ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, &txn); + ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &zArg, txn); if (ret < 0) { + tdbAbort(pEnv, txn); return -1; } ret = tdbPagerWrite(pPager, pPage); if (ret < 0) { tdbError("failed to write page since %s", terrstr()); + tdbAbort(pEnv, txn); return -1; } @@ -130,18 +134,18 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg pBt->info.nLevel = 1; pBt->info.nData = 0; pBt->tbname = (char *)tbname; - // ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pgno, sizeof(SPgno), &txn); - ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pBt->info, sizeof(pBt->info), &txn); + + ret = tdbTbInsert(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, &pBt->info, sizeof(pBt->info), txn); if (ret < 0) { + tdbAbort(pEnv, txn); return -1; } } - // tdbUnrefPage(pPage); - tdbPCacheRelease(pPager->pCache, pPage, &txn); - tdbCommit(pPager->pEnv, &txn); - tdbPostCommit(pPager->pEnv, &txn); - tdbTxnClose(&txn); + tdbPCacheRelease(pPager->pCache, pPage, txn); + + tdbCommit(pPager->pEnv, txn); + tdbPostCommit(pPager->pEnv, txn); } ASSERT(pgno != 0); @@ -1535,10 +1539,21 @@ int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn) { memset(&pBtc->coder, 0, sizeof(SCellDecoder)); if (pTxn == NULL) { - pBtc->pTxn = &pBtc->txn; - tdbTxnOpen(pBtc->pTxn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0); + TXN *pTxn = tdbOsCalloc(1, sizeof(*pTxn)); + if (!pTxn) { + return -1; + } + + if (tdbTxnOpen(pTxn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { + tdbOsFree(pTxn); + return -1; + } + + pBtc->pTxn = pTxn; + pBtc->freeTxn = 1; } else { pBtc->pTxn = pTxn; + pBtc->freeTxn = 0; } return 0; @@ -2232,6 +2247,10 @@ int tdbBtcClose(SBTC *pBtc) { tdbFree(pBtc->coder.pVal); } + if (pBtc->freeTxn) { + tdbTxnClose(pBtc->pTxn); + } + return 0; } diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index 5aff5b7bb2..bda06aed39 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -99,19 +99,34 @@ int tdbClose(TDB *pDb) { int32_t tdbAlter(TDB *pDb, int pages) { return tdbPCacheAlter(pDb->pCache, pages); } -int32_t tdbBegin(TDB *pDb, TXN *pTxn) { +int32_t tdbBegin(TDB *pDb, TXN **ppTxn, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), void *xArg, + int flags) { SPager *pPager; int ret; + int64_t txnId = 1; + + TXN *pTxn = tdbOsCalloc(1, sizeof(*pTxn)); + if (!pTxn) { + return -1; + } + + if (tdbTxnOpen(pTxn, txnId, xMalloc, xFree, xArg, flags) < 0) { + tdbOsFree(pTxn); + return -1; + } for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { ret = tdbPagerBegin(pPager, pTxn); if (ret < 0) { tdbError("failed to begin pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName, pTxn->txnId); + tdbTxnClose(pTxn); return -1; } } + *ppTxn = pTxn; + return 0; } @@ -144,6 +159,8 @@ int32_t tdbPostCommit(TDB *pDb, TXN *pTxn) { } } + tdbTxnClose(pTxn); + return 0; } @@ -176,6 +193,8 @@ int32_t tdbAbort(TDB *pDb, TXN *pTxn) { } } + tdbTxnClose(pTxn); + return 0; } diff --git a/source/libs/tdb/src/db/tdbTable.c b/source/libs/tdb/src/db/tdbTable.c index 8b029b06d6..636c4fd997 100644 --- a/source/libs/tdb/src/db/tdbTable.c +++ b/source/libs/tdb/src/db/tdbTable.c @@ -108,7 +108,7 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF ASSERT(pPager != NULL); // pTb->pBt - ret = tdbBtreeOpen(keyLen, valLen, pPager, tbname, pgno, keyCmprFn, &(pTb->pBt)); + ret = tdbBtreeOpen(keyLen, valLen, pPager, tbname, pgno, keyCmprFn, pEnv, &(pTb->pBt)); if (ret < 0) { tdbOsFree(pTb); return -1; diff --git a/source/libs/tdb/src/db/tdbTxn.c b/source/libs/tdb/src/db/tdbTxn.c index f173d89779..77c87d18f2 100644 --- a/source/libs/tdb/src/db/tdbTxn.c +++ b/source/libs/tdb/src/db/tdbTxn.c @@ -28,4 +28,10 @@ int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void return 0; } -int tdbTxnClose(TXN *pTxn) { return 0; } \ No newline at end of file +int tdbTxnClose(TXN *pTxn) { + if (pTxn) { + tdbOsFree(pTxn); + } + + return 0; +} diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index 731b1927e7..8e04383b95 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -147,11 +147,11 @@ struct SBTC { SPage *pgStack[BTREE_MAX_DEPTH + 1]; SCellDecoder coder; TXN *pTxn; - TXN txn; + i8 freeTxn; }; // SBTree -int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr, +int tdbBtreeOpen(int keyLen, int valLen, SPager *pFile, char const *tbname, SPgno pgno, tdb_cmpr_fn_t kcmpr, TDB *pEnv, SBTree **ppBt); int tdbBtreeClose(SBTree *pBt); int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn); @@ -396,12 +396,12 @@ struct SPager { SPCache *pCache; SPgno dbFileSize; SPgno dbOrigSize; - //SPage *pDirty; + // SPage *pDirty; hashset_t jPageSet; - SRBTree rbt; - u8 inTran; - SPager *pNext; // used by TDB - SPager *pHashNext; // used by TDB + SRBTree rbt; + u8 inTran; + SPager *pNext; // used by TDB + SPager *pHashNext; // used by TDB #ifdef USE_MAINDB TDB *pEnv; #endif diff --git a/source/libs/tdb/test/tdbExOVFLTest.cpp b/source/libs/tdb/test/tdbExOVFLTest.cpp index 305e91f62c..f4f09b20b8 100644 --- a/source/libs/tdb/test/tdbExOVFLTest.cpp +++ b/source/libs/tdb/test/tdbExOVFLTest.cpp @@ -170,11 +170,9 @@ static void insertOfp(void) { SPoolMem *pPool = openPool(); // start a transaction - TXN txn; - int64_t txnid = 0; - ++txnid; - tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - tdbBegin(pEnv, &txn); + TXN *txn = NULL; + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); // generate value payload // char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) @@ -185,12 +183,12 @@ static void insertOfp(void) { // insert the generated big data // char const *key = "key1"; char const *key = "key123456789"; - ret = tdbTbInsert(pDb, key, strlen(key), val, valLen, &txn); + ret = tdbTbInsert(pDb, key, strlen(key), val, valLen, txn); GTEST_ASSERT_EQ(ret, 0); // commit current transaction - tdbCommit(pEnv, &txn); - tdbTxnClose(&txn); + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); } // TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) { @@ -258,11 +256,9 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) { SPoolMem *pPool = openPool(); // start a transaction - TXN txn; - int64_t txnid = 0; - ++txnid; - tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - tdbBegin(pEnv, &txn); + TXN *txn; + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); // generate value payload // char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) @@ -271,7 +267,7 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) { generateBigVal(val, valLen); { // insert the generated big data - ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn); + ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn); GTEST_ASSERT_EQ(ret, 0); } @@ -297,7 +293,7 @@ tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_REA tdbBegin(pEnv, &txn); */ { // upsert the data - ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), &txn); + ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), txn); GTEST_ASSERT_EQ(ret, 0); } @@ -316,7 +312,7 @@ tdbBegin(pEnv, &txn); } { // delete the data - ret = tdbTbDelete(pDb, "key1", strlen("key1"), &txn); + ret = tdbTbDelete(pDb, "key1", strlen("key1"), txn); GTEST_ASSERT_EQ(ret, 0); } @@ -335,8 +331,8 @@ tdbBegin(pEnv, &txn); } // commit current transaction - tdbCommit(pEnv, &txn); - tdbTxnClose(&txn); + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); } // TEST(tdb_test, DISABLED_simple_insert1) { @@ -346,7 +342,7 @@ TEST(tdb_test, simple_insert1) { TTB *pDb; tdb_cmpr_fn_t compFunc; int nData = 1; - TXN txn; + TXN *txn; int const pageSize = 4096; taosRemoveDir("tdb"); @@ -365,16 +361,13 @@ TEST(tdb_test, simple_insert1) { // char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) int64_t poolLimit = 4096; // 1M pool limit - int64_t txnid = 0; SPoolMem *pPool; // open the pool pPool = openPool(); // start a transaction - txnid++; - tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - tdbBegin(pEnv, &txn); + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); for (int iData = 1; iData <= nData; iData++) { sprintf(key, "key0"); @@ -393,26 +386,25 @@ TEST(tdb_test, simple_insert1) { val[i] = c; } - ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn); + ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn); GTEST_ASSERT_EQ(ret, 0); // if pool is full, commit the transaction and start a new one if (pPool->size >= poolLimit) { // commit current transaction - tdbCommit(pEnv, &txn); - tdbTxnClose(&txn); + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); // start a new transaction clearPool(pPool); - txnid++; - tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - tdbBegin(pEnv, &txn); + + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); } } // commit the transaction - tdbCommit(pEnv, &txn); - tdbTxnClose(&txn); + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); { // Query the data void *pVal = NULL; diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index f3a301cf5b..54e80a0009 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -125,7 +125,7 @@ TEST(tdb_test, DISABLED_simple_insert1) { TTB *pDb; tdb_cmpr_fn_t compFunc; int nData = 1000000; - TXN txn; + TXN *txn; taosRemoveDir("tdb"); @@ -142,40 +142,35 @@ TEST(tdb_test, DISABLED_simple_insert1) { char key[64]; char val[64]; int64_t poolLimit = 4096; // 1M pool limit - int64_t txnid = 0; SPoolMem *pPool; // open the pool pPool = openPool(); // start a transaction - txnid++; - tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - tdbBegin(pEnv, &txn); + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); for (int iData = 1; iData <= nData; iData++) { sprintf(key, "key%d", iData); sprintf(val, "value%d", iData); - ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn); + ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn); GTEST_ASSERT_EQ(ret, 0); // if pool is full, commit the transaction and start a new one if (pPool->size >= poolLimit) { // commit current transaction - tdbCommit(pEnv, &txn); - tdbTxnClose(&txn); + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); // start a new transaction clearPool(pPool); - txnid++; - tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - tdbBegin(pEnv, &txn); + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); } } // commit the transaction - tdbCommit(pEnv, &txn); - tdbTxnClose(&txn); + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); { // Query the data void *pVal = NULL; @@ -245,7 +240,7 @@ TEST(tdb_test, DISABLED_simple_insert2) { TTB *pDb; tdb_cmpr_fn_t compFunc; int nData = 1000000; - TXN txn; + TXN *txn; taosRemoveDir("tdb"); @@ -261,21 +256,18 @@ TEST(tdb_test, DISABLED_simple_insert2) { { char key[64]; char val[64]; - int64_t txnid = 0; SPoolMem *pPool; // open the pool pPool = openPool(); // start a transaction - txnid++; - tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - tdbBegin(pEnv, &txn); + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); for (int iData = 1; iData <= nData; iData++) { sprintf(key, "key%d", iData); sprintf(val, "value%d", iData); - ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn); + ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn); GTEST_ASSERT_EQ(ret, 0); } @@ -312,8 +304,8 @@ TEST(tdb_test, DISABLED_simple_insert2) { } // commit the transaction - tdbCommit(pEnv, &txn); - tdbTxnClose(&txn); + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); ret = tdbTbDrop(pDb); GTEST_ASSERT_EQ(ret, 0); @@ -331,7 +323,7 @@ TEST(tdb_test, DISABLED_simple_delete1) { TTB *pDb; char key[128]; char data[128]; - TXN txn; + TXN *txn; TDB *pEnv; SPoolMem *pPool; void *pKey = NULL; @@ -353,14 +345,13 @@ TEST(tdb_test, DISABLED_simple_delete1) { ret = tdbTbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb, 0); GTEST_ASSERT_EQ(ret, 0); - tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - tdbBegin(pEnv, &txn); + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); // loop to insert batch data for (int iData = 0; iData < nKV; iData++) { sprintf(key, "key%d", iData); sprintf(data, "data%d", iData); - ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), &txn); + ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), txn); GTEST_ASSERT_EQ(ret, 0); } @@ -378,7 +369,7 @@ TEST(tdb_test, DISABLED_simple_delete1) { for (int iData = nKV - 1; iData > 30; iData--) { sprintf(key, "key%d", iData); - ret = tdbTbDelete(pDb, key, strlen(key), &txn); + ret = tdbTbDelete(pDb, key, strlen(key), txn); GTEST_ASSERT_EQ(ret, 0); } @@ -413,7 +404,8 @@ TEST(tdb_test, DISABLED_simple_delete1) { tdbTbcClose(pDbc); - tdbCommit(pEnv, &txn); + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); closePool(pPool); @@ -430,7 +422,7 @@ TEST(tdb_test, DISABLED_simple_upsert1) { char data[64]; void *pData = NULL; SPoolMem *pPool; - TXN txn; + TXN *txn; taosRemoveDir("tdb"); @@ -444,13 +436,12 @@ TEST(tdb_test, DISABLED_simple_upsert1) { pPool = openPool(); // insert some data - tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - tdbBegin(pEnv, &txn); + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); for (int iData = 0; iData < nData; iData++) { sprintf(key, "key%d", iData); sprintf(data, "data%d", iData); - ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), &txn); + ret = tdbTbInsert(pDb, key, strlen(key), data, strlen(data), txn); GTEST_ASSERT_EQ(ret, 0); } @@ -467,11 +458,12 @@ TEST(tdb_test, DISABLED_simple_upsert1) { for (int iData = 0; iData < nData; iData++) { sprintf(key, "key%d", iData); sprintf(data, "data%d-u", iData); - ret = tdbTbUpsert(pDb, key, strlen(key), data, strlen(data), &txn); + ret = tdbTbUpsert(pDb, key, strlen(key), data, strlen(data), txn); GTEST_ASSERT_EQ(ret, 0); } - tdbCommit(pEnv, &txn); + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); // query the data for (int iData = 0; iData < nData; iData++) { @@ -492,7 +484,7 @@ TEST(tdb_test, multi_thread_query) { TTB *pDb; tdb_cmpr_fn_t compFunc; int nData = 1000000; - TXN txn; + TXN *txn; taosRemoveDir("tdb"); @@ -508,26 +500,18 @@ TEST(tdb_test, multi_thread_query) { char key[64]; char val[64]; int64_t poolLimit = 4096 * 20; // 1M pool limit - int64_t txnid = 0; SPoolMem *pPool; // open the pool pPool = openPool(); // start a transaction - txnid++; - txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED; - txn.txnId = -1; - txn.xMalloc = poolMalloc; - txn.xFree = poolFree; - txn.xArg = pPool; - // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, ); - tdbBegin(pEnv, &txn); + tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); for (int iData = 1; iData <= nData; iData++) { sprintf(key, "key%d", iData); sprintf(val, "value%d", iData); - ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn); + ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn); GTEST_ASSERT_EQ(ret, 0); } @@ -578,7 +562,7 @@ TEST(tdb_test, multi_thread_query) { std::vector threads; for (int i = 0; i < nThreads; i++) { if (i == 0) { - threads.push_back(std::thread(tdbCommit, pEnv, &txn)); + threads.push_back(std::thread(tdbCommit, pEnv, txn)); } else { threads.push_back(std::thread(f, pDb, nData)); } @@ -589,8 +573,8 @@ TEST(tdb_test, multi_thread_query) { } // commit the transaction - tdbCommit(pEnv, &txn); - tdbTxnClose(&txn); + tdbCommit(pEnv, txn); + tdbPostCommit(pEnv, txn); // Close a database tdbTbClose(pDb); @@ -621,17 +605,12 @@ TEST(tdb_test, DISABLED_multi_thread1) { GTEST_ASSERT_EQ(ret, 0); auto insert = [](TDB *pDb, TTB *pTb, int nData, int *stop, std::shared_timed_mutex *mu) { - TXN txn = {0}; + TXN *txn = NULL; char key[128]; char val[128]; SPoolMem *pPool = openPool(); - txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED; - txn.txnId = -1; - txn.xMalloc = poolMalloc; - txn.xFree = poolFree; - txn.xArg = pPool; - tdbBegin(pDb, &txn); + tdbBegin(pDb, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); for (int iData = 1; iData <= nData; iData++) { sprintf(key, "key%d", iData); sprintf(val, "value%d", iData); @@ -644,14 +623,17 @@ TEST(tdb_test, DISABLED_multi_thread1) { } if (pPool->size > 1024 * 1024) { - tdbCommit(pDb, &txn); + tdbCommit(pDb, txn); + tdbPostCommit(pDb, txn); clearPool(pPool); - tdbBegin(pDb, &txn); + tdbBegin(pDb, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); } } - tdbCommit(pDb, &txn); + tdbCommit(pDb, txn); + tdbPostCommit(pDb, txn); + closePool(pPool); *stop = 1; From 2c771153ef4a911ba36cfb69c8513e9bb41fecfe Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 28 Nov 2022 16:27:20 +0800 Subject: [PATCH 03/10] fix(streamState): use new txn related api --- source/libs/stream/src/streamState.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 5ed06e5eea..af1d738de0 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -192,9 +192,9 @@ void streamStateClose(SStreamState* pState) { } int32_t streamStateBegin(SStreamState* pState) { - if (tdbBegin(pState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - tdbAbort(pState->db, pState->txn); + tdbAbort(pState->pTdbState->db, pState->pTdbState->txn); return -1; } return 0; @@ -220,7 +220,7 @@ int32_t streamStateAbort(SStreamState* pState) { return -1; } - if (tdbBegin(pState->pTdbState->db, &pState->ptdbstate->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } @@ -823,7 +823,7 @@ _end: int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { tdbTbUpsert(pState->pTdbState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, - &pState->pTdbState->txn); + pState->pTdbState->txn); return 0; } From 7a7d7f7d00be0c0d44437e00a52deed2253b16f4 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 28 Nov 2022 17:31:11 +0800 Subject: [PATCH 04/10] tdb/write: new tdbPWriteFile api for commit and prep async to maindb --- include/os/osFile.h | 1 + source/libs/tdb/src/inc/tdbOs.h | 1 + source/os/src/osFile.c | 22 ++++++++++++++++++++++ 3 files changed, 24 insertions(+) diff --git a/include/os/osFile.h b/include/os/osFile.h index f6759d19a7..ae77e0f27a 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -88,6 +88,7 @@ int32_t taosFsyncFile(TdFilePtr pFile); int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count); int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset); int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count); +int64_t taosPWriteFile(TdFilePtr pFile, const void *buf, int64_t count, int64_t offset); void taosFprintfFile(TdFilePtr pFile, const char *format, ...); int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf); diff --git a/source/libs/tdb/src/inc/tdbOs.h b/source/libs/tdb/src/inc/tdbOs.h index b5dd27052c..3419034dd1 100644 --- a/source/libs/tdb/src/inc/tdbOs.h +++ b/source/libs/tdb/src/inc/tdbOs.h @@ -52,6 +52,7 @@ typedef TdFilePtr tdb_fd_t; #define tdbOsRead taosReadFile #define tdbOsPRead taosPReadFile #define tdbOsWrite taosWriteFile +#define tdbOsPWrite taosPWriteFile #define tdbOsFSync taosFsyncFile #define tdbOsLSeek taosLSeekFile #define tdbOsRemove remove diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index c3283ffe84..68365be481 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -491,6 +491,28 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { return count; } +int64_t taosPWriteFile(TdFilePtr pFile, const void *buf, int64_t count, int64_t offset) { + if (pFile == NULL) { + return 0; + } +#if FILE_WITH_LOCK + taosThreadRwlockWrlock(&(pFile->rwlock)); +#endif + assert(pFile->fd >= 0); // Please check if you have closed the file. +#ifdef WINDOWS + size_t pos = _lseeki64(pFile->fd, 0, SEEK_CUR); + _lseeki64(pFile->fd, offset, SEEK_SET); + int64_t ret = _write(pFile->fd, buf, count); + _lseeki64(pFile->fd, pos, SEEK_SET); +#else + int64_t ret = pwrite(pFile->fd, buf, count, offset); +#endif +#if FILE_WITH_LOCK + taosThreadRwlockUnlock(&(pFile->rwlock)); +#endif + return ret; +} + int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) { #if FILE_WITH_LOCK taosThreadRwlockRdlock(&(pFile->rwlock)); From 50318f6f9663ea786330c72ed0c9d4a776ea6da3 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 28 Nov 2022 17:33:11 +0800 Subject: [PATCH 05/10] tdb/pager: use tdb pwrite page to db instead of write --- source/libs/tdb/src/db/tdbPager.c | 33 ++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 35992f9b55..71df83d7a9 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -142,7 +142,7 @@ int hashset_contains(hashset_t set, void *item) { static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg, u8 loadPage); static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage); -static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage); +static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage); static FORCE_INLINE int32_t pageCmpFn(const SRBTreeNode *lhs, const SRBTreeNode *rhs) { SPage *pPageL = (SPage *)(((uint8_t *)lhs) - offsetof(SPage, node)); @@ -326,7 +326,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { pPage = (SPage *)pNode; ASSERT(pPage->nOverflow == 0); - ret = tdbPagerWritePageToDB(pPager, pPage); + ret = tdbPagerPWritePageToDB(pPager, pPage); if (ret < 0) { tdbError("failed to write page to db since %s", tstrerror(terrno)); return -1; @@ -403,7 +403,7 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { while ((pNode = tRBTreeIterNext(&iter)) != NULL) { pPage = (SPage *)pNode; if (pPage->isLocal) continue; - ret = tdbPagerWritePageToDB(pPager, pPage); + ret = tdbPagerPWritePageToDB(pPager, pPage); if (ret < 0) { tdbError("failed to write page to db since %s", tstrerror(terrno)); return -1; @@ -553,7 +553,7 @@ int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) { if (pgno > maxPgno) { maxPgno = pgno; } - ret = tdbPagerWritePageToDB(pPager, pPage); + ret = tdbPagerPWritePageToDB(pPager, pPage); if (ret < 0) { tdbError("failed to write page to db since %s", tstrerror(terrno)); return -1; @@ -769,13 +769,6 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) { return 0; } /* -struct TdFile { - TdThreadRwlock rwlock; - int refId; - int fd; - FILE *fp; -} TdFile; -*/ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) { i64 offset; int ret; @@ -795,7 +788,23 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) { return -1; } - // pwrite(pPager->fd->fd, pPage->pData, pPage->pageSize, offset); + return 0; +} +*/ +static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage) { + i64 offset; + int ret; + + offset = (i64)pPage->pageSize * (TDB_PAGE_PGNO(pPage) - 1); + + ret = tdbOsPWrite(pPager->fd, pPage->pData, pPage->pageSize, offset); + if (ret < 0) { + tdbError("failed to pwrite page data due to %s. file:%s, pageSize:%d", strerror(errno), pPager->dbFileName, + pPage->pageSize); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + return 0; } From a780305e1024e84a2ff31de0b665fe12f4b1e8df Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 28 Nov 2022 17:36:33 +0800 Subject: [PATCH 06/10] tdb/begin: use txn id in mem --- source/libs/tdb/src/db/tdbDb.c | 2 +- source/libs/tdb/src/inc/tdbInt.h | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index bda06aed39..bccea07269 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -103,7 +103,7 @@ int32_t tdbBegin(TDB *pDb, TXN **ppTxn, void *(*xMalloc)(void *, size_t), void ( int flags) { SPager *pPager; int ret; - int64_t txnId = 1; + int64_t txnId = ++pDb->txnId; TXN *pTxn = tdbOsCalloc(1, sizeof(*pTxn)); if (!pTxn) { diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index 8e04383b95..6578041225 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -382,6 +382,7 @@ struct STDB { #ifdef USE_MAINDB TTB *pMainDb; #endif + int64_t txnId; }; typedef struct hashset_st *hashset_t; From e090cd9dd9ffac6dfe915b180a5daccf0d9a9618 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 29 Nov 2022 10:18:12 +0800 Subject: [PATCH 07/10] tdb/pager: move journal to txn --- source/libs/tdb/inc/tdb.h | 8 +- source/libs/tdb/src/db/tdbPage.c | 5 +- source/libs/tdb/src/db/tdbPager.c | 119 ++++++++++++++++++------------ source/libs/tdb/src/inc/tdbInt.h | 13 ++-- 4 files changed, 87 insertions(+), 58 deletions(-) diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 69c8c3b087..5f9de017f0 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -17,6 +17,7 @@ #define _TD_TDB_H_ #include "os.h" +#include "tdbOs.h" #ifdef __cplusplus extern "C" { @@ -78,12 +79,17 @@ int32_t tdbTxnClose(TXN *pTxn); // other void tdbFree(void *); +typedef struct hashset_st *hashset_t; + struct STxn { int flags; int64_t txnId; void *(*xMalloc)(void *, size_t); void (*xFree)(void *, void *); - void *xArg; + void *xArg; + tdb_fd_t jfd; + hashset_t jPageSet; + int preped; }; // error code diff --git a/source/libs/tdb/src/db/tdbPage.c b/source/libs/tdb/src/db/tdbPage.c index 016ad65cf3..0653b6fc36 100644 --- a/source/libs/tdb/src/db/tdbPage.c +++ b/source/libs/tdb/src/db/tdbPage.c @@ -69,14 +69,15 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t) *ppPage = pPage; - tdbDebug("page/create: %p %p", pPage, xMalloc); + tdbTrace("page/create: %p/%d %p", pPage, pPage->id, xMalloc); return 0; } int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) { u8 *ptr; - tdbDebug("page/destroy: %p %p", pPage, xFree); + tdbTrace("page/destroy: %p/%d %p", pPage, pPage->id, xFree); + ASSERT(!pPage->isDirty); ASSERT(xFree); for (int iOvfl = 0; iOvfl < pPage->nOverflow; iOvfl++) { diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 71df83d7a9..ebad93eb7f 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -219,9 +219,11 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) { int tdbPagerClose(SPager *pPager) { if (pPager) { + /* if (pPager->inTran) { tdbOsClose(pPager->jfd); } + */ tdbOsClose(pPager->fd); tdbOsFree(pPager); } @@ -232,16 +234,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { int ret; SPage **ppPage; - ASSERT(pPager->inTran); -#if 0 - if (pPager->inTran == 0) { - ret = tdbPagerBegin(pPager); - if (ret < 0) { - return -1; - } - } -#endif - + // ASSERT(pPager->inTran); if (pPage->isDirty) return 0; // ref page one more time so the page will not be release @@ -271,15 +264,16 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { // Write page to journal if neccessary if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize && - (pPager->jPageSet == NULL || !hashset_contains(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))))) { + (pPager->pActiveTxn->jPageSet == NULL || + !hashset_contains(pPager->pActiveTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))))) { ret = tdbPagerWritePageToJournal(pPager, pPage); if (ret < 0) { tdbError("failed to write page to journal since %s", tstrerror(terrno)); return -1; } - if (pPager->jPageSet) { - hashset_add(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); + if (pPager->pActiveTxn->jPageSet) { + hashset_add(pPager->pActiveTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); } } @@ -287,23 +281,28 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { } int tdbPagerBegin(SPager *pPager, TXN *pTxn) { + /* if (pPager->inTran) { return 0; } - + */ // Open the journal - pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755); - if (TDB_FD_INVALID(pPager->jfd)) { + char jTxnFileName[TDB_FILENAME_LEN]; + sprintf(jTxnFileName, "%s.%" PRId64, pPager->jFileName, pTxn->txnId); + pTxn->jfd = tdbOsOpen(jTxnFileName, TDB_O_CREAT | TDB_O_RDWR, 0755); + if (TDB_FD_INVALID(pTxn->jfd)) { tdbError("failed to open file due to %s. jFileName:%s", strerror(errno), pPager->jFileName); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - pPager->jPageSet = hashset_create(); + pTxn->jPageSet = hashset_create(); + ASSERT(pPager->pActiveTxn->preped == 1); + pPager->pActiveTxn = pTxn; // TODO: write the size of the file - + /* pPager->inTran = 1; - + */ return 0; } @@ -312,9 +311,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { int ret; // sync the journal file - ret = tdbOsFSync(pPager->jfd); + ret = tdbOsFSync(pTxn->jfd); if (ret < 0) { - tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName); + tdbError("failed to fsync: %s. jFileName:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -344,8 +343,8 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { pPage->isDirty = 0; tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); - if (pPager->jPageSet) { - hashset_remove(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); + if (pTxn->jPageSet) { + hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); } tdbPCacheRelease(pPager->pCache, pPage, pTxn); } @@ -364,35 +363,39 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { } int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) { + char jTxnFileName[TDB_FILENAME_LEN]; + sprintf(jTxnFileName, "%s.%" PRId64, pPager->jFileName, pTxn->txnId); + // remove the journal file - if (tdbOsClose(pPager->jfd) < 0) { - tdbError("failed to close jfd due to %s. file:%s", strerror(errno), pPager->jFileName); + if (tdbOsClose(pTxn->jfd) < 0) { + tdbError("failed to close jfd: %s. file:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { - tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName); + if (tdbOsRemove(jTxnFileName) < 0 && errno != ENOENT) { + tdbError("failed to remove file due to %s. file:%s", strerror(errno), jTxnFileName); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - if (pPager->jPageSet) { - hashset_destroy(pPager->jPageSet); + if (pTxn->jPageSet) { + hashset_destroy(pTxn->jPageSet); } - pPager->inTran = 0; + // pPager->inTran = 0; return 0; } int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { SPage *pPage; + SPgno maxPgno = pPager->dbOrigSize; int ret; // sync the journal file - ret = tdbOsFSync(pPager->jfd); + ret = tdbOsFSync(pTxn->jfd); if (ret < 0) { - tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName); + tdbError("failed to fsync jfd: %s. jfile:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -403,6 +406,11 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { while ((pNode = tRBTreeIterNext(&iter)) != NULL) { pPage = (SPage *)pNode; if (pPage->isLocal) continue; + + SPgno pgno = TDB_PAGE_PGNO(pPage); + if (pgno > maxPgno) { + maxPgno = pgno; + } ret = tdbPagerPWritePageToDB(pPager, pPage); if (ret < 0) { tdbError("failed to write page to db since %s", tstrerror(terrno)); @@ -411,7 +419,8 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { } tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize); - pPager->dbOrigSize = pPager->dbFileSize; + pPager->dbOrigSize = maxPgno; + // pPager->dbOrigSize = pPager->dbFileSize; // release the page iter = tRBTreeIterCreate(&pPager->rbt, 1); @@ -423,6 +432,8 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tdbPCacheRelease(pPager->pCache, pPage, pTxn); } + + pTxn->preped = 1; /* tdbTrace("reset dirty tree: %p", &pPager->rbt); tRBTreeCreate(&pPager->rbt, pageCmpFn); @@ -444,15 +455,15 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { SPgno journalSize = 0; int ret; - // 0, sync the journal file - ret = tdbOsFSync(pPager->jfd); + // sync the journal file + ret = tdbOsFSync(pTxn->jfd); if (ret < 0) { - tdbError("failed to fsync jfd due to %s. file:%s", strerror(errno), pPager->jFileName); + tdbError("failed to fsync jfd: %s. jfile:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - tdb_fd_t jfd = pPager->jfd; + tdb_fd_t jfd = pTxn->jfd; ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize); if (ret < 0) { @@ -516,7 +527,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { pPage->isDirty = 0; tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); - hashset_remove(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); + hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); tdbPCacheRelease(pPager->pCache, pPage, pTxn); } @@ -524,11 +535,24 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { tRBTreeCreate(&pPager->rbt, pageCmpFn); // 4, remove the journal file - tdbOsClose(pPager->jfd); - (void)tdbOsRemove(pPager->jFileName); - hashset_destroy(pPager->jPageSet); + if (tdbOsClose(pTxn->jfd) < 0) { + tdbError("failed to close jfd: %s. file:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } - pPager->inTran = 0; + char jTxnFileName[TDB_FILENAME_LEN]; + sprintf(jTxnFileName, "%s.%" PRId64, pPager->jFileName, pTxn->txnId); + + if (tdbOsRemove(jTxnFileName) < 0 && errno != ENOENT) { + tdbError("failed to remove file due to %s. file:%s", strerror(errno), jTxnFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + hashset_destroy(pTxn->jPageSet); + + // pPager->inTran = 0; return 0; } @@ -751,17 +775,18 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) { pgno = TDB_PAGE_PGNO(pPage); - ret = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno)); + ret = tdbOsWrite(pPager->pActiveTxn->jfd, &pgno, sizeof(pgno)); if (ret < 0) { - tdbError("failed to write pgno due to %s. file:%s, pgno:%u", strerror(errno), pPager->jFileName, pgno); + tdbError("failed to write pgno due to %s. file:%s, pgno:%u, txnId:%" PRId64, strerror(errno), pPager->jFileName, + pgno, pPager->pActiveTxn->txnId); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } - ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize); + ret = tdbOsWrite(pPager->pActiveTxn->jfd, pPage->pData, pPage->pageSize); if (ret < 0) { - tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->jFileName, - (long)pPage->pageSize); + tdbError("failed to write page data due to %s. file:%s, pageSize:%d, txnId:%" PRId64, strerror(errno), + pPager->jFileName, pPage->pageSize, pPager->pActiveTxn->txnId); terrno = TAOS_SYSTEM_ERROR(errno); return -1; } diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index 6578041225..c46ab68689 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -385,24 +385,21 @@ struct STDB { int64_t txnId; }; -typedef struct hashset_st *hashset_t; - struct SPager { char *dbFileName; char *jFileName; int pageSize; uint8_t fid[TDB_FILE_ID_LEN]; tdb_fd_t fd; - tdb_fd_t jfd; SPCache *pCache; SPgno dbFileSize; SPgno dbOrigSize; // SPage *pDirty; - hashset_t jPageSet; - SRBTree rbt; - u8 inTran; - SPager *pNext; // used by TDB - SPager *pHashNext; // used by TDB + SRBTree rbt; + // u8 inTran; + TXN *pActiveTxn; + SPager *pNext; // used by TDB + SPager *pHashNext; // used by TDB #ifdef USE_MAINDB TDB *pEnv; #endif From 7902b7aaac10c75769a33e1c0245ffaccd698523 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 29 Nov 2022 10:46:04 +0800 Subject: [PATCH 08/10] fix/tdb: remove UAF preped --- source/libs/tdb/inc/tdb.h | 1 - source/libs/tdb/src/db/tdbPager.c | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 5f9de017f0..c728e29641 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -89,7 +89,6 @@ struct STxn { void *xArg; tdb_fd_t jfd; hashset_t jPageSet; - int preped; }; // error code diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index ebad93eb7f..082fec4902 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -297,7 +297,7 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) { } pTxn->jPageSet = hashset_create(); - ASSERT(pPager->pActiveTxn->preped == 1); + pPager->pActiveTxn = pTxn; // TODO: write the size of the file /* @@ -433,7 +433,6 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { tdbPCacheRelease(pPager->pCache, pPage, pTxn); } - pTxn->preped = 1; /* tdbTrace("reset dirty tree: %p", &pPager->rbt); tRBTreeCreate(&pPager->rbt, pageCmpFn); From 629c87685f535d6da44230c787fad67a70c48655 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 29 Nov 2022 11:04:47 +0800 Subject: [PATCH 09/10] fix/txnId: wrap around INT64_MAX --- source/libs/tdb/src/db/tdbDb.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index bccea07269..c79279c658 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -104,6 +104,9 @@ int32_t tdbBegin(TDB *pDb, TXN **ppTxn, void *(*xMalloc)(void *, size_t), void ( SPager *pPager; int ret; int64_t txnId = ++pDb->txnId; + if (txnId == INT64_MAX) { + pDb->txnId = 0; + } TXN *pTxn = tdbOsCalloc(1, sizeof(*pTxn)); if (!pTxn) { From 0b1cd9f824b1fad7054fe0799600594635b25c87 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 29 Nov 2022 16:00:51 +0800 Subject: [PATCH 10/10] tdb/journal: rollback & restore multiple journal files --- source/libs/tdb/src/db/tdbPager.c | 50 +++++++++++++++++++++++++++---- source/libs/tdb/src/db/tdbTable.c | 2 +- source/libs/tdb/src/inc/tdbInt.h | 2 +- source/libs/tdb/src/inc/tdbOs.h | 25 +++++++++------- 4 files changed, 61 insertions(+), 18 deletions(-) diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 082fec4902..7f0fdf9efc 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -832,12 +832,12 @@ static int tdbPagerPWritePageToDB(SPager *pPager, SPage *pPage) { return 0; } -int tdbPagerRestore(SPager *pPager, SBTree *pBt) { +static int tdbPagerRestore(SPager *pPager, SBTree *pBt, const char *jFileName) { int ret = 0; SPgno journalSize = 0; u8 *pageBuf = NULL; - tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755); + tdb_fd_t jfd = tdbOsOpen(jFileName, TDB_O_RDWR, 0755); if (jfd == NULL) { return 0; } @@ -910,12 +910,50 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) { return 0; } -int tdbPagerRollback(SPager *pPager) { - if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { - tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), pPager->jFileName); - terrno = TAOS_SYSTEM_ERROR(errno); +int tdbPagerRestoreJournals(SPager *pPager, SBTree *pBt) { + tdbDirEntryPtr pDirEntry; + tdbDirPtr pDir = taosOpenDir(pPager->pEnv->dbName); + if (pDir == NULL) { + tdbError("failed to open %s since %s", pPager->pEnv->dbName, strerror(errno)); return -1; } + while ((pDirEntry = tdbReadDir(pDir)) != NULL) { + char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry)); + if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) { + if (tdbPagerRestore(pPager, pBt, name) < 0) { + tdbError("failed to restore file due to %s. jFileName:%s", strerror(errno), name); + return -1; + } + } + } + + tdbCloseDir(&pDir); + + return 0; +} + +int tdbPagerRollback(SPager *pPager) { + tdbDirEntryPtr pDirEntry; + tdbDirPtr pDir = taosOpenDir(pPager->pEnv->dbName); + if (pDir == NULL) { + tdbError("failed to open %s since %s", pPager->pEnv->dbName, strerror(errno)); + return -1; + } + + while ((pDirEntry = tdbReadDir(pDir)) != NULL) { + char *name = tdbDirEntryBaseName(tdbGetDirEntryName(pDirEntry)); + + if (strncmp(TDB_MAINDB_NAME "-journal", name, 16) == 0) { + if (tdbOsRemove(name) < 0 && errno != ENOENT) { + tdbError("failed to remove file due to %s. jFileName:%s", strerror(errno), name); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } + } + + tdbCloseDir(&pDir); + return 0; } diff --git a/source/libs/tdb/src/db/tdbTable.c b/source/libs/tdb/src/db/tdbTable.c index 636c4fd997..c5c2d6aebe 100644 --- a/source/libs/tdb/src/db/tdbTable.c +++ b/source/libs/tdb/src/db/tdbTable.c @@ -117,7 +117,7 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF if (rollback) { tdbPagerRollback(pPager); } else { - ret = tdbPagerRestore(pPager, pTb->pBt); + ret = tdbPagerRestoreJournals(pPager, pTb->pBt); if (ret < 0) { tdbOsFree(pTb); return -1; diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index c46ab68689..055a8a1062 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -197,7 +197,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initP TXN *pTxn); void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn); int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno); -int tdbPagerRestore(SPager *pPager, SBTree *pBt); +int tdbPagerRestoreJournals(SPager *pPager, SBTree *pBt); int tdbPagerRollback(SPager *pPager); // tdbPCache.c ==================================== diff --git a/source/libs/tdb/src/inc/tdbOs.h b/source/libs/tdb/src/inc/tdbOs.h index 3419034dd1..db4283e267 100644 --- a/source/libs/tdb/src/inc/tdbOs.h +++ b/source/libs/tdb/src/inc/tdbOs.h @@ -47,16 +47,21 @@ typedef TdFilePtr tdb_fd_t; #define TDB_O_RDWR (TD_FILE_WRITE) | (TD_FILE_READ) #define tdbOsOpen(PATH, OPTION, MODE) taosOpenFile((PATH), (OPTION)) - -#define tdbOsClose(FD) taosCloseFile(&(FD)) -#define tdbOsRead taosReadFile -#define tdbOsPRead taosPReadFile -#define tdbOsWrite taosWriteFile -#define tdbOsPWrite taosPWriteFile -#define tdbOsFSync taosFsyncFile -#define tdbOsLSeek taosLSeekFile -#define tdbOsRemove remove -#define tdbOsFileSize(FD, PSIZE) taosFStatFile(FD, PSIZE, NULL) +#define tdbOsClose(FD) taosCloseFile(&(FD)) +#define tdbOsRead taosReadFile +#define tdbOsPRead taosPReadFile +#define tdbOsWrite taosWriteFile +#define tdbOsPWrite taosPWriteFile +#define tdbOsFSync taosFsyncFile +#define tdbOsLSeek taosLSeekFile +#define tdbDirPtr TdDirPtr +#define tdbDirEntryPtr TdDirEntryPtr +#define tdbReadDir taosReadDir +#define tdbGetDirEntryName taosGetDirEntryName +#define tdbDirEntryBaseName taosDirEntryBaseName +#define tdbCloseDir taosCloseDir +#define tdbOsRemove remove +#define tdbOsFileSize(FD, PSIZE) taosFStatFile(FD, PSIZE, NULL) /* directory */ #define tdbOsMkdir taosMkDir