diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index e8e7260514..bfd79f7965 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -95,6 +95,7 @@ static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNo static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t version, TSKEY ts, int32_t flags); static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc); static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc); +static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow); // SMemTable int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { @@ -201,15 +202,18 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p } // do insert data to SMemData - STsdbRow tRow = {.version = version}; - SEncoder ec = {0}; - SDecoder dc = {0}; + SMemSkipListNode *forwards[SL_MAX_LEVEL]; + SMemSkipListNode *pNode; + STsdbRow tRow = {.version = version}; + SEncoder ec = {0}; + SDecoder dc = {0}; tDecoderInit(&dc, pSubmitBlk->pData, pSubmitBlk->nData); tsdbMemSkipListCursorInit(pMemTb->pSlc, &pMemData->sl); for (;;) { if (tDecodeIsEnd(&dc)) break; + // decode row if (tDecodeBinary(&dc, (const uint8_t **)&tRow.pRow, &tRow.szRow) < 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -219,22 +223,12 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p tsdbMemSkipListCursorMoveTo(pMemTb->pSlc, version, tRow.pRow->ts, 0); // encode row - int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); - int32_t tsize; - int32_t ret; - tEncodeSize(tsdbEncodeRow, &tRow, tsize, ret); - SMemSkipListNode *pNode = vnodeBufPoolMalloc(pPool, tsize + SL_NODE_SIZE(level)); + pNode = tsdbMemSkipListNodeCreate(pPool, &pMemData->sl, &tRow); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pNode->level = level; - tEncoderInit(&ec, (uint8_t *)SL_NODE_DATA(pNode), tsize); - ret = tsdbEncodeRow(&ec, &tRow); - ASSERT(ret == 0); - tEncoderClear(&ec); - // put the node tsdbMemSkipListCursorPut(pMemTb->pSlc, pNode); @@ -258,14 +252,11 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { int8_t level = 1; - int8_t tlevel; + int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1); const uint32_t factor = 4; - if (pSl->size) { - tlevel = TMIN(pSl->maxLevel, pSl->level + 1); - while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) { - level++; - } + while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) { + level++; } return level; @@ -325,14 +316,12 @@ static int32_t tsdbMemSkipListCursorMoveTo(SMemSkipListCurosr *pSlc, int64_t ver SMemSkipListNode *pHead = SL_HEAD_NODE(pSl); SMemSkipListNode *pTail = SL_TAIL_NODE(pSl); - for (int8_t iLevel = 0; iLevel < maxLevel; iLevel++) { - pForwards[iLevel] = pHead; - } - - for (int8_t iLevel = maxLevel - 1; iLevel >= 0; iLevel--) { - if (iLevel < pSl->level) { + if (pSl->size == 0) { + for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) { + pForwards[iLevel] = pHead; } } + return 0; } @@ -344,4 +333,23 @@ static int32_t tsdbMemSkipListCursorMoveToNext(SMemSkipListCurosr *pSlc) { static int32_t tsdbMemSkipListCursorMoveToPrev(SMemSkipListCurosr *pSlc) { // TODO return 0; +} + +static SMemSkipListNode *tsdbMemSkipListNodeCreate(SVBufPool *pPool, SMemSkipList *pSl, const STsdbRow *pTRow) { + int32_t tsize; + int32_t ret; + int8_t level = tsdbMemSkipListRandLevel(pSl); + SMemSkipListNode *pNode = NULL; + SEncoder ec = {0}; + + tEncodeSize(tsdbEncodeRow, pTRow, tsize, ret); + pNode = vnodeBufPoolMalloc(pPool, tsize + SL_NODE_SIZE(level)); + if (pNode) { + pNode->level = level; + tEncoderInit(&ec, (uint8_t *)SL_NODE_DATA(pNode), tsize); + tsdbEncodeRow(&ec, pTRow); + tEncoderClear(&ec); + } + + return pNode; } \ No newline at end of file