From 3c4b91a796fd14019e568a791bdd0831ff7f33e9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 19 May 2022 07:15:00 +0000 Subject: [PATCH 1/3] fix: tdb concurrent w/r --- source/libs/tdb/src/db/tdbPager.c | 3 +- source/libs/tdb/test/tdbTest.cpp | 127 ++++++++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 1 deletion(-) diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 6b5a3af347..4024cfe745 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -214,6 +214,8 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { } } + pPager->dbOrigSize = pPager->dbFileSize; + // release the page for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) { pPager->pDirty = pPage->pDirtyNext; @@ -230,7 +232,6 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { // remote the journal file tdbOsClose(pPager->jfd); tdbOsRemove(pPager->jFileName); - pPager->dbOrigSize = pPager->dbFileSize; pPager->inTran = 0; return 0; diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index a161f1c589..24ef8c0c83 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -1,9 +1,12 @@ #include +#define ALLOW_FORBID_FUNC #include "os.h" #include "tdb.h" #include +#include +#include typedef struct SPoolMem { int64_t size; @@ -480,4 +483,128 @@ TEST(tdb_test, simple_upsert1) { tdbTbClose(pDb); tdbClose(pEnv); +} + +TEST(tdb_test, multi_thread_query) { + int ret; + TDB *pEnv; + TTB *pDb; + tdb_cmpr_fn_t compFunc; + int nData = 20000; + TXN txn; + + taosRemoveDir("tdb"); + + // Open Env + ret = tdbOpen("tdb", 512, 1, &pEnv); + GTEST_ASSERT_EQ(ret, 0); + + // Create a database + compFunc = tKeyCmpr; + ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + GTEST_ASSERT_EQ(ret, 0); + + char key[64]; + char val[64]; + int64_t poolLimit = 4096; // 1M pool limit + int64_t txnid = 0; + SPoolMem *pPool; + + // open the pool + pPool = openPool(); + + // start a transaction + txnid++; + txn = {.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED, + .txnId = -1, + .xMalloc = poolMalloc, + .xFree = poolFree, + .xArg = pPool}; + // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, ); + tdbBegin(pEnv, &txn); + + for (int iData = 1; iData <= nData; iData++) { + sprintf(key, "key%d", iData); + sprintf(val, "value%d", iData); + ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn); + GTEST_ASSERT_EQ(ret, 0); + + // if pool is full, commit the transaction and start a new one + // if (pPool->size >= poolLimit) { + // break; + // // commit current transaction + // tdbCommit(pEnv, &txn); + // tdbTxnClose(&txn); + + // // start a new transaction + // clearPool(pPool); + // txnid++; + // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + // tdbBegin(pEnv, &txn); + // } + } + + auto f = [](TTB *pDb, int nData) { + TBC *pDBC; + void *pKey = NULL; + void *pVal = NULL; + int vLen, kLen; + int count = 0; + int ret; + TXN txn; + + SPoolMem *pPool = openPool(); + txn = {.flags = 0, .txnId = 0, .xMalloc = poolMalloc, .xFree = poolFree, .xArg = pPool}; + + ret = tdbTbcOpen(pDb, &pDBC, &txn); + GTEST_ASSERT_EQ(ret, 0); + + tdbTbcMoveToFirst(pDBC); + + for (;;) { + ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen); + if (ret < 0) break; + + // std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " "; + // std::cout.write((char *)pVal, vLen) /* << " " << vLen */; + // std::cout << std::endl; + + count++; + } + + GTEST_ASSERT_EQ(count, nData); + + tdbTbcClose(pDBC); + + tdbFree(pKey); + tdbFree(pVal); + }; + + // tdbCommit(pEnv, &txn); + + // multi-thread query + int nThreads = 20; + std::vector threads; + for (int i = 0; i < nThreads; i++) { + if (i == 0) { + threads.push_back(std::thread(tdbCommit, pEnv, &txn)); + } else { + threads.push_back(std::thread(f, pDb, nData)); + } + } + + for (auto &th : threads) { + th.join(); + } + + // commit the transaction + tdbCommit(pEnv, &txn); + tdbTxnClose(&txn); + + // Close a database + tdbTbClose(pDb); + + // Close Env + ret = tdbClose(pEnv); + GTEST_ASSERT_EQ(ret, 0); } \ No newline at end of file From f30e6be9e035b5860109f4425e545a3c60414ae5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 19 May 2022 07:46:36 +0000 Subject: [PATCH 2/3] fix windows compile error --- source/libs/tdb/test/tdbTest.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index 24ef8c0c83..bbeeaa3faf 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -515,11 +515,11 @@ TEST(tdb_test, multi_thread_query) { // start a transaction txnid++; - txn = {.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED, - .txnId = -1, - .xMalloc = poolMalloc, - .xFree = poolFree, - .xArg = pPool}; + txn = (TXN){.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED, + .txnId = -1, + .xMalloc = poolMalloc, + .xFree = poolFree, + .xArg = pPool}; // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, ); tdbBegin(pEnv, &txn); @@ -554,7 +554,7 @@ TEST(tdb_test, multi_thread_query) { TXN txn; SPoolMem *pPool = openPool(); - txn = {.flags = 0, .txnId = 0, .xMalloc = poolMalloc, .xFree = poolFree, .xArg = pPool}; + txn = (TXN){.flags = 0, .txnId = 0, .xMalloc = poolMalloc, .xFree = poolFree, .xArg = pPool}; ret = tdbTbcOpen(pDb, &pDBC, &txn); GTEST_ASSERT_EQ(ret, 0); From c49b3f39d9a19fc4c9a19490ad4fa04528380e6c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 19 May 2022 08:16:06 +0000 Subject: [PATCH 3/3] make windows compile pass --- source/libs/tdb/test/tdbTest.cpp | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index bbeeaa3faf..fee3447884 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -515,11 +515,11 @@ TEST(tdb_test, multi_thread_query) { // start a transaction txnid++; - txn = (TXN){.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED, - .txnId = -1, - .xMalloc = poolMalloc, - .xFree = poolFree, - .xArg = pPool}; + txn.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED; + txn.txnId = -1; + txn.xMalloc = poolMalloc; + txn.xFree = poolFree; + txn.xArg = pPool; // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, ); tdbBegin(pEnv, &txn); @@ -528,20 +528,6 @@ TEST(tdb_test, multi_thread_query) { sprintf(val, "value%d", iData); ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn); GTEST_ASSERT_EQ(ret, 0); - - // if pool is full, commit the transaction and start a new one - // if (pPool->size >= poolLimit) { - // break; - // // commit current transaction - // tdbCommit(pEnv, &txn); - // tdbTxnClose(&txn); - - // // start a new transaction - // clearPool(pPool); - // txnid++; - // tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); - // tdbBegin(pEnv, &txn); - // } } auto f = [](TTB *pDb, int nData) { @@ -554,7 +540,11 @@ TEST(tdb_test, multi_thread_query) { TXN txn; SPoolMem *pPool = openPool(); - txn = (TXN){.flags = 0, .txnId = 0, .xMalloc = poolMalloc, .xFree = poolFree, .xArg = pPool}; + txn.flags = 0; + txn.txnId = 0; + txn.xMalloc = poolMalloc; + txn.xFree = poolFree; + txn.xArg = pPool; ret = tdbTbcOpen(pDb, &pDBC, &txn); GTEST_ASSERT_EQ(ret, 0);