fix: tdb concurrent w/r
This commit is contained in:
parent
f3464aa079
commit
3c4b91a796
|
@ -214,6 +214,8 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pPager->dbOrigSize = pPager->dbFileSize;
|
||||||
|
|
||||||
// release the page
|
// release the page
|
||||||
for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) {
|
for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) {
|
||||||
pPager->pDirty = pPage->pDirtyNext;
|
pPager->pDirty = pPage->pDirtyNext;
|
||||||
|
@ -230,7 +232,6 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
|
||||||
// remote the journal file
|
// remote the journal file
|
||||||
tdbOsClose(pPager->jfd);
|
tdbOsClose(pPager->jfd);
|
||||||
tdbOsRemove(pPager->jFileName);
|
tdbOsRemove(pPager->jFileName);
|
||||||
pPager->dbOrigSize = pPager->dbFileSize;
|
|
||||||
pPager->inTran = 0;
|
pPager->inTran = 0;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -1,9 +1,12 @@
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#define ALLOW_FORBID_FUNC
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tdb.h"
|
#include "tdb.h"
|
||||||
|
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
typedef struct SPoolMem {
|
typedef struct SPoolMem {
|
||||||
int64_t size;
|
int64_t size;
|
||||||
|
@ -480,4 +483,128 @@ TEST(tdb_test, simple_upsert1) {
|
||||||
|
|
||||||
tdbTbClose(pDb);
|
tdbTbClose(pDb);
|
||||||
tdbClose(pEnv);
|
tdbClose(pEnv);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(tdb_test, multi_thread_query) {
|
||||||
|
int ret;
|
||||||
|
TDB *pEnv;
|
||||||
|
TTB *pDb;
|
||||||
|
tdb_cmpr_fn_t compFunc;
|
||||||
|
int nData = 20000;
|
||||||
|
TXN txn;
|
||||||
|
|
||||||
|
taosRemoveDir("tdb");
|
||||||
|
|
||||||
|
// Open Env
|
||||||
|
ret = tdbOpen("tdb", 512, 1, &pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// Create a database
|
||||||
|
compFunc = tKeyCmpr;
|
||||||
|
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
char key[64];
|
||||||
|
char val[64];
|
||||||
|
int64_t poolLimit = 4096; // 1M pool limit
|
||||||
|
int64_t txnid = 0;
|
||||||
|
SPoolMem *pPool;
|
||||||
|
|
||||||
|
// open the pool
|
||||||
|
pPool = openPool();
|
||||||
|
|
||||||
|
// start a transaction
|
||||||
|
txnid++;
|
||||||
|
txn = {.flags = TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED,
|
||||||
|
.txnId = -1,
|
||||||
|
.xMalloc = poolMalloc,
|
||||||
|
.xFree = poolFree,
|
||||||
|
.xArg = pPool};
|
||||||
|
// tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, );
|
||||||
|
tdbBegin(pEnv, &txn);
|
||||||
|
|
||||||
|
for (int iData = 1; iData <= nData; iData++) {
|
||||||
|
sprintf(key, "key%d", iData);
|
||||||
|
sprintf(val, "value%d", iData);
|
||||||
|
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
// if pool is full, commit the transaction and start a new one
|
||||||
|
// if (pPool->size >= poolLimit) {
|
||||||
|
// break;
|
||||||
|
// // commit current transaction
|
||||||
|
// tdbCommit(pEnv, &txn);
|
||||||
|
// tdbTxnClose(&txn);
|
||||||
|
|
||||||
|
// // start a new transaction
|
||||||
|
// clearPool(pPool);
|
||||||
|
// txnid++;
|
||||||
|
// tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
// tdbBegin(pEnv, &txn);
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
auto f = [](TTB *pDb, int nData) {
|
||||||
|
TBC *pDBC;
|
||||||
|
void *pKey = NULL;
|
||||||
|
void *pVal = NULL;
|
||||||
|
int vLen, kLen;
|
||||||
|
int count = 0;
|
||||||
|
int ret;
|
||||||
|
TXN txn;
|
||||||
|
|
||||||
|
SPoolMem *pPool = openPool();
|
||||||
|
txn = {.flags = 0, .txnId = 0, .xMalloc = poolMalloc, .xFree = poolFree, .xArg = pPool};
|
||||||
|
|
||||||
|
ret = tdbTbcOpen(pDb, &pDBC, &txn);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
|
tdbTbcMoveToFirst(pDBC);
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
|
||||||
|
if (ret < 0) break;
|
||||||
|
|
||||||
|
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
|
||||||
|
// std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
|
||||||
|
// std::cout << std::endl;
|
||||||
|
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
GTEST_ASSERT_EQ(count, nData);
|
||||||
|
|
||||||
|
tdbTbcClose(pDBC);
|
||||||
|
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
|
};
|
||||||
|
|
||||||
|
// tdbCommit(pEnv, &txn);
|
||||||
|
|
||||||
|
// multi-thread query
|
||||||
|
int nThreads = 20;
|
||||||
|
std::vector<std::thread> threads;
|
||||||
|
for (int i = 0; i < nThreads; i++) {
|
||||||
|
if (i == 0) {
|
||||||
|
threads.push_back(std::thread(tdbCommit, pEnv, &txn));
|
||||||
|
} else {
|
||||||
|
threads.push_back(std::thread(f, pDb, nData));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto &th : threads) {
|
||||||
|
th.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit the transaction
|
||||||
|
tdbCommit(pEnv, &txn);
|
||||||
|
tdbTxnClose(&txn);
|
||||||
|
|
||||||
|
// Close a database
|
||||||
|
tdbTbClose(pDb);
|
||||||
|
|
||||||
|
// Close Env
|
||||||
|
ret = tdbClose(pEnv);
|
||||||
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
}
|
}
|
Loading…
Reference in New Issue