Merge pull request #17240 from taosdata/fix/TD-19261

fix(tdb): subtract payload size with cell header size
This commit is contained in:
Shengliang Guan 2022-10-09 19:30:57 +08:00 committed by GitHub
commit e99b8a5f57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 62 deletions

View File

@ -1003,14 +1003,14 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
int nLeft = nPayload; int nLeft = nPayload;
int bytes; int bytes;
int lastPage = 0; int lastPage = 0;
if (nLocal >= kLen + 4) { if (nLocal >= nHeader + kLen + sizeof(SPgno)) {
// pack key to local // pack key to local
memcpy(pCell + nHeader, pKey, kLen); memcpy(pCell + nHeader, pKey, kLen);
nLeft -= kLen; nLeft -= kLen;
// pack partial val to local if any space left // pack partial val to local if any space left
if (nLocal > kLen + 4) { if (nLocal > nHeader + kLen + sizeof(SPgno)) {
memcpy(pCell + nHeader + kLen, pVal, nLocal - kLen - sizeof(SPgno)); memcpy(pCell + nHeader + kLen, pVal, nLocal - nHeader - kLen - sizeof(SPgno));
nLeft -= nLocal - kLen - sizeof(SPgno); nLeft -= nLocal - nHeader - kLen - sizeof(SPgno);
} }
// pack nextPgno // pack nextPgno
@ -1150,9 +1150,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
// free local buffer // free local buffer
tdbFree(pBuf); tdbFree(pBuf);
*szPayload = nLocal; *szPayload = nLocal - nHeader;
// ASSERT(0);
} }
return 0; return 0;
@ -1246,10 +1244,10 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
int bytes; int bytes;
int lastPage = 0; int lastPage = 0;
if (nLocal >= pDecoder->kLen + 4) { if (nLocal >= pDecoder->kLen + nHeader + sizeof(SPgno)) {
pDecoder->pKey = (SCell *)pCell + nHeader; pDecoder->pKey = (SCell *)pCell + nHeader;
nLeft -= kLen; nLeft -= kLen;
if (nLocal > kLen + 4) { if (nLocal > kLen + nHeader + sizeof(SPgno)) {
// read partial val to local // read partial val to local
pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen); pDecoder->pVal = tdbRealloc(pDecoder->pVal, vLen);
if (pDecoder->pVal == NULL) { if (pDecoder->pVal == NULL) {
@ -1259,9 +1257,9 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
tdbDebug("tdb btc decoder: %p/0x%x pVal: %p ", pDecoder, pDecoder->freeKV, pDecoder->pVal); tdbDebug("tdb btc decoder: %p/0x%x pVal: %p ", pDecoder, pDecoder->freeKV, pDecoder->pVal);
memcpy(pDecoder->pVal, pCell + nHeader + kLen, nLocal - kLen - sizeof(SPgno)); memcpy(pDecoder->pVal, pCell + nHeader + kLen, nLocal - nHeader - kLen - sizeof(SPgno));
nLeft -= nLocal - kLen - sizeof(SPgno); nLeft -= nLocal - nHeader - kLen - sizeof(SPgno);
} }
memcpy(&pgno, pCell + nHeader + nPayload - nLeft, sizeof(pgno)); memcpy(&pgno, pCell + nHeader + nPayload - nLeft, sizeof(pgno));
@ -1474,7 +1472,7 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
int nPayload = kLen + vLen; int nPayload = kLen + vLen;
if (nHeader + nPayload <= pPage->maxLocal) { if (nHeader + nPayload <= pPage->maxLocal) {
return nHeader + kLen + vLen; return nHeader + nPayload;
} else { } else {
int maxLocal = pPage->maxLocal; int maxLocal = pPage->maxLocal;
@ -1486,7 +1484,7 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
// free ofp pages' cells // free ofp pages' cells
if (dropOfp) { if (dropOfp) {
int ret = 0; int ret = 0;
SPgno pgno = *(SPgno *)(pCell + nHeader + nLocal - sizeof(SPgno)); SPgno pgno = *(SPgno *)(pCell + nLocal - sizeof(SPgno));
int nLeft = nPayload - nLocal + sizeof(SPgno); int nLeft = nPayload - nLocal + sizeof(SPgno);
SPage *ofp; SPage *ofp;
int bytes; int bytes;
@ -1513,7 +1511,7 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell, int dropOfp, TXN *
} }
} }
return nHeader + nLocal; return nLocal;
} }
} }
// TDB_BTREE_CELL // TDB_BTREE_CELL

View File

@ -119,12 +119,12 @@ static int tDefaultKeyCmpr(const void *pKey1, int keyLen1, const void *pKey2, in
return cret; return cret;
} }
TEST(TdbOVFLPagesTest, TbUpsertTest) { TEST(TdbOVFLPagesTest, DISABLED_TbUpsertTest) {
// TEST(TdbOVFLPagesTest, TbUpsertTest) {
} }
TEST(TdbOVFLPagesTest, TbPGetTest) { TEST(TdbOVFLPagesTest, DISABLED_TbPGetTest) {
// TEST(TdbOVFLPagesTest, TbPGetTest) {
} }
static void generateBigVal(char *val, int valLen) { static void generateBigVal(char *val, int valLen) {
@ -162,7 +162,8 @@ static void insertOfp(void) {
// open db // open db
TTB *pDb = NULL; TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr; tdb_cmpr_fn_t compFunc = tKeyCmpr;
ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); // ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// open the pool // open the pool
@ -176,12 +177,15 @@ static void insertOfp(void) {
tdbBegin(pEnv, &txn); tdbBegin(pEnv, &txn);
// generate value payload // generate value payload
char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) // char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[32605];
int valLen = sizeof(val) / sizeof(val[0]); int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen); generateBigVal(val, valLen);
// insert the generated big data // insert the generated big data
ret = tdbTbInsert(pDb, "key1", strlen("key1"), val, valLen, &txn); // char const *key = "key1";
char const *key = "key123456789";
ret = tdbTbInsert(pDb, key, strlen(key), val, valLen, &txn);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// commit current transaction // commit current transaction
@ -189,12 +193,12 @@ static void insertOfp(void) {
tdbTxnClose(&txn); tdbTxnClose(&txn);
} }
//TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) { TEST(TdbOVFLPagesTest, DISABLED_TbInsertTest) {
TEST(TdbOVFLPagesTest, TbInsertTest) { // TEST(TdbOVFLPagesTest, TbInsertTest) {
insertOfp(); insertOfp();
} }
//TEST(TdbOVFLPagesTest, DISABLED_TbGetTest) { // TEST(TdbOVFLPagesTest, DISABLED_TbGetTest) {
TEST(TdbOVFLPagesTest, TbGetTest) { TEST(TdbOVFLPagesTest, TbGetTest) {
insertOfp(); insertOfp();
@ -207,11 +211,13 @@ TEST(TdbOVFLPagesTest, TbGetTest) {
// open db // open db
TTB *pDb = NULL; TTB *pDb = NULL;
tdb_cmpr_fn_t compFunc = tKeyCmpr; tdb_cmpr_fn_t compFunc = tKeyCmpr;
int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb); // int ret = tdbTbOpen("ofp_insert.db", -1, -1, compFunc, pEnv, &pDb);
int ret = tdbTbOpen("ofp_insert.db", 12, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// generate value payload // generate value payload
char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) // char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[32605];
int valLen = sizeof(val) / sizeof(val[0]); int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen); generateBigVal(val, valLen);
@ -219,7 +225,9 @@ TEST(TdbOVFLPagesTest, TbGetTest) {
void *pVal = NULL; void *pVal = NULL;
int vLen; int vLen;
ret = tdbTbGet(pDb, "key1", strlen("key1"), &pVal, &vLen); // char const *key = "key1";
char const *key = "key123456789";
ret = tdbTbGet(pDb, key, strlen(key), &pVal, &vLen);
ASSERT(ret == 0); ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
@ -230,7 +238,8 @@ TEST(TdbOVFLPagesTest, TbGetTest) {
} }
} }
TEST(TdbOVFLPagesTest, TbDeleteTest) { TEST(TdbOVFLPagesTest, DISABLED_TbDeleteTest) {
// TEST(TdbOVFLPagesTest, TbDeleteTest) {
int ret = 0; int ret = 0;
taosRemoveDir("tdb"); taosRemoveDir("tdb");
@ -258,7 +267,7 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) {
tdbBegin(pEnv, &txn); tdbBegin(pEnv, &txn);
// generate value payload // generate value payload
char val[((4083 - 4 - 3 - 2)+1)*100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) char val[((4083 - 4 - 3 - 2) + 1) * 100]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int valLen = sizeof(val) / sizeof(val[0]); int valLen = sizeof(val) / sizeof(val[0]);
generateBigVal(val, valLen); generateBigVal(val, valLen);
@ -281,12 +290,12 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) {
tdbFree(pVal); tdbFree(pVal);
} }
/* open to debug committed file /* open to debug committed file
tdbCommit(pEnv, &txn); tdbCommit(pEnv, &txn);
tdbTxnClose(&txn); tdbTxnClose(&txn);
++txnid; ++txnid;
tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); tdbTxnOpen(&txn, txnid, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
tdbBegin(pEnv, &txn); tdbBegin(pEnv, &txn);
*/ */
{ // upsert the data { // upsert the data
ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), &txn); ret = tdbTbUpsert(pDb, "key1", strlen("key1"), "value1", strlen("value1"), &txn);
@ -332,7 +341,7 @@ TEST(TdbOVFLPagesTest, TbDeleteTest) {
} }
TEST(tdb_test, DISABLED_simple_insert1) { TEST(tdb_test, DISABLED_simple_insert1) {
//TEST(tdb_test, simple_insert1) { // TEST(tdb_test, simple_insert1) {
int ret; int ret;
TDB *pEnv; TDB *pEnv;
TTB *pDb; TTB *pDb;
@ -354,8 +363,8 @@ TEST(tdb_test, DISABLED_simple_insert1) {
{ {
char key[64]; char key[64];
//char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) // char val[(4083 - 4 - 3 - 2)]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
char val[(4083 - 4 - 3 - 2)+1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4) char val[(4083 - 4 - 3 - 2) + 1]; // pSize(4096) - amSize(1) - pageHdr(8) - footerSize(4)
int64_t poolLimit = 4096; // 1M pool limit int64_t poolLimit = 4096; // 1M pool limit
int64_t txnid = 0; int64_t txnid = 0;
SPoolMem *pPool; SPoolMem *pPool;
@ -372,8 +381,8 @@ TEST(tdb_test, DISABLED_simple_insert1) {
sprintf(key, "key0"); sprintf(key, "key0");
sprintf(val, "value%d", iData); sprintf(val, "value%d", iData);
//ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn); // ret = tdbTbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
//GTEST_ASSERT_EQ(ret, 0); // GTEST_ASSERT_EQ(ret, 0);
// generate value payload // generate value payload
int valLen = sizeof(val) / sizeof(val[0]); int valLen = sizeof(val) / sizeof(val[0]);