Merge pull request #14020 from taosdata/fix/tdb-maxLocal

tdb/ofp: support overflow pages for big data
This commit is contained in:
Minglei Jin 2022-06-20 18:49:24 +08:00 committed by GitHub
commit a805e1c06b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 973 additions and 75 deletions

View File

@ -17,6 +17,7 @@
#define TDB_BTREE_ROOT 0x1
#define TDB_BTREE_LEAF 0x2
#define TDB_BTREE_OVFL 0x4
struct SBTree {
SPgno root;
@ -38,9 +39,11 @@ struct SBTree {
#define TDB_BTREE_PAGE_SET_FLAGS(PAGE, flags) ((PAGE)->pData[0] = (flags))
#define TDB_BTREE_PAGE_IS_ROOT(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_ROOT)
#define TDB_BTREE_PAGE_IS_LEAF(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_LEAF)
#define TDB_BTREE_PAGE_IS_OVFL(PAGE) (TDB_BTREE_PAGE_GET_FLAGS(PAGE) & TDB_BTREE_OVFL)
#define TDB_BTREE_ASSERT_FLAG(flags) \
ASSERT(TDB_FLAG_IS(flags, TDB_BTREE_ROOT) || TDB_FLAG_IS(flags, TDB_BTREE_LEAF) || \
TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0))
TDB_FLAG_IS(flags, TDB_BTREE_ROOT | TDB_BTREE_LEAF) || TDB_FLAG_IS(flags, 0) || \
TDB_FLAG_IS(flags, TDB_BTREE_OVFL))
#pragma pack(push, 1)
typedef struct {
@ -62,10 +65,10 @@ static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2
static int tdbBtreeOpenImpl(SBTree *pBt);
static int tdbBtreeInitPage(SPage *pPage, void *arg, int init);
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
int *szCell);
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder);
int *szCell, TXN *pTxn, SBTree *pBt);
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt);
static int tdbBtreeBalance(SBTC *pBtc);
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell);
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *pTxn, SBTree *pBt);
static int tdbBtcMoveDownward(SBTC *pBtc);
static int tdbBtcMoveUpward(SBTC *pBtc);
@ -255,7 +258,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
}
pCell = tdbPageGetCell(btc.pPage, btc.idx);
tdbBtreeDecodeCell(btc.pPage, pCell, &cd);
tdbBtreeDecodeCell(btc.pPage, pCell, &cd, btc.pTxn, pBt);
if (ppKey) {
pTKey = tdbRealloc(*ppKey, cd.kLen);
@ -281,6 +284,14 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
memcpy(*ppVal, cd.pVal, cd.vLen);
}
if (TDB_CELLDECODER_FREE_KEY(&cd)) {
tdbFree(cd.pKey);
}
if (TDB_CELLDECODER_FREE_VAL(&cd)) {
tdbFree(cd.pVal);
}
tdbBtcClose(&btc);
return 0;
@ -375,6 +386,11 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg, int init) {
pPage->vLen = pBt->valLen;
pPage->maxLocal = pBt->maxLeaf;
pPage->minLocal = pBt->minLeaf;
} else if (TDB_BTREE_PAGE_IS_OVFL(pPage)) {
pPage->kLen = pBt->keyLen;
pPage->vLen = pBt->valLen;
pPage->maxLocal = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr));
pPage->minLocal = pBt->minLocal;
} else {
pPage->kLen = pBt->keyLen;
pPage->vLen = sizeof(SPgno);
@ -499,7 +515,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
for (int i = 0; i < nOlds; i++) {
if (sIdx + i < TDB_PAGE_TOTAL_CELLS(pParent)) {
pCell = tdbPageGetCell(pParent, sIdx + i);
szDivCell[i] = tdbBtreeCellSize(pParent, pCell);
szDivCell[i] = tdbBtreeCellSize(pParent, pCell, 0, NULL, NULL);
pDivCell[i] = tdbOsMalloc(szDivCell[i]);
memcpy(pDivCell[i], pCell, szDivCell[i]);
}
@ -524,7 +540,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
for (int i = 0; i < nOlds; i++) {
nCells = TDB_PAGE_TOTAL_CELLS(pParent);
if (sIdx < nCells) {
tdbPageDropCell(pParent, sIdx);
tdbPageDropCell(pParent, sIdx, pTxn, pBt);
} else {
((SIntHdr *)pParent->pData)->pgno = 0;
}
@ -582,7 +598,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
for (;;) {
pCell = tdbPageGetCell(pOlds[infoNews[iNew - 1].iPage], infoNews[iNew - 1].oIdx);
szLCell = tdbBtreeCellSize(pOlds[infoNews[iNew - 1].iPage], pCell);
szLCell = tdbBtreeCellSize(pOlds[infoNews[iNew - 1].iPage], pCell, 0, NULL, NULL);
if (!childNotLeaf) {
szRCell = szLCell;
} else {
@ -600,7 +616,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
}
pCell = tdbPageGetCell(pPage, oIdx);
szRCell = tdbBtreeCellSize(pPage, pCell);
szRCell = tdbBtreeCellSize(pPage, pCell, 0, NULL, NULL);
}
ASSERT(infoNews[iNew - 1].cnt > 0);
@ -687,7 +703,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
for (int oIdx = 0; oIdx < TDB_PAGE_TOTAL_CELLS(pPage); oIdx++) {
pCell = tdbPageGetCell(pPage, oIdx);
szCell = tdbBtreeCellSize(pPage, pCell);
szCell = tdbBtreeCellSize(pPage, pCell, 0, NULL, NULL);
ASSERT(nNewCells <= infoNews[iNew].cnt);
ASSERT(iNew < nNews);
@ -703,14 +719,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
if (iNew == nNews - 1 && pIntHdr->pgno == 0) {
pIntHdr->pgno = TDB_PAGE_PGNO(pNews[iNew]);
} else {
tdbBtreeDecodeCell(pPage, pCell, &cd);
tdbBtreeDecodeCell(pPage, pCell, &cd, pTxn, pBt);
// TODO: pCell here may be inserted as an overflow cell, handle it
SCell *pNewCell = tdbOsMalloc(cd.kLen + 9);
int szNewCell;
SPgno pgno;
pgno = TDB_PAGE_PGNO(pNews[iNew]);
tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell);
tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell, pTxn, pBt);
tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0);
tdbOsFree(pNewCell);
}
@ -846,13 +862,50 @@ static int tdbBtreeBalance(SBTC *pBtc) {
}
// TDB_BTREE_BALANCE
static int tdbFetchOvflPage(SPager *pPager, SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) {
int ret = 0;
*pPgno = 0;
SBtreeInitPageArg iArg;
iArg.pBt = pBt;
iArg.flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL);
ret = tdbPagerFetchPage(pPager, pPgno, ppOfp, tdbBtreeInitPage, &iArg, pTxn);
if (ret < 0) {
return -1;
}
// mark dirty
ret = tdbPagerWrite(pPager, *ppOfp);
if (ret < 0) {
ASSERT(0);
return -1;
}
return ret;
}
static int tdbLoadOvflPage(SPager *pPager, SPgno *pPgno, SPage **ppOfp, TXN *pTxn, SBTree *pBt) {
int ret = 0;
SBtreeInitPageArg iArg;
iArg.pBt = pBt;
iArg.flags = TDB_FLAG_ADD(0, TDB_BTREE_OVFL);
ret = tdbPagerFetchPage(pPager, pPgno, ppOfp, tdbBtreeInitPage, &iArg, pTxn);
if (ret < 0) {
return -1;
}
return ret;
}
// TDB_BTREE_CELL =====================
static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const void *pKey, int kLen, const void *pVal,
int vLen, int *szPayload) {
int nPayload;
int vLen, int *szPayload, TXN *pTxn, SBTree *pBt) {
int ret = 0;
int nPayload = kLen + vLen;
int maxLocal = pPage->maxLocal;
nPayload = kLen + vLen;
if (nPayload + nHeader <= pPage->maxLocal) {
if (nPayload + nHeader <= maxLocal) {
// no overflow page is needed
memcpy(pCell + nHeader, pKey, kLen);
if (pVal) {
@ -861,18 +914,190 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
*szPayload = nPayload;
return 0;
}
} else {
// handle overflow case
// calc local storage size
int minLocal = pPage->minLocal;
int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno));
int nLocal = surplus <= maxLocal ? surplus : minLocal;
{
// TODO: handle overflow case
ASSERT(0);
//int ofpCap = tdbPageCapacity(pBt->pageSize, sizeof(SIntHdr));
// fetch a new ofp and make it dirty
SPgno pgno = 0;
SPage *ofp, *nextOfp;
ret = tdbFetchOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
// local buffer for cell
SCell *pBuf = tdbRealloc(NULL, pBt->pageSize);
if (pBuf == NULL) {
return -1;
}
int nLeft = nPayload;
int bytes;
int lastPage = 0;
if (nLocal >= kLen + 4) {
// pack key to local
memcpy(pCell + nHeader, pKey, kLen);
nLeft -= kLen;
// pack partial val to local if any space left
if (nLocal > kLen + 4) {
memcpy(pCell + nHeader + kLen, pVal, nLocal - kLen - sizeof(SPgno));
nLeft -= nLocal - kLen - sizeof(SPgno);
}
// pack nextPgno
memcpy(pCell + nHeader + nPayload - nLeft, &pgno, sizeof(pgno));
// pack left val data to ovpages
do {
lastPage = 0;
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
// fetch next ofp if not last page
if (!lastPage) {
// fetch a new ofp and make it dirty
ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
} else {
pgno = 0;
}
memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes);
memcpy(pBuf + bytes, &pgno, sizeof(pgno));
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
ofp = nextOfp;
nLeft -= bytes;
} while (nLeft > 0);
} else {
int nLeftKey = kLen;
// pack partial key and nextPgno
memcpy(pCell + nHeader, pKey, nLocal - 4);
nLeft -= nLocal - 4;
nLeftKey -= nLocal -4;
memcpy(pCell + nHeader + nLocal - 4, &pgno, sizeof(pgno));
int lastKeyPageSpace = 0;
// pack left key & val to ovpages
do {
// cal key to cpy
int lastKeyPage = 0;
if (nLeftKey <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeftKey;
lastKeyPage = 1;
lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
// cpy key
memcpy(pBuf, ((SCell *)pKey) + kLen - nLeftKey, bytes);
if (lastKeyPage) {
if (lastKeyPageSpace >= vLen) {
memcpy(pBuf + kLen -nLeftKey, pVal, vLen);
nLeft -= vLen;
pgno = 0;
} else {
memcpy(pBuf + kLen -nLeftKey, pVal, lastKeyPageSpace);
nLeft -= lastKeyPageSpace;
// fetch next ofp, a new ofp and make it dirty
ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
return -1;
}
}
} else {
// fetch next ofp, a new ofp and make it dirty
ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
return -1;
}
}
memcpy(pBuf + kLen - nLeft, &pgno, sizeof(pgno));
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
return -1;
}
ofp = nextOfp;
nLeftKey -= bytes;
nLeft -= bytes;
} while (nLeftKey > 0);
while (nLeft > 0) {
// pack left val data to ovpages
lastPage = 0;
if (nLeft <= maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = maxLocal - sizeof(SPgno);
}
// fetch next ofp if not last page
if (!lastPage) {
// fetch a new ofp and make it dirty
ret = tdbFetchOvflPage(pPage->pPager, &pgno, &nextOfp, pTxn, pBt);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
} else {
pgno = 0;
}
memcpy(pBuf, ((SCell *)pVal) + vLen - nLeft, bytes);
memcpy(pBuf + bytes, &pgno, sizeof(pgno));
ret = tdbPageInsertCell(ofp, 0, pBuf, bytes + sizeof(pgno), 0);
if (ret < 0) {
tdbFree(pBuf);
return -1;
}
ofp = nextOfp;
nLeft -= bytes;
}
}
// free local buffer
tdbFree(pBuf);
*szPayload = nLocal;
// ASSERT(0);
}
return 0;
}
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
int *szCell) {
int *szCell, TXN *pTxn, SBTree *pBt) {
u8 leaf;
int nHeader;
int nPayload;
@ -911,7 +1136,7 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
vLen = 0;
}
ret = tdbBtreeEncodePayload(pPage, pCell, nHeader, pKey, kLen, pVal, vLen, &nPayload);
ret = tdbBtreeEncodePayload(pPage, pCell, nHeader, pKey, kLen, pVal, vLen, &nPayload, pTxn, pBt);
if (ret < 0) {
// TODO
ASSERT(0);
@ -922,8 +1147,13 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
return 0;
}
static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, SCellDecoder *pDecoder) {
static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt) {
int ret = 0;
int nPayload;
int maxLocal = pPage->maxLocal;
int kLen = pDecoder->kLen;
int vLen = pDecoder->vLen;
if (pDecoder->pVal) {
ASSERT(!TDB_BTREE_PAGE_IS_LEAF(pPage));
@ -932,24 +1162,171 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
nPayload = pDecoder->kLen + pDecoder->vLen;
}
if (nHeader + nPayload <= pPage->maxLocal) {
if (nHeader + nPayload <= maxLocal) {
// no over flow case
pDecoder->pKey = pCell + nHeader;
pDecoder->pKey = (SCell *)pCell + nHeader;
if (pDecoder->pVal == NULL && pDecoder->vLen > 0) {
pDecoder->pVal = pCell + nHeader + pDecoder->kLen;
pDecoder->pVal = (SCell *)pCell + nHeader + pDecoder->kLen;
}
return 0;
}
} else {
// handle overflow case
// calc local storage size
int minLocal = pPage->minLocal;
int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno));
int nLocal = surplus <= maxLocal ? surplus : minLocal;
{
// TODO: handle overflow case
ASSERT(0);
int nLeft = nPayload;
SPgno pgno = 0;
SPage *ofp;
SCell *ofpCell;
int bytes;
int lastPage = 0;
if (nLocal >= pDecoder->kLen + 4) {
pDecoder->pKey = (SCell *)pCell + nHeader;
nLeft -= kLen;
if (nLocal > kLen + 4) {
// read partial val to local
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
memcpy(pDecoder->pVal, pCell + nHeader + kLen, nLocal - kLen - sizeof(SPgno));
nLeft -= nLocal - kLen - sizeof(SPgno);
}
memcpy(&pgno, pCell + nHeader + nPayload - nLeft, sizeof(pgno));
// unpack left val data from ovpages
while (pgno != 0) {
ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
ofpCell = tdbPageGetCell(ofp, 0);
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
memcpy(pDecoder->pVal + vLen - nLeft, ofpCell, bytes);
nLeft -= bytes;
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
}
} else {
int nLeftKey = kLen;
// load partial key and nextPgno
pDecoder->pKey = tdbRealloc(pDecoder->pKey, kLen);
if (pDecoder->pKey == NULL) {
return -1;
}
TDB_CELLDECODER_SET_FREE_KEY(pDecoder);
memcpy(pDecoder->pKey, pCell + nHeader, nLocal - 4);
nLeft -= nLocal - 4;
nLeftKey -= nLocal -4;
memcpy(&pgno, pCell + nHeader + nLocal - 4, sizeof(pgno));
int lastKeyPageSpace = 0;
// load left key & val to ovpages
while (pgno != 0) {
ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
ofpCell = tdbPageGetCell(ofp, 0);
int lastKeyPage = 0;
if (nLeftKey <= maxLocal - sizeof(SPgno)) {
bytes = nLeftKey;
lastKeyPage = 1;
lastKeyPageSpace = ofp->maxLocal - sizeof(SPgno) - nLeftKey;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
// cpy key
memcpy(pDecoder->pKey + kLen - nLeftKey, ofpCell, bytes);
if (lastKeyPage) {
if (lastKeyPageSpace >= vLen) {
pDecoder->pVal = ofpCell + kLen -nLeftKey;
nLeft -= vLen;
pgno = 0;
} else {
// read partial val to local
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
memcpy(pDecoder->pVal, ofpCell + kLen -nLeftKey, lastKeyPageSpace);
nLeft -= lastKeyPageSpace;
}
}
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
nLeftKey -= bytes;
nLeft -= bytes;
}
while (nLeft > 0) {
ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
ofpCell = tdbPageGetCell(ofp, 0);
// load left val data to ovpages
lastPage = 0;
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
lastPage = 1;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
if (lastPage) {
pgno = 0;
}
if (!pDecoder->pVal) {
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) {
return -1;
}
TDB_CELLDECODER_SET_FREE_VAL(pDecoder);
}
memcpy(pDecoder->pVal, ofpCell + vLen - nLeft, bytes);
nLeft -= bytes;
memcpy(&pgno, ofpCell + vLen - nLeft + bytes, sizeof(pgno));
nLeft -= bytes;
}
}
}
return 0;
}
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder) {
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder, TXN *pTxn, SBTree *pBt) {
u8 leaf;
int nHeader;
int ret;
@ -963,6 +1340,7 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
pDecoder->vLen = -1;
pDecoder->pVal = NULL;
pDecoder->pgno = 0;
TDB_CELLDECODER_SET_FREE_NIL(pDecoder);
// 1. Decode header part
if (!leaf) {
@ -987,7 +1365,7 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
}
// 2. Decode payload part
ret = tdbBtreeDecodePayload(pPage, pCell, nHeader, pDecoder);
ret = tdbBtreeDecodePayload(pPage, pCell, nHeader, pDecoder, pTxn, pBt);
if (ret < 0) {
return -1;
}
@ -995,41 +1373,71 @@ static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pD
return 0;
}
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) {
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *pTxn, SBTree *pBt) {
u8 leaf;
int szCell;
int kLen = 0, vLen = 0;
int kLen = 0, vLen = 0, nHeader = 0;
leaf = TDB_BTREE_PAGE_IS_LEAF(pPage);
szCell = 0;
if (!leaf) {
szCell += sizeof(SPgno);
nHeader += sizeof(SPgno);
}
if (pPage->kLen == TDB_VARIANT_LEN) {
szCell += tdbGetVarInt(pCell + szCell, &kLen);
nHeader += tdbGetVarInt(pCell + nHeader, &kLen);
} else {
kLen = pPage->kLen;
}
if (pPage->vLen == TDB_VARIANT_LEN) {
ASSERT(leaf);
szCell += tdbGetVarInt(pCell + szCell, &vLen);
nHeader += tdbGetVarInt(pCell + nHeader, &vLen);
} else if (leaf) {
vLen = pPage->vLen;
}
szCell = szCell + kLen + vLen;
int nPayload = kLen + vLen;
if (nHeader + nPayload <= pPage->maxLocal) {
return nHeader + kLen + vLen;
} else {
int maxLocal = pPage->maxLocal;
if (szCell <= pPage->maxLocal) {
return szCell;
}
// calc local storage size
int minLocal = pPage->minLocal;
int surplus = minLocal + (nPayload + nHeader - minLocal) % (maxLocal - sizeof(SPgno));
int nLocal = surplus <= maxLocal ? surplus : minLocal;
{
// TODO
ASSERT(0);
return 0;
// free ofp pages' cells
if (dropOfp) {
int ret = 0;
SPgno pgno = *(SPgno *) (pCell + nHeader + nLocal - sizeof(SPgno));
int nLeft = nPayload - nLocal + sizeof(SPgno);
SPage *ofp;
int bytes;
while (pgno != 0) {
ret = tdbLoadOvflPage(pPage->pPager, &pgno, &ofp, pTxn, pBt);
if (ret < 0) {
return -1;
}
SCell *ofpCell = tdbPageGetCell(ofp, 0);
if (nLeft <= ofp->maxLocal - sizeof(SPgno)) {
bytes = nLeft;
} else {
bytes = ofp->maxLocal - sizeof(SPgno);
}
memcpy(&pgno, ofpCell + bytes, sizeof(pgno));
tdbPagerReturnPage(pPage->pPager, ofp, pTxn);
nLeft -= bytes;
}
}
return nHeader + nLocal;
}
}
// TDB_BTREE_CELL
@ -1212,7 +1620,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd);
tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt);
pKey = tdbRealloc(*ppKey, cd.kLen);
if (pKey == NULL) {
@ -1258,7 +1666,7 @@ int tdbBtreePrev(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd);
tdbBtreeDecodeCell(pBtc->pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt);
pKey = tdbRealloc(*ppKey, cd.kLen);
if (pKey == NULL) {
@ -1427,7 +1835,7 @@ int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int
}
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
tdbBtreeDecodeCell(pBtc->pPage, pCell, &pBtc->coder);
tdbBtreeDecodeCell(pBtc->pPage, pCell, &pBtc->coder, pBtc->pTxn, pBtc->pBt);
if (ppKey) {
*ppKey = (void *)pBtc->coder.pKey;
@ -1464,7 +1872,7 @@ int tdbBtcDelete(SBTC *pBtc) {
return -1;
}
tdbPageDropCell(pBtc->pPage, idx);
tdbPageDropCell(pBtc->pPage, idx, pBtc->pTxn, pBtc->pBt);
// update interior page or do balance
if (idx == nCells - 1) {
@ -1488,9 +1896,9 @@ int tdbBtcDelete(SBTC *pBtc) {
// update the cell with new key
pCell = tdbOsMalloc(nKey + 9);
tdbBtreeEncodeCell(pPage, pKey, nKey, &pgno, sizeof(pgno), pCell, &szCell);
tdbBtreeEncodeCell(pPage, pKey, nKey, &pgno, sizeof(pgno), pCell, &szCell, pBtc->pTxn, pBtc->pBt);
ret = tdbPageUpdateCell(pPage, idx, pCell, szCell);
ret = tdbPageUpdateCell(pPage, idx, pCell, szCell, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
tdbOsFree(pCell);
ASSERT(0);
@ -1529,7 +1937,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
// alloc space
szBuf = kLen + nData + 14;
pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize);
pBuf = tdbRealloc(pBtc->pBt->pBuf, pBtc->pBt->pageSize > szBuf ? szBuf : pBtc->pBt->pageSize);
if (pBuf == NULL) {
ASSERT(0);
return -1;
@ -1538,7 +1946,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
pCell = (SCell *)pBtc->pBt->pBuf;
// encode cell
ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pData, nData, pCell, &szCell);
ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pData, nData, pCell, &szCell, pBtc->pTxn, pBtc->pBt);
if (ret < 0) {
ASSERT(0);
return -1;
@ -1559,7 +1967,7 @@ int tdbBtcUpsert(SBTC *pBtc, const void *pKey, int kLen, const void *pData, int
} else {
ASSERT(pBtc->idx < nCells);
ret = tdbPageUpdateCell(pBtc->pPage, pBtc->idx, pCell, szCell);
ret = tdbPageUpdateCell(pBtc->pPage, pBtc->idx, pCell, szCell, pBtc->pTxn, pBtc->pBt);
}
if (ret < 0) {
ASSERT(0);
@ -1620,7 +2028,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
// check if key <= current position
if (idx < nCells) {
pCell = tdbPageGetCell(pPage, idx);
tdbBtreeDecodeCell(pPage, pCell, &cd);
tdbBtreeDecodeCell(pPage, pCell, &cd, pBtc->pTxn, pBtc->pBt);
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c > 0) break;
}
@ -1629,7 +2037,7 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
if (idx > 0) {
pCell = tdbPageGetCell(pPage, idx - 1);
tdbBtreeDecodeCell(pPage, pCell, &cd);
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen, pBtc->pTxn, pBtc->pBt);
if (c <= 0) break;
}
}
@ -1769,4 +2177,4 @@ void tdbBtPageInfo(SPage *pPage, int idx) {
pBtPageInfo->nOvfl = pPage->nOverflow;
}
#endif
// TDB_BTREE_DEBUG
// TDB_BTREE_DEBUG

View File

@ -82,7 +82,8 @@ int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg)
return 0;
}
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)) {
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int,
TXN *, SBTree *pBt)) {
pPage->pPageHdr = pPage->pData + szAmHdr;
TDB_PAGE_NCELLS_SET(pPage, 0);
TDB_PAGE_CCELLS_SET(pPage, pPage->pageSize - sizeof(SPageFtr));
@ -98,7 +99,8 @@ void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell
ASSERT((u8 *)pPage->pPageFtr == pPage->pFreeEnd);
}
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *)) {
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int,
TXN *, SBTree *pBt)) {
pPage->pPageHdr = pPage->pData + szAmHdr;
pPage->pCellIdx = pPage->pPageHdr + TDB_PAGE_HDR_SIZE(pPage);
pPage->pFreeStart = pPage->pCellIdx + TDB_PAGE_OFFSET_SIZE(pPage) * TDB_PAGE_NCELLS(pPage);
@ -171,12 +173,12 @@ int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl
return 0;
}
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell) {
tdbPageDropCell(pPage, idx);
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell, TXN *pTxn, SBTree *pBt) {
tdbPageDropCell(pPage, idx, pTxn, pBt);
return tdbPageInsertCell(pPage, idx, pCell, szCell, 0);
}
int tdbPageDropCell(SPage *pPage, int idx) {
int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt) {
int lidx;
SCell *pCell;
int szCell;
@ -205,7 +207,7 @@ int tdbPageDropCell(SPage *pPage, int idx) {
lidx = idx - iOvfl;
pCell = TDB_PAGE_CELL_AT(pPage, lidx);
szCell = (*pPage->xCellSize)(pPage, pCell);
szCell = (*pPage->xCellSize)(pPage, pCell, 1, pTxn, pBt);
tdbPageFree(pPage, lidx, pCell, szCell);
TDB_PAGE_NCELLS_SET(pPage, nCells - 1);
@ -420,7 +422,7 @@ static int tdbPageDefragment(SPage *pPage) {
ASSERT(pCell != NULL);
szCell = (*pPage->xCellSize)(pPage, pCell);
szCell = (*pPage->xCellSize)(pPage, pCell, 0, NULL, NULL);
ASSERT(pCell + szCell <= pNextCell);
if (pCell + szCell < pNextCell) {

View File

@ -116,13 +116,25 @@ typedef struct SBtInfo {
int nData;
} SBtInfo;
#define TDB_CELLD_F_NIL 0x0
#define TDB_CELLD_F_KEY 0x1
#define TDB_CELLD_F_VAL 0x2
#define TDB_CELLDECODER_SET_FREE_NIL(pCellDecoder) ((pCellDecoder)->freeKV = TDB_CELLD_F_NIL)
#define TDB_CELLDECODER_SET_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_KEY)
#define TDB_CELLDECODER_SET_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV |= TDB_CELLD_F_VAL)
#define TDB_CELLDECODER_FREE_KEY(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_KEY)
#define TDB_CELLDECODER_FREE_VAL(pCellDecoder) ((pCellDecoder)->freeKV & TDB_CELLD_F_VAL)
typedef struct {
int kLen;
const u8 *pKey;
u8 *pKey;
int vLen;
const u8 *pVal;
u8 *pVal;
SPgno pgno;
u8 *pBuf;
u8 freeKV;
} SCellDecoder;
struct SBTC {
@ -251,7 +263,7 @@ struct SPage {
int vLen; // value length of the page, -1 for unknown
int maxLocal;
int minLocal;
int (*xCellSize)(const SPage *, SCell *);
int (*xCellSize)(const SPage *, SCell *, int, TXN *pTxn, SBTree *pBt);
// Fields used by SPCache
TDB_PCACHE_PAGE
};
@ -298,16 +310,18 @@ static inline int tdbTryLockPage(tdb_spinlock_t *pLock) {
#define TDB_PAGE_USABLE_SIZE(pPage) ((u8 *)(pPage)->pPageFtr - (pPage)->pCellIdx)
#define TDB_PAGE_FREE_SIZE(pPage) (*(pPage)->pPageMethods->getFreeBytes)(pPage)
#define TDB_PAGE_PGNO(pPage) ((pPage)->pgid.pgno)
#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell) + (pPage)->pPageMethods->szOffset)
#define TDB_BYTES_CELL_TAKEN(pPage, pCell) ((*(pPage)->xCellSize)(pPage, pCell, 0, NULL, NULL) + (pPage)->pPageMethods->szOffset)
#define TDB_PAGE_OFFSET_SIZE(pPage) ((pPage)->pPageMethods->szOffset)
int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t), void *arg);
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg);
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *));
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *));
void tdbPageZero(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int,
TXN *, SBTree *pBt));
void tdbPageInit(SPage *pPage, u8 szAmHdr, int (*xCellSize)(const SPage *, SCell *, int,
TXN *, SBTree *pBt));
int tdbPageInsertCell(SPage *pPage, int idx, SCell *pCell, int szCell, u8 asOvfl);
int tdbPageDropCell(SPage *pPage, int idx);
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell);
int tdbPageDropCell(SPage *pPage, int idx, TXN *pTxn, SBTree *pBt);
int tdbPageUpdateCell(SPage *pPage, int idx, SCell *pCell, int szCell, TXN *pTxn, SBTree *pBt);
void tdbPageCopy(SPage *pFromPage, SPage *pToPage);
int tdbPageCapacity(int pageSize, int amHdrSize);

View File

@ -4,4 +4,9 @@ target_link_libraries(tdbTest tdb gtest gtest_main)
# tdbUtilTest
add_executable(tdbUtilTest "tdbUtilTest.cpp")
target_link_libraries(tdbUtilTest tdb gtest gtest_main)
target_link_libraries(tdbUtilTest tdb gtest gtest_main)
# tdbUtilTest
add_executable(tdbExOVFLTest "tdbExOVFLTest.cpp")
target_link_libraries(tdbExOVFLTest tdb gtest gtest_main)

View File

@ -0,0 +1,469 @@
#include <gtest/gtest.h>
#define ALLOW_FORBID_FUNC
#include "os.h"
#include "tdb.h"
#include <shared_mutex>
#include <string>
#include <thread>
#include <vector>
typedef struct SPoolMem {
int64_t size;
struct SPoolMem *prev;
struct SPoolMem *next;
} SPoolMem;
static SPoolMem *openPool() {
SPoolMem *pPool = (SPoolMem *)taosMemoryMalloc(sizeof(*pPool));
pPool->prev = pPool->next = pPool;
pPool->size = 0;
return pPool;
}
static void clearPool(SPoolMem *pPool) {
SPoolMem *pMem;
do {
pMem = pPool->next;
if (pMem == pPool) break;
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
taosMemoryFree(pMem);
} while (1);
assert(pPool->size == 0);
}
static void closePool(SPoolMem *pPool) {
clearPool(pPool);
taosMemoryFree(pPool);
}
static void *poolMalloc(void *arg, size_t size) {
void *ptr = NULL;
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = (SPoolMem *)taosMemoryMalloc(sizeof(*pMem) + size);
if (pMem == NULL) {
assert(0);
}
pMem->size = sizeof(*pMem) + size;
pMem->next = pPool->next;
pMem->prev = pPool;
pPool->next->prev = pMem;
pPool->next = pMem;
pPool->size += pMem->size;
ptr = (void *)(&pMem[1]);
return ptr;
}
static void poolFree(void *arg, void *ptr) {
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;
pMem = &(((SPoolMem *)ptr)[-1]);
pMem->next->prev = pMem->prev;
pMem->prev->next = pMem->next;
pPool->size -= pMem->size;
taosMemoryFree(pMem);
}
static int tKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
int k1, k2;
std::string s1((char *)pKey1 + 3, kLen1 - 3);
std::string s2((char *)pKey2 + 3, kLen2 - 3);
k1 = stoi(s1);
k2 = stoi(s2);
if (k1 < k2) {
return -1;
} else if (k1 > k2) {
return 1;
} else {
return 0;
}
}
static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2) {
int mlen;
int cret;
ASSERT(keyLen1 > 0 && keyLen2 > 0 && pKey1 != NULL && pKey2 != NULL);
mlen = keyLen1 < keyLen2 ? keyLen1 : keyLen2;
cret = memcmp(pKey1, pKey2, mlen);
if (cret == 0) {
if (keyLen1 < keyLen2) {
cret = -1;
} else if (keyLen1 > keyLen2) {
cret = 1;
} else {
cret = 0;
}
}
return cret;
}
TEST(TdbOVFLPagesTest, TbUpsertTest) {
}
TEST(TdbOVFLPagesTest, TbPGetTest) {
}
static void generateBigVal(char *val, int valLen) {
for (int i = 0; i < valLen; ++i) {
char c = char(i & 0xff);
if (c == 0) {
c = 1;
}
val[i] = c;
}
}
static TDB *openEnv(char const *envName, int const pageSize, int const pageNum) {
TDB *pEnv = NULL;
int ret = tdbOpen(envName, pageSize, pageNum, &pEnv);
if (ret) {
pEnv = NULL;
}
return pEnv;
}
static void insertOfp(void) {
int ret = 0;
taosRemoveDir("tdb");
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN txn;
int64_t txnid = 0;
++txnid;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
// generate value payload
char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
// insert the generated big data
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn);
GTEST_ASSERT_EQ(ret, 0);
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
}
//TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) {
TEST(TdbOVFLPagesTest, TbInsertTest) {
insertOfp();
}
//TEST(TdbOVFLPagesTest, DISABLED_TbGetTest) {
TEST(TdbOVFLPagesTest, TbGetTest) {
insertOfp();
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
// generate value payload
char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
{ // Query the data
void *pVal = NULL;
int vLen;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, valLen);
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
tdbFree(pVal);
}
}
TEST(TdbOVFLPagesTest, TbDeleteTest) {
int ret = 0;
taosRemoveDir("tdb");
// open Env
int const pageSize = 4096;
int const pageNum = 64;
TDB *pEnv = openEnv("tdb", pageSize, pageNum);
GTEST_ASSERT_NE(pEnv, nullptr);
// open db
TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
// open the pool
SPoolMem *pPool = openPool();
// start a transaction
TXN txn;
int64_t txnid = 0;
++txnid;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
// generate value payload
char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen);
{ // insert the generated big data
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the data
void *pVal = NULL;
int vLen;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, valLen);
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
tdbFree(pVal);
}
/* open to debug committed file
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
++txnid;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
*/
{ // upsert the data
ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the upserted data
void *pVal = NULL;
int vLen;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, strlen("value1"));
GTEST_ASSERT_EQ(memcmp("value1", pVal, vLen), 0);
tdbFree(pVal);
}
{ // delete the data
ret = tdbTbDelete(pDb, "key1", strlen("key1"), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
{ // query the deleted data
void *pVal = NULL;
int vLen = -1;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen);
ASSERT(ret == -1);
GTEST_ASSERT_EQ(ret, -1);
GTEST_ASSERT_EQ(vLen, -1);
GTEST_ASSERT_EQ(pVal, nullptr);
tdbFree(pVal);
}
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
}
TEST(tdb_test, DISABLED_simple_insert1) {
//TEST(tdb_test, simple_insert1) {
int ret;
TDB *pEnv;
TTB *pDb;
tdb_cmpr_fn_t compFunc;
int nData = 1;
TXN txn;
int const pageSize = 4096;
taosRemoveDir("tdb");
// Open Env
ret = tdbOpen("tdb", pageSize, 64, &pEnv);
GTEST_ASSERT_EQ(ret, 0);
// Create a database
compFunc = tKeyCmpr;
ret = tdbTbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
{
char key[64];
//char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[(4083 - 4 - 3 - 2)+1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit
int64_t txnid = 0;
SPoolMem *pPool;
// open the pool
pPool = openPool();
// start a transaction
txnid++;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key0");
sprintf(val, "value%d", iData);
//ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
//GTEST_ASSERT_EQ(ret, 0);
// generate value payload
int valLen = sizeof(val) / sizeof(val[0]);
for (int i = 6; i < valLen; ++i) {
char c = char(i & 0xff);
if (c == 0) {
c = 1;
}
val[i] = c;
}
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
if (pPool->size >= poolLimit) {
// commit current transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
// start a new transaction
clearPool(pPool);
txnid++;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn);
}
}
// commit the transaction
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
{ // Query the data
void *pVal = NULL;
int vLen;
for (int i = 1; i <= nData; i++) {
sprintf(key, "key%d", i);
sprintf(val, "value%d", i);
ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(vLen, strlen(val));
GTEST_ASSERT_EQ(memcmp(val, pVal, vLen), 0);
}
tdbFree(pVal);
}
{ // Iterate to query the DB data
TBC *pDBC;
void *pKey = NULL;
void *pVal = NULL;
int vLen, kLen;
int count = 0;
ret = tdbTbcOpen(pDb, &pDBC, NULL);
GTEST_ASSERT_EQ(ret, 0);
tdbTbcMoveToFirst(pDBC);
for (;;) {
ret = tdbTbcNext(pDBC, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
// std::cout.write((char *)pKey, kLen) /* << " " << kLen */ << " ";
// std::cout.write((char *)pVal, vLen) /* << " " << vLen */;
// std::cout << std::endl;
count++;
}
GTEST_ASSERT_EQ(count, nData);
tdbTbcClose(pDBC);
tdbFree(pKey);
tdbFree(pVal);
}
}
ret = tdbTbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// Close a database
tdbTbClose(pDb);
// Close Env
ret = tdbClose(pEnv);
GTEST_ASSERT_EQ(ret, 0);
}