From 3b4551cc269005934fd483d0c7b630084b10bf7a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 20 May 2022 07:07:51 +0000 Subject: [PATCH] fix: tdb concurrency --- source/libs/tdb/src/db/tdbPCache.c | 58 ++++++++++++++++++------------ source/libs/tdb/src/db/tdbPager.c | 13 ++++++- source/libs/tdb/src/inc/tdbInt.h | 14 ++++---- source/libs/tdb/test/tdbTest.cpp | 10 +++--- 4 files changed, 60 insertions(+), 35 deletions(-) diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index 22d7e8e5a4..5ba5d65815 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -14,6 +14,9 @@ */ #include "tdbInt.h" +#include +#include + struct SPCache { int szPage; int nPages; @@ -32,7 +35,6 @@ static inline uint32_t tdbPCachePageHash(const SPgid *pPgid) { uint32_t *t = (uint32_t *)((pPgid)->fileid); return (uint32_t)(t[0] + t[1] + t[2] + t[3] + t[4] + t[5] + (pPgid)->pgno); } -#define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL) static int tdbPCacheOpenImpl(SPCache *pCache); static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn); @@ -80,16 +82,22 @@ int tdbPCacheClose(SPCache *pCache) { SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) { SPage *pPage; + i32 nRef; tdbPCacheLock(pCache); pPage = tdbPCacheFetchImpl(pCache, pPgid, pTxn); if (pPage) { - tdbRefPage(pPage); + nRef = tdbRefPage(pPage); } + ASSERT(pPage); + tdbPCacheUnlock(pCache); + // printf("thread %" PRId64 " fetch page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id, + // TDB_PAGE_PGNO(pPage), pPage, nRef); + return pPage; } @@ -98,30 +106,31 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) { ASSERT(pTxn); + // nRef = tdbUnrefPage(pPage); + // ASSERT(nRef >= 0); + + tdbPCacheLock(pCache); nRef = tdbUnrefPage(pPage); - ASSERT(nRef >= 0); - if (nRef == 0) { - tdbPCacheLock(pCache); - // test the nRef again to make sure // it is safe th handle the page - nRef = tdbGetPageRef(pPage); - if (nRef == 0) { - if (pPage->isLocal) { - tdbPCacheUnpinPage(pCache, pPage); - } else { - if (TDB_TXN_IS_WRITE(pTxn)) { - // remove from hash - tdbPCacheRemovePageFromHash(pCache, pPage); - } - - tdbPageDestroy(pPage, pTxn->xFree, pTxn->xArg); + // nRef = tdbGetPageRef(pPage); + // if (nRef == 0) { + if (pPage->isLocal) { + tdbPCacheUnpinPage(pCache, pPage); + } else { + if (TDB_TXN_IS_WRITE(pTxn)) { + // remove from hash + tdbPCacheRemovePageFromHash(pCache, pPage); } - } - tdbPCacheUnlock(pCache); + tdbPageDestroy(pPage, pTxn->xFree, pTxn->xArg); + } + // } } + tdbPCacheUnlock(pCache); + // printf("thread %" PRId64 " relas page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id, + // TDB_PAGE_PGNO(pPage), pPage, nRef); } int tdbPCacheGetPageSize(SPCache *pCache) { return pCache->szPage; } @@ -223,6 +232,7 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) { pCache->nRecyclable--; + // printf("pin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage); tdbTrace("pin page %d", pPage->id); } } @@ -243,6 +253,7 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) { pCache->nRecyclable++; + // printf("unpin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage); tdbTrace("unpin page %d", pPage->id); } @@ -253,10 +264,12 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) { h = tdbPCachePageHash(&(pPage->pgid)); for (ppPage = &(pCache->pgHash[h % pCache->nHash]); (*ppPage) && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext)) ; - ASSERT(*ppPage == pPage); - *ppPage = pPage->pHashNext; - pCache->nPage--; + if (*ppPage) { + *ppPage = pPage->pHashNext; + pCache->nPage--; + // printf("rmv page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage); + } tdbTrace("remove page %d to hash", pPage->id); } @@ -271,6 +284,7 @@ static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) { pCache->nPage++; + // printf("add page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage); tdbTrace("add page %d to hash", pPage->id); } diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 4024cfe745..a74bb54883 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -265,6 +265,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa pgid.pgno = pgno; pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn); if (pPage == NULL) { + ASSERT(0); return -1; } @@ -272,10 +273,14 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa if (!TDB_PAGE_INITIALIZED(pPage)) { ret = tdbPagerInitPage(pPager, pPage, initPage, arg, loadPage); if (ret < 0) { + ASSERT(0); return -1; } } + // printf("thread %" PRId64 " pager fetch page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id, + // TDB_PAGE_PGNO(pPage), pPage); + ASSERT(TDB_PAGE_INITIALIZED(pPage)); ASSERT(pPage->pPager == pPager); @@ -284,7 +289,11 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa return 0; } -void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) { tdbPCacheRelease(pPager->pCache, pPage, pTxn); } +void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) { + tdbPCacheRelease(pPager->pCache, pPage, pTxn); + // printf("thread %" PRId64 " pager retun page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id, + // TDB_PAGE_PGNO(pPage), pPage); +} static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) { // TODO: Allocate a page from the free list @@ -352,6 +361,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage ret = (*initPage)(pPage, arg, init); if (ret < 0) { + ASSERT(0); TDB_UNLOCK_PAGE(pPage); return -1; } @@ -370,6 +380,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage } } } else { + ASSERT(0); return -1; } diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index 2c79f39bc0..9f0267da93 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -275,15 +275,15 @@ static inline i32 tdbUnrefPage(SPage *pPage) { #define P_LOCK_FAIL -1 static inline int tdbTryLockPage(tdb_spinlock_t *pLock) { - int ret; - if (tdbSpinlockTrylock(pLock) == 0) { - ret = P_LOCK_SUCC; - } else if (errno == EBUSY) { - ret = P_LOCK_BUSY; + int ret = tdbSpinlockTrylock(pLock); + if (ret == 0) { + return P_LOCK_SUCC; + } else if (ret == EBUSY) { + return P_LOCK_BUSY; } else { - ret = P_LOCK_FAIL; + ASSERT(0); + return P_LOCK_FAIL; } - return ret; } #define TDB_INIT_PAGE_LOCK(pPage) tdbSpinlockInit(&((pPage)->lock), 0) diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index 5a3b45cca5..6070052127 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -486,18 +486,18 @@ TEST(tdb_test, DISABLED_simple_upsert1) { tdbClose(pEnv); } -TEST(tdb_test, DISABLED_multi_thread_query) { +TEST(tdb_test, multi_thread_query) { int ret; TDB *pEnv; TTB *pDb; tdb_cmpr_fn_t compFunc; - int nData = 100000; + int nData = 1000000; TXN txn; taosRemoveDir("tdb"); // Open Env - ret = tdbOpen("tdb", 512, 1, &pEnv); + ret = tdbOpen("tdb", 4096, 10, &pEnv); GTEST_ASSERT_EQ(ret, 0); // Create a database @@ -507,7 +507,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) { char key[64]; char val[64]; - int64_t poolLimit = 4096; // 1M pool limit + int64_t poolLimit = 4096 * 20; // 1M pool limit int64_t txnid = 0; SPoolMem *pPool; @@ -600,7 +600,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) { GTEST_ASSERT_EQ(ret, 0); } -TEST(tdb_test, multi_thread1) { +TEST(tdb_test, DISABLED_multi_thread1) { #if 0 int ret; TDB *pDb;