refact TSDB

This commit is contained in:
Hongze Cheng 2022-05-09 06:29:43 +00:00
parent 9a035fdb1d
commit fba58c763f
1 changed files with 34 additions and 26 deletions

View File

@ -95,6 +95,7 @@ static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNo
static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags);
static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc);
static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc);
static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow);
// SMemTable
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) {
@ -201,15 +202,18 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
}
// do insert data to SMemData
STsdbRow tRow = {.version = version};
SEncoder ec = {0};
SDecoder dc = {0};
SMemSkipListNode *forwards[SL_MAX_LEVEL];
SMemSkipListNode *pNode;
STsdbRow tRow = {.version = version};
SEncoder ec = {0};
SDecoder dc = {0};
tDecoderInit(&dc, pSubmitBlk->pData, pSubmitBlk->nData);
tsdbMemSkipListCursorInit(pMemTb->pSlc, &pMemData->sl);
for (;;) {
if (tDecodeIsEnd(&dc)) break;
// decode row
if (tDecodeBinary(&dc, (const uint8_t **)&tRow.pRow, &tRow.szRow) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
@ -219,22 +223,12 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
tsdbMemSkipListCursorMoveTo(pMemTb->pSlc, version, tRow.pRow->ts, 0);
// encode row
int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl);
int32_t tsize;
int32_t ret;
tEncodeSize(tsdbEncodeRow, &tRow, tsize, ret);
SMemSkipListNode *pNode = vnodeBufPoolMalloc(pPool, tsize + SL_NODE_SIZE(level));
pNode = tsdbMemSkipListNodeCreate(pPool, &pMemData->sl, &tRow);
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pNode->level = level;
tEncoderInit(&ec, (uint8_t *)SL_NODE_DATA(pNode), tsize);
ret = tsdbEncodeRow(&ec, &tRow);
ASSERT(ret == 0);
tEncoderClear(&ec);
// put the node
tsdbMemSkipListCursorPut(pMemTb->pSlc, pNode);
@ -258,14 +252,11 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p
static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
int8_t level = 1;
int8_t tlevel;
int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
const uint32_t factor = 4;
if (pSl->size) {
tlevel = TMIN(pSl->maxLevel, pSl->level + 1);
while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) {
level++;
}
while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) {
level++;
}
return level;
@ -325,14 +316,12 @@ static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t ver
SMemSkipListNode *pHead = SL_HEAD_NODE(pSl);
SMemSkipListNode *pTail = SL_TAIL_NODE(pSl);
for (int8_t iLevel = 0; iLevel < maxLevel; iLevel++) {
pForwards[iLevel] = pHead;
}
for (int8_t iLevel = maxLevel - 1; iLevel >= 0; iLevel--) {
if (iLevel < pSl->level) {
if (pSl->size == 0) {
for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) {
pForwards[iLevel] = pHead;
}
}
return 0;
}
@ -344,4 +333,23 @@ static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc) {
static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc) {
// TODO
return 0;
}
static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow) {
int32_t tsize;
int32_t ret;
int8_t level = tsdbMemSkipListRandLevel(pSl);
SMemSkipListNode *pNode = NULL;
SEncoder ec = {0};
tEncodeSize(tsdbEncodeRow, pTRow, tsize, ret);
pNode = vnodeBufPoolMalloc(pPool, tsize + SL_NODE_SIZE(level));
if (pNode) {
pNode->level = level;
tEncoderInit(&ec, (uint8_t *)SL_NODE_DATA(pNode), tsize);
tsdbEncodeRow(&ec, pTRow);
tEncoderClear(&ec);
}
return pNode;
}