Merge pull request #21699 from taosdata/fix/TD-24717

enh(tdb/recycle): first round implemenation of page recycling
This commit is contained in:
wade zhang 2023-07-06 16:03:03 +08:00 committed by GitHub
commit 4fa61b9eae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1102 additions and 22 deletions

View File

@ -233,7 +233,11 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
int ret;
tdbBtcOpen(&btc, pBt, pTxn);
/*
btc.coder.ofps = taosArrayInit(8, sizeof(SPage *));
// btc.coder.ofps = taosArrayInit(8, sizeof(SPgno));
//pBtc->coder.ofps = taosArrayInit(8, sizeof(SPage *));
*/
tdbTrace("tdb delete, btc: %p, pTxn: %p", &btc, pTxn);
// move the cursor
@ -254,7 +258,18 @@ int tdbBtreeDelete(SBTree *pBt, const void *pKey, int kLen, TXN *pTxn) {
tdbBtcClose(&btc);
return -1;
}
/*
SArray *ofps = btc.coder.ofps;
if (ofps) {
for (int i = 0; i < TARRAY_SIZE(ofps); ++i) {
SPage *ofp = *(SPage **)taosArrayGet(ofps, i);
tdbPagerInsertFreePage(btc.pBt->pPager, ofp, btc.pTxn);
}
taosArrayDestroy(ofps);
btc.coder.ofps = NULL;
}
*/
tdbBtcClose(&btc);
return 0;
}
@ -563,6 +578,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
}
}
// copy the parent key out if child pages are not leaf page
// childNotLeaf = !(TDB_BTREE_PAGE_IS_LEAF(pOlds[0]) || TDB_BTREE_PAGE_IS_OVFL(pOlds[0]));
childNotLeaf = !TDB_BTREE_PAGE_IS_LEAF(pOlds[0]);
if (childNotLeaf) {
for (int i = 0; i < nOlds; i++) {
@ -592,7 +608,30 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
for (int i = 0; i < nOlds; i++) {
nCells = TDB_PAGE_TOTAL_CELLS(pParent);
if (sIdx < nCells) {
bool destroyOfps = false;
if (!childNotLeaf) {
if (!pParent->pPager->ofps) {
pParent->pPager->ofps = taosArrayInit(8, sizeof(SPage *));
destroyOfps = true;
}
}
tdbPageDropCell(pParent, sIdx, pTxn, pBt);
if (!childNotLeaf) {
SArray *ofps = pParent->pPager->ofps;
if (ofps) {
for (int i = 0; i < TARRAY_SIZE(ofps); ++i) {
SPage *ofp = *(SPage **)taosArrayGet(ofps, i);
tdbPagerInsertFreePage(pParent->pPager, ofp, pTxn);
}
if (destroyOfps) {
taosArrayDestroy(ofps);
pParent->pPager->ofps = NULL;
}
}
}
} else {
((SIntHdr *)pParent->pData)->pgno = 0;
}
@ -861,6 +900,8 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
if (!TDB_BTREE_PAGE_IS_LEAF(pNews[0])) {
((SIntHdr *)(pParent->pData))->pgno = ((SIntHdr *)(pNews[0]->pData))->pgno;
}
tdbPagerInsertFreePage(pBt->pPager, pNews[0], pTxn);
}
for (int i = 0; i < 3; i++) {
@ -870,6 +911,9 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
}
for (pageIdx = 0; pageIdx < nOlds; ++pageIdx) {
if (pageIdx >= nNews) {
tdbPagerInsertFreePage(pBt->pPager, pOlds[pageIdx], pTxn);
}
tdbPagerReturnPage(pBt->pPager, pOlds[pageIdx], pTxn);
}
for (; pageIdx < nNews; ++pageIdx) {
@ -1311,7 +1355,11 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
if (ret < 0) {
return -1;
}
/*
if (pDecoder->ofps) {
taosArrayPush(pDecoder->ofps, &ofp);
}
*/
ofpCell = tdbPageGetCell(ofp, 0);
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
@ -1346,11 +1394,17 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
int lastKeyPageSpace = 0;
// load left key & val to ovpages
while (pgno != 0) {
tdbTrace("tdb decode-ofp, pTxn: %p, pgno:%u by cell:%p", pTxn, pgno, pCell);
// printf("tdb decode-ofp, pTxn: %p, pgno:%u by cell:%p\n", pTxn, pgno, pCell);
ret = tdbLoadOvflPage(&pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
/*
if (pDecoder->ofps) {
taosArrayPush(pDecoder->ofps, &ofp);
}
*/
ofpCell = tdbPageGetCell(ofp, 0);
int lastKeyPage = 0;
@ -1518,8 +1572,8 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
if (pPage->vLen == TDB_VARIANT_LEN) {
if (!leaf) {
tdbError("tdb/btree-cell-size: not a leaf page.");
return -1;
tdbError("tdb/btree-cell-size: not a leaf page:%p, pgno:%" PRIu32 ".", pPage, TDB_PAGE_PGNO(pPage));
// return -1;
}
nHeader += tdbGetVarInt(pCell + nHeader, &vLen);
} else if (leaf) {
@ -1559,8 +1613,27 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
bytes = ofp->maxLocal - sizeof(SPgno);
}
// SPgno origPgno = pgno;
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
ret = tdbPagerWrite(pBt->pPager, ofp);
if (ret < 0) {
tdbError("failed to write page since %s", terrstr());
return -1;
}
/*
tdbPageDropCell(ofp, 0, pTxn, pBt);
*/
// SIntHdr *pIntHdr = (SIntHdr *)(ofp->pData);
// pIntHdr->flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL);
// pIntHdr->pgno = 0;
// ofp->pPager = NULL;
SArray *ofps = pPage->pPager->ofps;
if (ofps) {
taosArrayPush(ofps, &ofp);
}
tdbPagerReturnPage(pPage->pPager, ofp, pTxn);
nLeft -= bytes;
@ -1980,6 +2053,11 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
return -1;
}
if (TDB_BTREE_PAGE_IS_OVFL(pBtc->pPage)) {
tdbError("tdb/btc-move-downward: should not be a ovfl page here.");
return -1;
}
if (pBtc->idx < TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
pgno = ((SPgno *)pCell)[0];
@ -2068,8 +2146,27 @@ int tdbBtcDelete(SBTC *pBtc) {
return -1;
}
bool destroyOfps = false;
if (!pBtc->pPage->pPager->ofps) {
pBtc->pPage->pPager->ofps = taosArrayInit(8, sizeof(SPage *));
destroyOfps = true;
}
tdbPageDropCell(pBtc->pPage, idx, pBtc->pTxn, pBtc->pBt);
SArray *ofps = pBtc->pPage->pPager->ofps;
if (ofps) {
for (int i = 0; i < TARRAY_SIZE(ofps); ++i) {
SPage *ofp = *(SPage **)taosArrayGet(ofps, i);
tdbPagerInsertFreePage(pBtc->pPage->pPager, ofp, pBtc->pTxn);
}
if (destroyOfps) {
taosArrayDestroy(ofps);
pBtc->pPage->pPager->ofps = NULL;
}
}
// update interior page or do balance
if (idx == nCells - 1) {
if (idx) {
@ -2113,6 +2210,8 @@ int tdbBtcDelete(SBTC *pBtc) {
return -1;
}
// printf("tdb/btc-delete: btree balance delete pgno: %d.\n", TDB_PAGE_PGNO(pBtc->pPage));
ret = tdbBtreeBalance(pBtc);
if (ret < 0) {
tdbError("tdb/btc-delete: btree balance failed with ret: %d.", ret);
@ -2181,7 +2280,13 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
tdbError("tdb/btc-upsert: page insert/update cell failed with ret: %d.", ret);
return -1;
}
/*
bool destroyOfps = false;
if (!pBtc->pPage->pPager->ofps) {
pBtc->pPage->pPager->ofps = taosArrayInit(8, sizeof(SPage *));
destroyOfps = true;
}
*/
// check balance
if (pBtc->pPage->nOverflow > 0) {
ret = tdbBtreeBalance(pBtc);
@ -2190,7 +2295,20 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
return -1;
}
}
/*
SArray *ofps = pBtc->pPage->pPager->ofps;
if (ofps) {
for (int i = 0; i < TARRAY_SIZE(ofps); ++i) {
SPage *ofp = *(SPage **)taosArrayGet(ofps, i);
tdbPagerInsertFreePage(pBtc->pPage->pPager, ofp, pBtc->pTxn);
}
if (destroyOfps) {
taosArrayDestroy(ofps);
pBtc->pPage->pPager->ofps = NULL;
}
}
*/
return 0;
}

View File

@ -70,6 +70,11 @@ int32_t tdbOpen(const char *dbname, int32_t szPage, int32_t pages, TDB **ppDb, i
if (ret < 0) {
return -1;
}
ret = tdbTbOpen(TDB_FREEDB_NAME, sizeof(SPgno), 0, NULL, pDb, &pDb->pFreeDb, rollback);
if (ret < 0) {
return -1;
}
#endif
*ppDb = pDb;
@ -82,6 +87,7 @@ int tdbClose(TDB *pDb) {
if (pDb) {
#ifdef USE_MAINDB
if (pDb->pMainDb) tdbTbClose(pDb->pMainDb);
if (pDb->pFreeDb) tdbTbClose(pDb->pFreeDb);
#endif
for (pPager = pDb->pgrList; pPager; pPager = pDb->pgrList) {

View File

@ -292,7 +292,23 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
*/
return 0;
}
/*
int tdbPagerCancelDirty(SPager *pPager, SPage *pPage, TXN *pTxn) {
SRBTreeNode *pNode = tRBTreeGet(&pPager->rbt, (SRBTreeNode *)pPage);
if (pNode) {
pPage->isDirty = 0;
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
if (pTxn->jPageSet) {
hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
}
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
return 0;
}
*/
int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
SPage *pPage;
int ret;
@ -338,10 +354,13 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
if (pTxn->jPageSet) {
hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
}
tdbTrace("tdb/pager-commit: remove page: %p %d from dirty tree: %p", pPage, TDB_PAGE_PGNO(pPage), &pPager->rbt);
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
tdbTrace("pager/commit reset dirty tree: %p", &pPager->rbt);
tdbTrace("tdb/pager-commit reset dirty tree: %p", &pPager->rbt);
tRBTreeCreate(&pPager->rbt, pageCmpFn);
// sync the db file
@ -629,6 +648,8 @@ int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
return 0;
}
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn);
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
TXN *pTxn) {
SPage *pPage;
@ -643,7 +664,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
// alloc new page
if (pgno == 0) {
loadPage = 0;
ret = tdbPagerAllocPage(pPager, &pgno);
ret = tdbPagerAllocPage(pPager, &pgno, pTxn);
if (ret < 0) {
tdbError("tdb/pager: %p, ret: %d pgno: %" PRIu32 ", alloc page failed.", pPager, ret, pgno);
return -1;
@ -695,23 +716,86 @@ void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn) {
// TDB_PAGE_PGNO(pPage), pPage);
}
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno) {
// TODO: Allocate a page from the free list
int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn) {
int code = 0;
SPgno pgno = TDB_PAGE_PGNO(pPage);
// memset(pPage->pData, 0, pPage->pageSize);
tdbTrace("tdb/insert-free-page: tbc recycle page: %d.", pgno);
// printf("tdb/insert-free-page: tbc recycle page: %d.\n", pgno);
code = tdbTbInsert(pPager->pEnv->pFreeDb, &pgno, sizeof(pgno), NULL, 0, pTxn);
if (code < 0) {
tdbError("tdb/insert-free-page: tb insert failed with ret: %d.", code);
return -1;
}
pPage->pPager = NULL;
return code;
}
static int tdbPagerRemoveFreePage(SPager *pPager, SPgno *pPgno, TXN *pTxn) {
int code = 0;
TBC *pCur;
if (!pPager->pEnv->pFreeDb) {
return 0;
}
code = tdbTbcOpen(pPager->pEnv->pFreeDb, &pCur, pTxn);
if (code < 0) {
return 0;
}
code = tdbTbcMoveToFirst(pCur);
if (code) {
tdbError("tdb/remove-free-page: moveto first failed with ret: %d.", code);
tdbTbcClose(pCur);
return 0;
}
void *pKey = NULL;
int nKey = 0;
code = tdbTbcGet(pCur, (const void **)&pKey, &nKey, NULL, NULL);
if (code < 0) {
// tdbError("tdb/remove-free-page: tbc get failed with ret: %d.", code);
tdbTbcClose(pCur);
return 0;
}
*pPgno = *(SPgno *)pKey;
tdbTrace("tdb/remove-free-page: tbc get page: %d.", *pPgno);
// printf("tdb/remove-free-page: tbc get page: %d.\n", *pPgno);
code = tdbTbcDelete(pCur);
if (code < 0) {
tdbError("tdb/remove-free-page: tbc delete failed with ret: %d.", code);
tdbTbcClose(pCur);
return 0;
}
tdbTbcClose(pCur);
return 0;
}
static int tdbPagerAllocFreePage(SPager *pPager, SPgno *ppgno, TXN *pTxn) {
// Allocate a page from the free list
return tdbPagerRemoveFreePage(pPager, ppgno, pTxn);
}
static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
*ppgno = ++pPager->dbFileSize;
// tdbError("tdb/alloc-new-page: %d.", *ppgno);
return 0;
}
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno) {
static int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno, TXN *pTxn) {
int ret;
*ppgno = 0;
// Try to allocate from the free list of the pager
ret = tdbPagerAllocFreePage(pPager, ppgno);
ret = tdbPagerAllocFreePage(pPager, ppgno, pTxn);
if (ret < 0) {
return -1;
}

View File

@ -138,6 +138,7 @@ typedef struct {
SPgno pgno;
u8 *pBuf;
u8 freeKV;
SArray *ofps;
} SCellDecoder;
struct SBTC {
@ -198,7 +199,8 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn);
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
TXN *pTxn);
void tdbPagerReturnPage(SPager *pPager, SPage *pPage, TXN *pTxn);
int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
int tdbPagerInsertFreePage(SPager *pPager, SPage *pPage, TXN *pTxn);
// int tdbPagerAllocPage(SPager *pPager, SPgno *ppgno);
int tdbPagerRestoreJournals(SPager *pPager);
int tdbPagerRollback(SPager *pPager);
@ -373,6 +375,7 @@ static inline SCell *tdbPageGetCell(SPage *pPage, int idx) {
#ifdef USE_MAINDB
#define TDB_MAINDB_NAME "main.tdb"
#define TDB_FREEDB_NAME "_free.db"
#endif
struct STDB {
@ -386,6 +389,7 @@ struct STDB {
SPager **pgrHash;
#ifdef USE_MAINDB
TTB *pMainDb;
TTB *pFreeDb;
#endif
int64_t txnId;
};
@ -403,6 +407,7 @@ struct SPager {
SRBTree rbt;
// u8 inTran;
TXN *pActiveTxn;
SArray *ofps;
SPager *pNext; // used by TDB
SPager *pHashNext; // used by TDB
#ifdef USE_MAINDB

View File

@ -14,3 +14,7 @@ target_link_libraries(tdbExOVFLTest tdb gtest gtest_main)
add_executable(tdbPageDefragmentTest "tdbPageDefragmentTest.cpp")
target_link_libraries(tdbPageDefragmentTest tdb gtest gtest_main)
# page recycling testing
add_executable(tdbPageRecycleTest "tdbPageRecycleTest.cpp")
target_link_libraries(tdbPageRecycleTest tdb gtest gtest_main)

View File

@ -190,6 +190,15 @@ static void insertOfp(void) {
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
closePool(pPool);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
// TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) {
@ -233,6 +242,13 @@ TEST(TdbOVFLPagesTest, TbGetTest) {
tdbFree(pVal);
}
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
// TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) {
@ -334,6 +350,15 @@ tdbBegin(pEnv, &txn);
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
closePool(pPool);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
// TEST(tdb_test, DISABLED_simple_insert1) {
@ -407,6 +432,8 @@ TEST(tdb_test, simple_insert1) {
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
closePool(pPool);
{ // Query the data
void *pVal = NULL;
int vLen;

View File

@ -0,0 +1,835 @@
#include <gtest/gtest.h>
#define ALLOW_FORBID_FUNC
#include "os.h"
#include "tdb.h"
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>
#include "tlog.h"
typedef struct SPoolMem {
int64_t size;
struct SPoolMem *prev;
struct SPoolMem *next;
} SPoolMem;
static SPoolMem *openPool() {
SPoolMem *pPool = (SPoolMem *)taosMemoryMalloc(sizeof(*pPool));
pPool->prev = pPool->next = pPool;
pPool->size = 0;
return pPool;
}
static void clearPool(SPoolMem *pPool) {
SPoolMem *pMem;
do {
pMem = pPool->next;
if (pMem == pPool) break;
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
taosMemoryFree(pMem);
} while (1);
assert(pPool->size == 0);
}
static void closePool(SPoolMem *pPool) {
clearPool(pPool);
taosMemoryFree(pPool);
}
static void *poolMalloc(void *arg, size_t size) {
void *ptr = NULL;
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = (SPoolMem *)taosMemoryMalloc(sizeof(*pMem) + size);
if (pMem == NULL) {
assert(0);
}
pMem->size = sizeof(*pMem) + size;
pMem->next = pPool->next;
pMem->prev = pPool;
pPool->next->prev = pMem;
pPool->next = pMem;
pPool->size += pMem->size;
ptr = (void *)(&pMem[1]);
return ptr;
}
static void poolFree(void *arg, void *ptr) {
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = &(((SPoolMem *)ptr)[-1]);
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
taosMemoryFree(pMem);
}
static int tKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
int k1, k2;
std::string s1((char *)pKey1 + 3, kLen1 - 3);
std::string s2((char *)pKey2 + 3, kLen2 - 3);
k1 = stoi(s1);
k2 = stoi(s2);
if (k1 < k2) {
return -1;
} else if (k1 > k2) {
return 1;
} else {
return 0;
}
}
static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) {
int mlen;
int cret;
ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL);
mlen = keyLen1 < keyLen2 ? keyLen1 : keyLen2;
cret = memcmp(pKey1, pKey2, mlen);
if (cret == 0) {
if (keyLen1 < keyLen2) {
cret = -1;
} else if (keyLen1 > keyLen2) {
cret = 1;
} else {
cret = 0;
}
}
return cret;
}
static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) {
TDB *pEnv = NULL;
int ret = tdbOpen(envName, pageSize, pageNum, &pEnv, 0);
if (ret) {
pEnv = NULL;
}
return pEnv;
}
static void generateBigVal(char *val, int valLen) {
for (int i = 0; i < valLen; ++i) {
char c = char(i & 0xff);
if (c == 0) {
c = 1;
}
val[i] = c;
}
}
static void insertOfp(void) {
int ret = 0;
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
// ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN *txn = NULL;
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
// generate value payload
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[32605];
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
// insert the generated big data
// char const *key = "key1";
char const *key = "key123456789";
ret = tdbTbInsert(pDb, key, strlen(key) + 1, val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
closePool(pPool);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
static void clearDb(char const *db) { taosRemoveDir(db); }
TEST(TdbPageRecycleTest, DISABLED_TbInsertTest) {
// TEST(TdbPageRecycleTest, TbInsertTest) {
// ofp inserting
clearDb("tdb");
insertOfp();
}
TEST(TdbPageRecycleTest, DISABLED_TbGetTest) {
// TEST(TdbPageRecycleTest, TbGetTest) {
clearDb("tdb");
insertOfp();
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
// int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// generate value payload
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[32605];
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
{ // Query the data
void *pVal = NULL;
int vLen;
// char const *key = "key1";
char const *key = "key123456789";
ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, valLen);
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
tdbFree(pVal);
}
}
TEST(TdbPageRecycleTest, DISABLED_TbDeleteTest) {
// TEST(TdbPageRecycleTest, TbDeleteTest) {
int ret = 0;
taosRemoveDir("tdb");
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN *txn;
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
// generate value payload
// char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[((4083 - 4 - 3 - 2) + 1) * 2]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
{ // insert the generated big data
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the data
void *pVal = NULL;
int vLen;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, valLen);
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
tdbFree(pVal);
}
/* open to debug committed file
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
++txnid;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
*/
{ // upsert the data
ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the upserted data
void *pVal = NULL;
int vLen;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, strlen("value1"));
GTEST_ASSERT_EQ(memcmp("value1", pVal, vLen), 0);
tdbFree(pVal);
}
{ // delete the data
ret = tdbTbDelete(pDb, "key1", strlen("key1"), txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the deleted data
void *pVal = NULL;
int vLen = -1;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == -1);
GTEST_ASSERT_EQ(ret, -1);
GTEST_ASSERT_EQ(vLen, -1);
GTEST_ASSERT_EQ(pVal, nullptr);
tdbFree(pVal);
}
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
}
TEST(TdbPageRecycleTest, DISABLED_simple_insert1) {
// TEST(TdbPageRecycleTest, simple_insert1) {
int ret;
TDB *pEnv;
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 1;
TXN *txn;
int const pageSize = 4096;
taosRemoveDir("tdb");
// Open Env
ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
{
char key[64];
// char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key0");
sprintf(val, "value%d", iData);
// ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
// GTEST_ASSERT_EQ(ret, 0);
// generate value payload
int valLen = sizeof(val) / sizeof(val[0]);
for (int i = 6; i < valLen; ++i) {
char c = char(i & 0xff);
if (c == 0) {
c = 1;
}
val[i] = c;
}
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// start a new transaction
clearPool(pPool);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
// commit the transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
{ // Query the data
void *pVal = NULL;
int vLen;
for (int i = 1; i <= nData; i++) {
sprintf(key, "key%d", i);
// sprintf(val, "value%d", i);
ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, sizeof(val) / sizeof(val[0]));
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
}
tdbFree(pVal);
}
{ // Iterate to query the DB data
TBC *pDBC;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int count = 0;
ret = tdbTbcOpen(pDb, &pDBC, NULL);
GTEST_ASSERT_EQ(ret, 0);
tdbTbcMoveToFirst(pDBC);
for (;;) {
ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
// std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
// std::cout << std::endl;
count++;
}
GTEST_ASSERT_EQ(count, nData);
tdbTbcClose(pDBC);
tdbFree(pKey);
tdbFree(pVal);
}
}
ret = tdbTbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
static void insertDb(int nData) {
int ret = 0;
TDB *pEnv = NULL;
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc;
TXN *txn = NULL;
int const pageSize = 4 * 1024;
// Open Env
ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// 1, insert nData kv
{
char key[64];
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 0; iData < nData; ++iData) {
sprintf(key, "key%03d", iData);
sprintf(val, "value%03d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// start a new transaction
clearPool(pPool);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
// commit the transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// 2, delete nData/2 records
closePool(pPool);
}
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
system("ls -l ./tdb");
}
static void deleteDb(int nData) {
int ret = 0;
TDB *pEnv = NULL;
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc;
TXN *txn = NULL;
int const pageSize = 4 * 1024;
// Open Env
ret = tdbOpen("tdb", pageSize, 64, &pEnv, 0);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// 2, delete nData/2 records
{
char key[64];
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
for (int iData = 0; iData < nData; iData++) {
// if (iData % 2 == 0) continue;
sprintf(key, "key%03d", iData);
sprintf(val, "value%03d", iData);
{ // delete the data
ret = tdbTbDelete(pDb, key, strlen(key), txn);
GTEST_ASSERT_EQ(ret, 0);
}
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// start a new transaction
clearPool(pPool);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
// commit the transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
closePool(pPool);
}
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
system("ls -l ./tdb");
}
static const int nDataConst = 256 * 19;
// TEST(TdbPageRecycleTest, DISABLED_seq_insert) {
TEST(TdbPageRecycleTest, seq_insert) {
clearDb("tdb");
insertDb(nDataConst);
}
// TEST(TdbPageRecycleTest, DISABLED_seq_delete) {
TEST(TdbPageRecycleTest, seq_delete) { deleteDb(nDataConst); }
// TEST(TdbPageRecycleTest, DISABLED_recycly_insert) {
TEST(TdbPageRecycleTest, recycly_insert) { insertDb(nDataConst); }
// TEST(TdbPageRecycleTest, DISABLED_recycly_seq_insert_ofp) {
TEST(TdbPageRecycleTest, recycly_seq_insert_ofp) {
clearDb("tdb");
insertOfp();
system("ls -l ./tdb");
}
static void deleteOfp(void) {
// open Env
int ret = 0;
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN *txn;
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
{ // delete the data
char const *key = "key123456789";
ret = tdbTbDelete(pDb, key, strlen(key) + 1, txn);
GTEST_ASSERT_EQ(ret, 0);
}
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
closePool(pPool);
ret = tdbTbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}
// TEST(TdbPageRecycleTest, DISABLED_seq_delete_ofp) {
TEST(TdbPageRecycleTest, seq_delete_ofp) {
deleteOfp();
system("ls -l ./tdb");
}
// TEST(TdbPageRecycleTest, DISABLED_recycly_seq_insert_ofp_again) {
TEST(TdbPageRecycleTest, recycly_seq_insert_ofp_again) {
insertOfp();
system("ls -l ./tdb");
}
// TEST(TdbPageRecycleTest, DISABLED_recycly_seq_insert_ofp_nocommit) {
TEST(TdbPageRecycleTest, recycly_seq_insert_ofp_nocommit) {
clearDb("tdb");
insertOfp();
system("ls -l ./tdb");
// open Env
int ret = 0;
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN *txn;
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
{ // delete the data
char const *key = "key123456789";
ret = tdbTbDelete(pDb, key, strlen(key) + 1, txn);
GTEST_ASSERT_EQ(ret, 0);
}
// 1, insert nData kv
{
int nData = nDataConst;
char key[64];
char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
for (int iData = 0; iData < nData; ++iData) {
sprintf(key, "key%03d", iData);
sprintf(val, "value%03d", iData);
ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
// start a new transaction
clearPool(pPool);
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
}
}
}
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
closePool(pPool);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
system("ls -l ./tdb");
}
// TEST(TdbPageRecycleTest, DISABLED_recycly_delete_interior_ofp_nocommit) {
TEST(TdbPageRecycleTest, recycly_delete_interior_ofp_nocommit) {
clearDb("tdb");
// open Env
int ret = 0;
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = NULL; // tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb, 0);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN *txn;
tdbBegin(pEnv, &txn, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
char key[1024] = {0};
int count = sizeof(key) / sizeof(key[0]);
for (int i = 0; i < count - 1; ++i) {
key[i] = 'a';
}
// insert n ofp keys to form 2-layer btree
{
for (int i = 0; i < 7; ++i) {
// sprintf(&key[count - 2], "%c", i);
key[count - 2] = '0' + i;
ret = tdbTbInsert(pDb, key, count, NULL, NULL, txn);
GTEST_ASSERT_EQ(ret, 0);
}
}
/*
// delete one interior key
{
sprintf(&key[count - 2], "%c", 2);
key[count - 2] = '0' + 2;
ret = tdbTbDelete(pDb, key, strlen(key) + 1, txn);
GTEST_ASSERT_EQ(ret, 0);
}
*/
// commit current transaction
tdbCommit(pEnv, txn);
tdbPostCommit(pEnv, txn);
closePool(pPool);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
system("ls -l ./tdb");
}

View File

@ -129,6 +129,7 @@ sql DROP INDEX sma_index_3 ;
print ========== step8
sql drop database if exists db;
sleep 2000
sql create database db duration 300;
sql use db;
sql create table stb1(ts timestamp, c_int int, c_bint bigint, c_sint smallint, c_tint tinyint,c_float float, c_double double, c_bool bool,c_binary binary(16), c_nchar nchar(32), c_ts timestamp,c_tint_un tinyint unsigned, c_sint_un smallint unsigned,c_int_un int unsigned, c_bint_un bigint unsigned) tags (t_int int);