Merge pull request #12606 from taosdata/fix/hzcheng_3.0

fix: concurrent read TDB
This commit is contained in:
Hongze Cheng 2022-05-17 22:12:49 +08:00 committed by GitHub
commit 72b8a3543d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 81 additions and 44 deletions

View File

@ -15,10 +15,10 @@
#include "tdbInt.h" #include "tdbInt.h"
struct SPCache { struct SPCache {
int pageSize; int szPage;
int cacheSize; int nPages;
SPage **aPage;
tdb_mutex_t mutex; tdb_mutex_t mutex;
SPage *pList;
int nFree; int nFree;
SPage *pFree; SPage *pFree;
int nPage; int nPage;
@ -52,13 +52,14 @@ int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
void *pPtr; void *pPtr;
SPage *pPgHdr; SPage *pPgHdr;
pCache = (SPCache *)tdbOsCalloc(1, sizeof(*pCache)); pCache = (SPCache *)tdbOsCalloc(1, sizeof(*pCache) + sizeof(SPage *) * cacheSize);
if (pCache == NULL) { if (pCache == NULL) {
return -1; return -1;
} }
pCache->pageSize = pageSize; pCache->szPage = pageSize;
pCache->cacheSize = cacheSize; pCache->nPages = cacheSize;
pCache->aPage = (SPage **)&pCache[1];
if (tdbPCacheOpenImpl(pCache) < 0) { if (tdbPCacheOpenImpl(pCache) < 0) {
tdbOsFree(pCache); tdbOsFree(pCache);
@ -84,7 +85,7 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
pPage = tdbPCacheFetchImpl(pCache, pPgid, pTxn); pPage = tdbPCacheFetchImpl(pCache, pPgid, pTxn);
if (pPage) { if (pPage) {
TDB_REF_PAGE(pPage); tdbRefPage(pPage);
} }
tdbPCacheUnlock(pCache); tdbPCacheUnlock(pCache);
@ -97,7 +98,7 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
ASSERT(pTxn); ASSERT(pTxn);
nRef = TDB_UNREF_PAGE(pPage); nRef = tdbUnrefPage(pPage);
ASSERT(nRef >= 0); ASSERT(nRef >= 0);
if (nRef == 0) { if (nRef == 0) {
@ -105,7 +106,7 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
// test the nRef again to make sure // test the nRef again to make sure
// it is safe to handle the page // it is safe to handle the page
nRef = TDB_GET_PAGE_REF(pPage); nRef = tdbGetPageRef(pPage);
if (nRef == 0) { if (nRef == 0) {
if (pPage->isLocal) { if (pPage->isLocal) {
tdbPCacheUnpinPage(pCache, pPage); tdbPCacheUnpinPage(pCache, pPage);
@ -123,7 +124,7 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
} }
} }
int tdbPCacheGetPageSize(SPCache *pCache) { return pCache->pageSize; } int tdbPCacheGetPageSize(SPCache *pCache) { return pCache->szPage; }
static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) { static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
int ret = 0; int ret = 0;
@ -168,7 +169,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
// 4. Try to create a new page // 4. Try to create a new page
if (!pPage) { if (!pPage) {
ret = tdbPageCreate(pCache->pageSize, &pPage, pTxn->xMalloc, pTxn->xArg); ret = tdbPageCreate(pCache->szPage, &pPage, pTxn->xMalloc, pTxn->xArg);
if (ret < 0) { if (ret < 0) {
// TODO // TODO
ASSERT(0); ASSERT(0);
@ -178,7 +179,8 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
// init the page fields // init the page fields
pPage->isAnchor = 0; pPage->isAnchor = 0;
pPage->isLocal = 0; pPage->isLocal = 0;
TDB_INIT_PAGE_REF(pPage); pPage->nRef = 0;
pPage->id = -1;
} }
// 5. Page here are just created from a free list // 5. Page here are just created from a free list
@ -212,20 +214,25 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
} }
static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) { static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
if (!PAGE_IS_PINNED(pPage)) { if (pPage->pLruNext != NULL) {
ASSERT(tdbGetPageRef(pPage) == 0);
pPage->pLruPrev->pLruNext = pPage->pLruNext; pPage->pLruPrev->pLruNext = pPage->pLruNext;
pPage->pLruNext->pLruPrev = pPage->pLruPrev; pPage->pLruNext->pLruPrev = pPage->pLruPrev;
pPage->pLruNext = NULL; pPage->pLruNext = NULL;
pCache->nRecyclable--; pCache->nRecyclable--;
tdbTrace("pin page %d", pPage->id);
} }
} }
static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) { static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
i32 nRef; i32 nRef;
ASSERT(pPage->isLocal);
ASSERT(!pPage->isDirty); ASSERT(!pPage->isDirty);
ASSERT(TDB_GET_PAGE_REF(pPage) == 0); ASSERT(tdbGetPageRef(pPage) == 0);
ASSERT(pPage->pLruNext == NULL); ASSERT(pPage->pLruNext == NULL);
@ -235,6 +242,8 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
pCache->lru.pLruNext = pPage; pCache->lru.pLruNext = pPage;
pCache->nRecyclable++; pCache->nRecyclable++;
tdbTrace("unpin page %d", pPage->id);
} }
static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) { static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
@ -248,6 +257,8 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
*ppPage = pPage->pHashNext; *ppPage = pPage->pHashNext;
pCache->nPage--; pCache->nPage--;
tdbTrace("remove page %d to hash", pPage->id);
} }
static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) { static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
@ -259,6 +270,8 @@ static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
pCache->pgHash[h] = pPage; pCache->pgHash[h] = pPage;
pCache->nPage++; pCache->nPage++;
tdbTrace("add page %d to hash", pPage->id);
} }
static int tdbPCacheOpenImpl(SPCache *pCache) { static int tdbPCacheOpenImpl(SPCache *pCache) {
@ -272,8 +285,8 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
// Open the free list // Open the free list
pCache->nFree = 0; pCache->nFree = 0;
pCache->pFree = NULL; pCache->pFree = NULL;
for (int i = 0; i < pCache->cacheSize; i++) { for (int i = 0; i < pCache->nPages; i++) {
ret = tdbPageCreate(pCache->pageSize, &pPage, tdbDefaultMalloc, NULL); ret = tdbPageCreate(pCache->szPage, &pPage, tdbDefaultMalloc, NULL);
if (ret < 0) { if (ret < 0) {
// TODO: handle error // TODO: handle error
return -1; return -1;
@ -282,7 +295,7 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
// pPage->pgid = 0; // pPage->pgid = 0;
pPage->isAnchor = 0; pPage->isAnchor = 0;
pPage->isLocal = 1; pPage->isLocal = 1;
TDB_INIT_PAGE_REF(pPage); pPage->nRef = 0;
pPage->pHashNext = NULL; pPage->pHashNext = NULL;
pPage->pLruNext = NULL; pPage->pLruNext = NULL;
pPage->pLruPrev = NULL; pPage->pLruPrev = NULL;
@ -294,13 +307,13 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
pCache->nFree++; pCache->nFree++;
// add to local list // add to local list
pPage->pCacheNext = pCache->pList; pPage->id = i;
pCache->pList = pPage; pCache->aPage[i] = pPage;
} }
// Open the hash table // Open the hash table
pCache->nPage = 0; pCache->nPage = 0;
pCache->nHash = pCache->cacheSize < 8 ? 8 : pCache->cacheSize; pCache->nHash = pCache->nPages < 8 ? 8 : pCache->nPages;
pCache->pgHash = (SPage **)tdbOsCalloc(pCache->nHash, sizeof(SPage *)); pCache->pgHash = (SPage **)tdbOsCalloc(pCache->nHash, sizeof(SPage *));
if (pCache->pgHash == NULL) { if (pCache->pgHash == NULL) {
// TODO // TODO
@ -317,11 +330,11 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
} }
static int tdbPCacheCloseImpl(SPCache *pCache) { static int tdbPCacheCloseImpl(SPCache *pCache) {
SPage *pPage; for (i32 iPage = 0; iPage < pCache->nPages; iPage++) {
if (pCache->aPage[iPage]) {
for (pPage = pCache->pList; pPage; pPage = pCache->pList) { tdbPageDestroy(pCache->aPage[iPage], tdbDefaultFree, NULL);
pCache->pList = pPage->pCacheNext; pCache->aPage[iPage] = NULL;
tdbPageDestroy(pPage, tdbDefaultFree, NULL); }
} }
tdbOsFree(pCache->pgHash); tdbOsFree(pCache->pgHash);

View File

@ -149,7 +149,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
if (pPage->isDirty) return 0; if (pPage->isDirty) return 0;
// ref page one more time so the page will not be released // ref page one more time so the page will not be released
TDB_REF_PAGE(pPage); tdbRefPage(pPage);
// Set page as dirty // Set page as dirty
pPage->isDirty = 1; pPage->isDirty = 1;

View File

@ -18,10 +18,23 @@
#include "tdb.h" #include "tdb.h"
#include "tlog.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
// clang-format off
// Bit mask consulted by every tdb* logging macro below; it is only declared
// here, the definition lives in another translation unit.
extern int32_t tdbDebugFlag;
// TDB logging macros, one per severity. Each expands to a single statement
// (do { ... } while(0)) that tests the matching DEBUG_* bit in tdbDebugFlag
// and, when the bit is set, forwards the printf-style arguments to
// taosPrintLog together with a fixed "TDB ..." prefix and the level constant.
// Fatal/Error/Warn/Info pass the literal 255 as the third argument, while
// Debug/Trace pass tdbDebugFlag itself.
// NOTE(review): the meaning of taosPrintLog's third argument is not visible
// here -- confirm against tlog.h.
#define tdbFatal(...) do { if (tdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tdbError(...) do { if (tdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tdbWarn(...) do { if (tdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tdbInfo(...) do { if (tdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tdbDebug(...) do { if (tdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", DEBUG_DEBUG, tdbDebugFlag, __VA_ARGS__); }} while(0)
#define tdbTrace(...) do { if (tdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", DEBUG_TRACE, tdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef int8_t i8; typedef int8_t i8;
typedef int16_t i16; typedef int16_t i16;
typedef int32_t i32; typedef int32_t i32;
@ -165,8 +178,8 @@ int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
u8 isAnchor; \ u8 isAnchor; \
u8 isLocal; \ u8 isLocal; \
u8 isDirty; \ u8 isDirty; \
i32 nRef; \ volatile i32 nRef; \
SPage *pCacheNext; \ i32 id; \
SPage *pFreeNext; \ SPage *pFreeNext; \
SPage *pHashNext; \ SPage *pHashNext; \
SPage *pLruNext; \ SPage *pLruNext; \
@ -176,10 +189,6 @@ int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
SPgid pgid; SPgid pgid;
// For page ref // For page ref
#define TDB_INIT_PAGE_REF(pPage) ((pPage)->nRef = 0)
#define TDB_REF_PAGE(pPage) atomic_add_fetch_32(&((pPage)->nRef), 1)
#define TDB_UNREF_PAGE(pPage) atomic_sub_fetch_32(&((pPage)->nRef), 1)
#define TDB_GET_PAGE_REF(pPage) atomic_load_32(&((pPage)->nRef))
int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache); int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache);
int tdbPCacheClose(SPCache *pCache); int tdbPCacheClose(SPCache *pCache);
@ -246,6 +255,20 @@ struct SPage {
TDB_PCACHE_PAGE TDB_PCACHE_PAGE
}; };
// Take one more reference on pPage.
// Adds 1 to pPage->nRef through atomic_add_fetch_32, emits a trace line with
// the page id and the resulting count, and hands that count back to the caller.
static inline i32 tdbRefPage(SPage *pPage) {
  const i32 refCount = atomic_add_fetch_32(&pPage->nRef, 1);

  tdbTrace("ref page %d, nRef %d", pPage->id, refCount);

  return refCount;
}
// Drop one reference on pPage.
// Subtracts 1 from pPage->nRef through atomic_sub_fetch_32, emits a trace line
// with the page id and the resulting count, and returns that count so the
// caller can decide what to do once it reaches zero.
static inline i32 tdbUnrefPage(SPage *pPage) {
  const i32 refCount = atomic_sub_fetch_32(&pPage->nRef, 1);

  tdbTrace("unref page %d, nRef %d", pPage->id, refCount);

  return refCount;
}
// Read the current value of pPage->nRef via atomic_load_32. Unlike
// tdbRefPage/tdbUnrefPage this is a plain macro and produces no trace output.
#define tdbGetPageRef(pPage) atomic_load_32(&((pPage)->nRef))
// For page lock // For page lock
#define P_LOCK_SUCC 0 #define P_LOCK_SUCC 0
#define P_LOCK_BUSY 1 #define P_LOCK_BUSY 1

View File

@ -14,8 +14,8 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h" #include "tlog.h"
#include "os.h"
#include "tutil.h" #include "tutil.h"
#define LOG_MAX_LINE_SIZE (1024) #define LOG_MAX_LINE_SIZE (1024)
@ -90,6 +90,7 @@ int32_t qDebugFlag = 131;
int32_t wDebugFlag = 135; int32_t wDebugFlag = 135;
int32_t sDebugFlag = 135; int32_t sDebugFlag = 135;
int32_t tsdbDebugFlag = 131; int32_t tsdbDebugFlag = 131;
int32_t tdbDebugFlag = 131;
int32_t tqDebugFlag = 135; int32_t tqDebugFlag = 135;
int32_t fsDebugFlag = 135; int32_t fsDebugFlag = 135;
int32_t metaDebugFlag = 135; int32_t metaDebugFlag = 135;