fix: tdb concurrency
This commit is contained in:
parent
90dff224d8
commit
3b4551cc26
|
@ -14,6 +14,9 @@
|
||||||
*/
|
*/
|
||||||
#include "tdbInt.h"
|
#include "tdbInt.h"
|
||||||
|
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
struct SPCache {
|
struct SPCache {
|
||||||
int szPage;
|
int szPage;
|
||||||
int nPages;
|
int nPages;
|
||||||
|
@ -32,7 +35,6 @@ static inline uint32_t tdbPCachePageHash(const SPgid *pPgid) {
|
||||||
uint32_t *t = (uint32_t *)((pPgid)->fileid);
|
uint32_t *t = (uint32_t *)((pPgid)->fileid);
|
||||||
return (uint32_t)(t[0] + t[1] + t[2] + t[3] + t[4] + t[5] + (pPgid)->pgno);
|
return (uint32_t)(t[0] + t[1] + t[2] + t[3] + t[4] + t[5] + (pPgid)->pgno);
|
||||||
}
|
}
|
||||||
#define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL)
|
|
||||||
|
|
||||||
static int tdbPCacheOpenImpl(SPCache *pCache);
|
static int tdbPCacheOpenImpl(SPCache *pCache);
|
||||||
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
|
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn);
|
||||||
|
@ -80,16 +82,22 @@ int tdbPCacheClose(SPCache *pCache) {
|
||||||
|
|
||||||
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
|
SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
|
||||||
SPage *pPage;
|
SPage *pPage;
|
||||||
|
i32 nRef;
|
||||||
|
|
||||||
tdbPCacheLock(pCache);
|
tdbPCacheLock(pCache);
|
||||||
|
|
||||||
pPage = tdbPCacheFetchImpl(pCache, pPgid, pTxn);
|
pPage = tdbPCacheFetchImpl(pCache, pPgid, pTxn);
|
||||||
if (pPage) {
|
if (pPage) {
|
||||||
tdbRefPage(pPage);
|
nRef = tdbRefPage(pPage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(pPage);
|
||||||
|
|
||||||
tdbPCacheUnlock(pCache);
|
tdbPCacheUnlock(pCache);
|
||||||
|
|
||||||
|
// printf("thread %" PRId64 " fetch page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id,
|
||||||
|
// TDB_PAGE_PGNO(pPage), pPage, nRef);
|
||||||
|
|
||||||
return pPage;
|
return pPage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,30 +106,31 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
|
||||||
|
|
||||||
ASSERT(pTxn);
|
ASSERT(pTxn);
|
||||||
|
|
||||||
|
// nRef = tdbUnrefPage(pPage);
|
||||||
|
// ASSERT(nRef >= 0);
|
||||||
|
|
||||||
|
tdbPCacheLock(pCache);
|
||||||
nRef = tdbUnrefPage(pPage);
|
nRef = tdbUnrefPage(pPage);
|
||||||
ASSERT(nRef >= 0);
|
|
||||||
|
|
||||||
if (nRef == 0) {
|
if (nRef == 0) {
|
||||||
tdbPCacheLock(pCache);
|
|
||||||
|
|
||||||
// test the nRef again to make sure
|
// test the nRef again to make sure
|
||||||
// it is safe th handle the page
|
// it is safe th handle the page
|
||||||
nRef = tdbGetPageRef(pPage);
|
// nRef = tdbGetPageRef(pPage);
|
||||||
if (nRef == 0) {
|
// if (nRef == 0) {
|
||||||
if (pPage->isLocal) {
|
if (pPage->isLocal) {
|
||||||
tdbPCacheUnpinPage(pCache, pPage);
|
tdbPCacheUnpinPage(pCache, pPage);
|
||||||
} else {
|
} else {
|
||||||
if (TDB_TXN_IS_WRITE(pTxn)) {
|
if (TDB_TXN_IS_WRITE(pTxn)) {
|
||||||
// remove from hash
|
// remove from hash
|
||||||
tdbPCacheRemovePageFromHash(pCache, pPage);
|
tdbPCacheRemovePageFromHash(pCache, pPage);
|
||||||
}
|
|
||||||
|
|
||||||
tdbPageDestroy(pPage, pTxn->xFree, pTxn->xArg);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
tdbPCacheUnlock(pCache);
|
tdbPageDestroy(pPage, pTxn->xFree, pTxn->xArg);
|
||||||
|
}
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
tdbPCacheUnlock(pCache);
|
||||||
|
// printf("thread %" PRId64 " relas page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id,
|
||||||
|
// TDB_PAGE_PGNO(pPage), pPage, nRef);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tdbPCacheGetPageSize(SPCache *pCache) { return pCache->szPage; }
|
int tdbPCacheGetPageSize(SPCache *pCache) { return pCache->szPage; }
|
||||||
|
@ -223,6 +232,7 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
|
||||||
|
|
||||||
pCache->nRecyclable--;
|
pCache->nRecyclable--;
|
||||||
|
|
||||||
|
// printf("pin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
|
||||||
tdbTrace("pin page %d", pPage->id);
|
tdbTrace("pin page %d", pPage->id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -243,6 +253,7 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
|
||||||
|
|
||||||
pCache->nRecyclable++;
|
pCache->nRecyclable++;
|
||||||
|
|
||||||
|
// printf("unpin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
|
||||||
tdbTrace("unpin page %d", pPage->id);
|
tdbTrace("unpin page %d", pPage->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,10 +264,12 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
|
||||||
h = tdbPCachePageHash(&(pPage->pgid));
|
h = tdbPCachePageHash(&(pPage->pgid));
|
||||||
for (ppPage = &(pCache->pgHash[h % pCache->nHash]); (*ppPage) && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext))
|
for (ppPage = &(pCache->pgHash[h % pCache->nHash]); (*ppPage) && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext))
|
||||||
;
|
;
|
||||||
ASSERT(*ppPage == pPage);
|
|
||||||
*ppPage = pPage->pHashNext;
|
|
||||||
|
|
||||||
pCache->nPage--;
|
if (*ppPage) {
|
||||||
|
*ppPage = pPage->pHashNext;
|
||||||
|
pCache->nPage--;
|
||||||
|
// printf("rmv page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
|
||||||
|
}
|
||||||
|
|
||||||
tdbTrace("remove page %d to hash", pPage->id);
|
tdbTrace("remove page %d to hash", pPage->id);
|
||||||
}
|
}
|
||||||
|
@ -271,6 +284,7 @@ static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
|
||||||
|
|
||||||
pCache->nPage++;
|
pCache->nPage++;
|
||||||
|
|
||||||
|
// printf("add page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
|
||||||
tdbTrace("add page %d to hash", pPage->id);
|
tdbTrace("add page %d to hash", pPage->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -265,6 +265,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
|
||||||
pgid.pgno = pgno;
|
pgid.pgno = pgno;
|
||||||
pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
|
pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
|
||||||
if (pPage == NULL) {
|
if (pPage == NULL) {
|
||||||
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -272,10 +273,14 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
|
||||||
if (!TDB_PAGE_INITIALIZED(pPage)) {
|
if (!TDB_PAGE_INITIALIZED(pPage)) {
|
||||||
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage);
|
ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// printf("thread %" PRId64 " pager fetch page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id,
|
||||||
|
// TDB_PAGE_PGNO(pPage), pPage);
|
||||||
|
|
||||||
ASSERT(TDB_PAGE_INITIALIZED(pPage));
|
ASSERT(TDB_PAGE_INITIALIZED(pPage));
|
||||||
ASSERT(pPage->pPager == pPager);
|
ASSERT(pPage->pPager == pPager);
|
||||||
|
|
||||||
|
@ -284,7 +289,11 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) { tdbPCacheRelease(pPager->pCache, pPage, pTxn); }
|
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) {
|
||||||
|
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
|
||||||
|
// printf("thread %" PRId64 " pager retun page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id,
|
||||||
|
// TDB_PAGE_PGNO(pPage), pPage);
|
||||||
|
}
|
||||||
|
|
||||||
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
|
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
|
||||||
// TODO: Allocate a page from the free list
|
// TODO: Allocate a page from the free list
|
||||||
|
@ -352,6 +361,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
|
||||||
|
|
||||||
ret = (*initPage)(pPage, arg, init);
|
ret = (*initPage)(pPage, arg, init);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
ASSERT(0);
|
||||||
TDB_UNLOCK_PAGE(pPage);
|
TDB_UNLOCK_PAGE(pPage);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -370,6 +380,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -275,15 +275,15 @@ static inline i32 tdbUnrefPage(SPage *pPage) {
|
||||||
#define P_LOCK_FAIL -1
|
#define P_LOCK_FAIL -1
|
||||||
|
|
||||||
static inline int tdbTryLockPage(tdb_spinlock_t *pLock) {
|
static inline int tdbTryLockPage(tdb_spinlock_t *pLock) {
|
||||||
int ret;
|
int ret = tdbSpinlockTrylock(pLock);
|
||||||
if (tdbSpinlockTrylock(pLock) == 0) {
|
if (ret == 0) {
|
||||||
ret = P_LOCK_SUCC;
|
return P_LOCK_SUCC;
|
||||||
} else if (errno == EBUSY) {
|
} else if (ret == EBUSY) {
|
||||||
ret = P_LOCK_BUSY;
|
return P_LOCK_BUSY;
|
||||||
} else {
|
} else {
|
||||||
ret = P_LOCK_FAIL;
|
ASSERT(0);
|
||||||
|
return P_LOCK_FAIL;
|
||||||
}
|
}
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#define TDB_INIT_PAGE_LOCK(pPage) tdbSpinlockInit(&((pPage)->lock), 0)
|
#define TDB_INIT_PAGE_LOCK(pPage) tdbSpinlockInit(&((pPage)->lock), 0)
|
||||||
|
|
|
@ -486,18 +486,18 @@ TEST(tdb_test, DISABLED_simple_upsert1) {
|
||||||
tdbClose(pEnv);
|
tdbClose(pEnv);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(tdb_test, DISABLED_multi_thread_query) {
|
TEST(tdb_test, multi_thread_query) {
|
||||||
int ret;
|
int ret;
|
||||||
TDB *pEnv;
|
TDB *pEnv;
|
||||||
TTB *pDb;
|
TTB *pDb;
|
||||||
tdb_cmpr_fn_t compFunc;
|
tdb_cmpr_fn_t compFunc;
|
||||||
int nData = 100000;
|
int nData = 1000000;
|
||||||
TXN txn;
|
TXN txn;
|
||||||
|
|
||||||
taosRemoveDir("tdb");
|
taosRemoveDir("tdb");
|
||||||
|
|
||||||
// Open Env
|
// Open Env
|
||||||
ret = tdbOpen("tdb", 512, 1, &pEnv);
|
ret = tdbOpen("tdb", 4096, 10, &pEnv);
|
||||||
GTEST_ASSERT_EQ(ret, 0);
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
|
|
||||||
// Create a database
|
// Create a database
|
||||||
|
@ -507,7 +507,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) {
|
||||||
|
|
||||||
char key[64];
|
char key[64];
|
||||||
char val[64];
|
char val[64];
|
||||||
int64_t poolLimit = 4096; // 1M pool limit
|
int64_t poolLimit = 4096 * 20; // 1M pool limit
|
||||||
int64_t txnid = 0;
|
int64_t txnid = 0;
|
||||||
SPoolMem *pPool;
|
SPoolMem *pPool;
|
||||||
|
|
||||||
|
@ -600,7 +600,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) {
|
||||||
GTEST_ASSERT_EQ(ret, 0);
|
GTEST_ASSERT_EQ(ret, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(tdb_test, multi_thread1) {
|
TEST(tdb_test, DISABLED_multi_thread1) {
|
||||||
#if 0
|
#if 0
|
||||||
int ret;
|
int ret;
|
||||||
TDB *pDb;
|
TDB *pDb;
|
||||||
|
|
Loading…
Reference in New Issue