fix child table query problem

This commit is contained in:
Hongze Cheng 2022-04-29 10:02:36 +00:00
parent 48207a166e
commit fee7499cfb
9 changed files with 157 additions and 87 deletions

View File

@ -98,6 +98,8 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
tdbDbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL); tdbDbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL);
tdbDbcMoveToFirst(pTbCur->pDbc);
return pTbCur; return pTbCur;
} }
@ -185,7 +187,9 @@ struct SMCtbCursor {
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
SMCtbCursor *pCtbCur = NULL; SMCtbCursor *pCtbCur = NULL;
SCtbIdxKey ctbIdxKey;
int ret; int ret;
int c;
pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur)); pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur));
if (pCtbCur == NULL) { if (pCtbCur == NULL) {
@ -199,6 +203,14 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
return NULL; return NULL;
} }
// move to the suid
ctbIdxKey.suid = uid;
ctbIdxKey.uid = INT64_MIN;
tdbDbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
if (c > 0) {
tdbDbcMoveToNext(pCtbCur->pCur);
}
return pCtbCur; return pCtbCur;
} }
@ -225,6 +237,9 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
} }
pCtbIdxKey = pCtbCur->pKey; pCtbIdxKey = pCtbCur->pKey;
if (pCtbIdxKey->suid > pCtbCur->suid) {
return 0;
}
return pCtbIdxKey->uid; return pCtbIdxKey->uid;
} }

View File

@ -289,7 +289,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
pVal = pBuf = buf; pVal = pBuf = buf;
metaEncodeTbInfo(&pBuf, pTbCfg); metaEncodeTbInfo(&pBuf, pTbCfg);
vLen = POINTER_DISTANCE(pBuf, buf); vLen = POINTER_DISTANCE(pBuf, buf);
ret = tdbDbInsert(pMetaDb->pTbDB, pKey, kLen, pVal, vLen, &pMetaDb->txn); ret = tdbDbPut(pMetaDb->pTbDB, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -311,7 +311,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
pVal = pBuf = buf; pVal = pBuf = buf;
metaEncodeSchemaEx(&pBuf, &schemaWrapper); metaEncodeSchemaEx(&pBuf, &schemaWrapper);
vLen = POINTER_DISTANCE(pBuf, buf); vLen = POINTER_DISTANCE(pBuf, buf);
ret = tdbDbInsert(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen, &pMeta->pDB->txn); ret = tdbDbPut(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen, &pMeta->pDB->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -325,7 +325,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen = nameLen + 1 + sizeof(uid); kLen = nameLen + 1 + sizeof(uid);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbInsert(pMetaDb->pNameIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); ret = tdbDbPut(pMetaDb->pNameIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -336,7 +336,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen = sizeof(uid); kLen = sizeof(uid);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbInsert(pMetaDb->pStbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); ret = tdbDbPut(pMetaDb->pStbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -347,7 +347,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen = sizeof(ctbIdxKey); kLen = sizeof(ctbIdxKey);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbInsert(pMetaDb->pCtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); ret = tdbDbPut(pMetaDb->pCtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -362,7 +362,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) {
kLen = sizeof(uid); kLen = sizeof(uid);
pVal = NULL; pVal = NULL;
vLen = 0; vLen = 0;
ret = tdbDbInsert(pMetaDb->pNtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn); ret = tdbDbPut(pMetaDb->pNtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
@ -530,7 +530,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
int32_t kLen = sizeof(pSmaCfg->indexUid); int32_t kLen = sizeof(pSmaCfg->indexUid);
int32_t vLen = POINTER_DISTANCE(qBuf, pBuf); int32_t vLen = POINTER_DISTANCE(qBuf, pBuf);
ret = tdbDbInsert(pMeta->pDB->pSmaDB, key, kLen, val, vLen, &pMetaDb->txn); ret = tdbDbPut(pMeta->pDB->pSmaDB, key, kLen, val, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
taosMemoryFreeClear(pBuf); taosMemoryFreeClear(pBuf);
return -1; return -1;
@ -545,7 +545,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
val = NULL; val = NULL;
vLen = 0; vLen = 0;
ret = tdbDbInsert(pMeta->pDB->pSmaIdx, key, kLen, val, vLen, &pMetaDb->txn); ret = tdbDbPut(pMeta->pDB->pSmaIdx, key, kLen, val, vLen, &pMetaDb->txn);
if (ret < 0) { if (ret < 0) {
taosMemoryFreeClear(pBuf); taosMemoryFreeClear(pBuf);
return -1; return -1;

View File

@ -218,7 +218,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
tCoderClear(&coder); tCoderClear(&coder);
// write to table.db // write to table.db
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) { if (tdbDbPut(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
goto _err; goto _err;
} }
@ -231,11 +231,11 @@ _err:
} }
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn); return tdbDbPut(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
} }
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn); return tdbDbPut(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
} }
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
@ -258,12 +258,12 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
ttlKey.dtime = ctime + ttlDays * 24 * 60 * 60; ttlKey.dtime = ctime + ttlDays * 24 * 60 * 60;
ttlKey.uid = pME->uid; ttlKey.uid = pME->uid;
return tdbDbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn); return tdbDbPut(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn);
} }
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid}; SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};
return tdbDbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn); return tdbDbPut(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
} }
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pME) {
@ -304,7 +304,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER); tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_ENCODER);
tEncodeSSchemaWrapper(&coder, pSW); tEncodeSSchemaWrapper(&coder, pSW);
if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) { if (tdbDbPut(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
rcode = -1; rcode = -1;
goto _exit; goto _exit;
} }

View File

@ -97,7 +97,7 @@ int32_t tsdbCloseDBF(SDBFile *pDBF) {
int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) { int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
int32_t ret; int32_t ret;
ret = tdbDbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn); ret = tdbDbPut(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
if (ret < 0) { if (ret < 0) {
tsdbError("Failed to create insert sma data into db, ret = %d", ret); tsdbError("Failed to create insert sma data into db, ret = %d", ret);
return -1; return -1;

View File

@ -40,7 +40,7 @@ int tdbCommit(TENV *pEnv, TXN *pTxn);
int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb); int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb);
int tdbDbClose(TDB *pDb); int tdbDbClose(TDB *pDb);
int tdbDbDrop(TDB *pDb); int tdbDbDrop(TDB *pDb);
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn); int tdbDbPut(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen); int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
@ -52,6 +52,7 @@ int tdbDbcMoveToFirst(TDBC *pDbc);
int tdbDbcMoveToLast(TDBC *pDbc); int tdbDbcMoveToLast(TDBC *pDbc);
int tdbDbcMoveToNext(TDBC *pDbc); int tdbDbcMoveToNext(TDBC *pDbc);
int tdbDbcMoveToPrev(TDBC *pDbc); int tdbDbcMoveToPrev(TDBC *pDbc);
int tdbDbcGet(TDBC *pDbc, const void **ppKey, int *pkLen, const void **ppVal, int *pvLen);
int tdbDbcPut(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen); int tdbDbcPut(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen);
int tdbDbcUpdate(TDBC *pDbc, const void *pKey, int kLen, const void *pVal, int vLen); int tdbDbcUpdate(TDBC *pDbc, const void *pKey, int kLen, const void *pVal, int vLen);

View File

@ -58,15 +58,6 @@ typedef struct {
SBTree *pBt; SBTree *pBt;
} SBtreeInitPageArg; } SBtreeInitPageArg;
typedef struct {
int kLen;
const u8 *pKey;
int vLen;
const u8 *pVal;
SPgno pgno;
u8 *pBuf;
} SCellDecoder;
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2); static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2);
static int tdbBtreeOpenImpl(SBTree *pBt); static int tdbBtreeOpenImpl(SBTree *pBt);
static int tdbBtreeInitPage(SPage *pPage, void *arg, int init); static int tdbBtreeInitPage(SPage *pPage, void *arg, int init);
@ -75,7 +66,6 @@ static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const vo
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder); static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder);
static int tdbBtreeBalance(SBTC *pBtc); static int tdbBtreeBalance(SBTC *pBtc);
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell); static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell);
static int tdbBtcMoveToNext(SBTC *pBtc);
static int tdbBtcMoveDownward(SBTC *pBtc); static int tdbBtcMoveDownward(SBTC *pBtc);
static int tdbBtcMoveUpward(SBTC *pBtc); static int tdbBtcMoveUpward(SBTC *pBtc);
@ -1017,6 +1007,7 @@ int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn) {
pBtc->iPage = -1; pBtc->iPage = -1;
pBtc->pPage = NULL; pBtc->pPage = NULL;
pBtc->idx = -1; pBtc->idx = -1;
memset(&pBtc->coder, 0, sizeof(SCellDecoder));
if (pTxn == NULL) { if (pTxn == NULL) {
pBtc->pTxn = &pBtc->txn; pBtc->pTxn = &pBtc->txn;
@ -1059,6 +1050,8 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
return 0; return 0;
} }
} else { } else {
ASSERT(0);
#if 0
// move from a position // move from a position
int iPage = 0; int iPage = 0;
@ -1076,6 +1069,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
tdbBtcMoveUpward(pBtc); tdbBtcMoveUpward(pBtc);
} }
#endif
} }
// move downward // move downward
@ -1124,6 +1118,8 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
return 0; return 0;
} }
} else { } else {
ASSERT(0);
#if 0
int iPage = 0; int iPage = 0;
// downward search // downward search
@ -1146,6 +1142,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
tdbBtcMoveUpward(pBtc); tdbBtcMoveUpward(pBtc);
} }
#endif
} }
// move downward // move downward
@ -1215,7 +1212,7 @@ int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
return 0; return 0;
} }
static int tdbBtcMoveToNext(SBTC *pBtc) { int tdbBtcMoveToNext(SBTC *pBtc) {
int nCells; int nCells;
int ret; int ret;
SCell *pCell; SCell *pCell;
@ -1261,6 +1258,43 @@ static int tdbBtcMoveToNext(SBTC *pBtc) {
return 0; return 0;
} }
int tdbBtcMoveToPrev(SBTC *pBtc) {
if (pBtc->idx < 0) return -1;
pBtc->idx--;
if (pBtc->idx >= 0) {
return 0;
}
// move upward
for (;;) {
if (pBtc->iPage == 0) {
pBtc->idx = -1;
return 0;
}
tdbBtcMoveUpward(pBtc);
pBtc->idx--;
if (pBtc->idx >= 0) {
break;
}
}
// move downward
for (;;) {
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) break;
tdbBtcMoveDownward(pBtc);
if (TDB_BTREE_PAGE_IS_LEAF(pBtc->pPage)) {
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage) - 1;
} else {
pBtc->idx = TDB_PAGE_TOTAL_CELLS(pBtc->pPage);
}
}
return 0;
}
static int tdbBtcMoveDownward(SBTC *pBtc) { static int tdbBtcMoveDownward(SBTC *pBtc) {
int ret; int ret;
SPgno pgno; SPgno pgno;
@ -1306,14 +1340,38 @@ static int tdbBtcMoveUpward(SBTC *pBtc) {
return 0; return 0;
} }
int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int *vLen) {
SCell *pCell;
if (pBtc->idx < 0 || pBtc->idx >= TDB_PAGE_TOTAL_CELLS(pBtc->pPage)) {
return -1;
}
pCell = tdbPageGetCell(pBtc->pPage, pBtc->idx);
tdbBtreeDecodeCell(pBtc->pPage, pCell, &pBtc->coder);
if (ppKey) {
*ppKey = (void *)pBtc->coder.pKey;
*kLen = pBtc->coder.kLen;
}
if (ppVal) {
*ppVal = (void *)pBtc->coder.pVal;
*kLen = pBtc->coder.vLen;
}
return 0;
}
int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) { int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
int ret; int ret;
int nCells; int nCells;
int c; int c;
SCell *pCell; SCell *pCell;
SCellDecoder cd = {0}; SBTree *pBt = pBtc->pBt;
SBTree *pBt = pBtc->pBt; SPager *pPager = pBt->pPager;
SPager *pPager = pBt->pPager; const void *pTKey;
int tkLen;
if (pBtc->iPage < 0) { if (pBtc->iPage < 0) {
// move from a clear cursor // move from a clear cursor
@ -1330,6 +1388,8 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
// for empty tree, just return with an invalid position // for empty tree, just return with an invalid position
if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) return 0; if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) return 0;
} else { } else {
ASSERT(0);
#if 0
SPage *pPage; SPage *pPage;
int idx; int idx;
int iPage = 0; int iPage = 0;
@ -1364,11 +1424,12 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
if (pBtc->iPage == iPage) break; if (pBtc->iPage == iPage) break;
tdbBtcMoveUpward(pBtc); tdbBtcMoveUpward(pBtc);
} }
#endif
} }
// search downward to the leaf // search downward to the leaf
for (;;) { for (;;) {
int lidx, ridx, midx; int lidx, ridx;
SPage *pPage; SPage *pPage;
pPage = pBtc->pPage; pPage = pBtc->pPage;
@ -1377,13 +1438,11 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
ridx = nCells - 1; ridx = nCells - 1;
ASSERT(nCells > 0); ASSERT(nCells > 0);
ASSERT(pBtc->idx == -1);
// compare first cell // compare first cell
midx = lidx; pBtc->idx = lidx;
pCell = tdbPageGetCell(pPage, midx); tdbBtcGet(pBtc, &pTKey, &tkLen, NULL, NULL);
tdbBtreeDecodeCell(pPage, pCell, &cd); c = pBt->kcmpr(pKey, kLen, pTKey, tkLen);
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c <= 0) { if (c <= 0) {
ridx = lidx - 1; ridx = lidx - 1;
} else { } else {
@ -1392,10 +1451,9 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
// compare last cell // compare last cell
if (lidx <= ridx) { if (lidx <= ridx) {
midx = ridx; pBtc->idx = ridx;
pCell = tdbPageGetCell(pPage, midx); tdbBtcGet(pBtc, &pTKey, &tkLen, NULL, NULL);
tdbBtreeDecodeCell(pPage, pCell, &cd); c = pBt->kcmpr(pKey, kLen, pTKey, tkLen);
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c >= 0) { if (c >= 0) {
lidx = ridx + 1; lidx = ridx + 1;
} else { } else {
@ -1407,24 +1465,15 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
for (;;) { for (;;) {
if (lidx > ridx) break; if (lidx > ridx) break;
midx = (lidx + ridx) >> 1; pBtc->idx = (lidx + ridx) >> 1;
tdbBtcGet(pBtc, &pTKey, &tkLen, NULL, NULL);
pCell = tdbPageGetCell(pPage, midx); c = pBt->kcmpr(pKey, kLen, pTKey, tkLen);
ret = tdbBtreeDecodeCell(pPage, pCell, &cd);
if (ret < 0) {
// TODO: handle error
ASSERT(0);
return -1;
}
// Compare the key values
c = pBt->kcmpr(pKey, kLen, cd.pKey, cd.kLen);
if (c < 0) { if (c < 0) {
// pKey < cd.pKey // pKey < cd.pKey
ridx = midx - 1; ridx = pBtc->idx - 1;
} else if (c > 0) { } else if (c > 0) {
// pKey > cd.pKey // pKey > cd.pKey
lidx = midx + 1; lidx = pBtc->idx + 1;
} else { } else {
// pKey == cd.pKey // pKey == cd.pKey
break; break;
@ -1433,14 +1482,11 @@ int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
// keep search downward or break // keep search downward or break
if (TDB_BTREE_PAGE_IS_LEAF(pPage)) { if (TDB_BTREE_PAGE_IS_LEAF(pPage)) {
pBtc->idx = midx;
*pCRst = c; *pCRst = c;
break; break;
} else { } else {
if (c <= 0) { if (c > 0) {
pBtc->idx = midx; pBtc->idx += 1;
} else {
pBtc->idx = midx + 1;
} }
tdbBtcMoveDownward(pBtc); tdbBtcMoveDownward(pBtc);
} }

View File

@ -75,7 +75,7 @@ int tdbDbDrop(TDB *pDb) {
return 0; return 0;
} }
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) { int tdbDbPut(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) {
return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen, pTxn); return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen, pTxn);
} }
@ -99,14 +99,6 @@ int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn) {
tdbBtcOpen(&pDbc->btc, pDb->pBt, pTxn); tdbBtcOpen(&pDbc->btc, pDb->pBt, pTxn);
// TODO: move to first now, we can move to any key-value
// and in any direction, design new APIs.
ret = tdbBtcMoveToFirst(&pDbc->btc);
if (ret < 0) {
ASSERT(0);
return -1;
}
*ppDbc = pDbc; *ppDbc = pDbc;
return 0; return 0;
} }
@ -117,10 +109,12 @@ int tdbDbcMoveToFirst(TDBC *pDbc) { return tdbBtcMoveToFirst(&pDbc->btc); }
int tdbDbcMoveToLast(TDBC *pDbc) { return tdbBtcMoveToLast(&pDbc->btc); } int tdbDbcMoveToLast(TDBC *pDbc) { return tdbBtcMoveToLast(&pDbc->btc); }
int tdbDbcMoveToNext(TDBC *pDbc) { return 0; } int tdbDbcMoveToNext(TDBC *pDbc) { return tdbBtcMoveToNext(&pDbc->btc); }
int tdbDbcMoveToPrev(TDBC *pDbc) {
// TODO int tdbDbcMoveToPrev(TDBC *pDbc) { return tdbBtcMoveToPrev(&pDbc->btc); }
return 0;
int tdbDbcGet(TDBC *pDbc, const void **ppKey, int *pkLen, const void **ppVal, int *pvLen) {
return tdbBtcGet(&pDbc->btc, ppKey, pkLen, ppVal, pvLen);
} }
int tdbDbcPut(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen) { int tdbDbcPut(TDBC *pDbc, const void *pKey, int keyLen, const void *pVal, int valLen) {
@ -147,6 +141,7 @@ int tdbDbcNext(TDBC *pDbc, void **ppKey, int *kLen, void **ppVal, int *vLen) {
int tdbDbcClose(TDBC *pDbc) { int tdbDbcClose(TDBC *pDbc) {
if (pDbc) { if (pDbc) {
tdbBtcClose(&pDbc->btc);
tdbOsFree(pDbc); tdbOsFree(pDbc);
} }

View File

@ -103,15 +103,25 @@ typedef struct SBtInfo {
int nData; int nData;
} SBtInfo; } SBtInfo;
typedef struct {
int kLen;
const u8 *pKey;
int vLen;
const u8 *pVal;
SPgno pgno;
u8 *pBuf;
} SCellDecoder;
struct SBTC { struct SBTC {
SBTree *pBt; SBTree *pBt;
i8 iPage; i8 iPage;
SPage *pPage; SPage *pPage;
int idx; int idx;
int idxStack[BTREE_MAX_DEPTH + 1]; int idxStack[BTREE_MAX_DEPTH + 1];
SPage *pgStack[BTREE_MAX_DEPTH + 1]; SPage *pgStack[BTREE_MAX_DEPTH + 1];
TXN *pTxn; SCellDecoder coder;
TXN txn; TXN *pTxn;
TXN txn;
}; };
// SBTree // SBTree
@ -123,11 +133,14 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
// SBTC // SBTC
int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn); int tdbBtcOpen(SBTC *pBtc, SBTree *pBt, TXN *pTxn);
int tdbBtcClose(SBTC *pBtc);
int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst); int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst);
int tdbBtcMoveToFirst(SBTC *pBtc); int tdbBtcMoveToFirst(SBTC *pBtc);
int tdbBtcMoveToLast(SBTC *pBtc); int tdbBtcMoveToLast(SBTC *pBtc);
int tdbBtcMoveToNext(SBTC *pBtc);
int tdbBtcMoveToPrev(SBTC *pBtc);
int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen); int tdbBtreeNext(SBTC *pBtc, void **ppKey, int *kLen, void **ppVal, int *vLen);
int tdbBtcClose(SBTC *pBtc); int tdbBtcGet(SBTC *pBtc, const void **ppKey, int *kLen, const void **ppVal, int *vLen);
// tdbPager.c ==================================== // tdbPager.c ====================================

View File

@ -152,7 +152,7 @@ TEST(tdb_test, simple_test) {
for (int iData = 1; iData <= nData; iData++) { for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData); sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData); sprintf(val, "value%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn); ret = tdbDbPut(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one // if pool is full, commit the transaction and start a new one
@ -269,7 +269,7 @@ TEST(tdb_test, simple_test2) {
for (int iData = 1; iData <= nData; iData++) { for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData); sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData); sprintf(val, "value%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn); ret = tdbDbPut(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(ret, 0);
} }