This commit is contained in:
Hongze Cheng 2022-05-07 12:21:00 +00:00
parent a06bf6e5ca
commit 3cad99a178
1 changed files with 35 additions and 11 deletions

View File

@ -76,6 +76,8 @@ struct SMemSkipListCurosr {
#define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l) #define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l)
#define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l) #define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l)
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl);
// SMemTable // SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
SMemTable *pMemTb = NULL; SMemTable *pMemTb = NULL;
@ -176,20 +178,19 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
// do insert data to SMemData // do insert data to SMemData
SMemSkipListCurosr slc = {0}; SMemSkipListCurosr slc = {0};
const uint8_t *p = pSubmitBlk->pData;
const uint8_t *pt;
const STSRow *pRow; const STSRow *pRow;
uint64_t szRow; uint32_t szRow;
SDecoder decoder = {0}; SDecoder decoder = {0};
// tCoderInit(&coder, TD_LITTLE_ENDIAN, pSubmitBlk->pData, pSubmitBlk->nData, TD_DECODER); tDecoderInit(&decoder, pSubmitBlk->pData, pSubmitBlk->nData);
for (;;) { for (;;) {
// if (tDecodeIsEnd(&coder)) break; if (tDecodeIsEnd(&decoder)) break;
if (tDecodeBinary(&decoder, (const uint8_t **)&pRow, &szRow) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
// if (tDecodeBinary(&coder, (const uint8_t **)&pRow, &szRow) < 0) {
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
// check the row (todo) // check the row (todo)
// // move the cursor to position to write (todo) // // move the cursor to position to write (todo)
@ -197,7 +198,9 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
// tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c); // tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c);
// ASSERT(c); // ASSERT(c);
// // encode row // encode row
int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
// int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); // int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
// int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt); // int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (p - pt);
// pSlNode = vnodeBufPoolMalloc(pPool, tsize); // pSlNode = vnodeBufPoolMalloc(pPool, tsize);
@ -215,7 +218,7 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
if (pRow->ts < pMemData->minKey) pMemData->minKey = pRow->ts; if (pRow->ts < pMemData->minKey) pMemData->minKey = pRow->ts;
if (pRow->ts > pMemData->maxKey) pMemData->maxKey = pRow->ts; if (pRow->ts > pMemData->maxKey) pMemData->maxKey = pRow->ts;
} }
// tCoderClear(&coder); tDecoderClear(&decoder);
// tsdbMemSkipListCursorClose(&slc); // tsdbMemSkipListCursorClose(&slc);
// update status // update status
@ -228,4 +231,25 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version; if (pMemTb->maxVer == -1 || pMemTb->maxVer < version) pMemTb->maxVer = version;
return 0; return 0;
}
static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
int8_t level = 1;
const uint32_t factor = 4;
if (pSl->size) {
while ((taosRandR(&pSl->seed) % factor) == 0 && level < pSl->maxLevel) {
level++;
}
if (level > pSl->level) {
if (pSl->level < pSl->maxLevel) {
level = pSl->level + 1;
} else {
level = pSl->level;
}
}
}
return level;
} }