diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c index 952ccfda9c..e162d519ac 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable2.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable2.c @@ -22,15 +22,16 @@ typedef struct SMemSkipListNode SMemSkipListNode; typedef struct SMemSkipListCurosr SMemSkipListCurosr; struct SMemTable { - STsdb *pTsdb; - TSKEY minKey; - TSKEY maxKey; - int64_t minVer; - int64_t maxVer; - int64_t nRows; - int32_t nHash; - int32_t nBucket; - SMemData **pBuckets; + STsdb *pTsdb; + TSKEY minKey; + TSKEY maxKey; + int64_t minVer; + int64_t maxVer; + int64_t nRows; + int32_t nHash; + int32_t nBucket; + SMemData **pBuckets; + SMemSkipListCurosr *pSlc; }; struct SMemSkipListNode { @@ -60,9 +61,15 @@ struct SMemData { struct SMemSkipListCurosr { SMemSkipList *pSl; - SMemSkipListNode *pNodeC; + SMemSkipListNode *forwards[]; }; +typedef struct { + int64_t version; + uint32_t szRow; + const STSRow *pRow; +} STsdbRow; + #define HASH_BUCKET(SUID, UID, NBUCKET) (TABS((SUID) + (UID)) % (NBUCKET)) #define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) @@ -76,7 +83,13 @@ struct SMemSkipListCurosr { #define SL_HEAD_NODE_FORWARD(n, l) SL_NODE_FORWARD(n, l) #define SL_TAIL_NODE_BACKWARD(n, l) SL_NODE_FORWARD(n, l) -static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); +static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl); +static int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow); +static int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow); +static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc); +static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc); +static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl); +static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode); // SMemTable int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { @@ -102,6 +115,11 @@ int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { taosMemoryFree(pMemTb); return -1; } + if (tsdbMemSkipListCursorCreate(pTsdb->pVnode->config.tsdbCfg.slLevel, &pMemTb->pSlc) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pMemTb->pBuckets); + taosMemoryFree(pMemTb); + } *ppMemTb = pMemTb; return 0; @@ -110,6 +128,7 @@ int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTb) { int32_t tsdbMemTableDestroy2(STsdb *pTsdb, SMemTable *pMemTb) { if (pMemTb) { // loop to destroy the contents (todo) + tsdbMemSkipListCursorDestroy(pMemTb->pSlc); taosMemoryFree(pMemTb->pBuckets); taosMemoryFree(pMemTb); } @@ -177,52 +196,47 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p } // do insert data to SMemData - SMemSkipListCurosr slc = {0}; - const STSRow *pRow; - uint32_t szRow; - SDecoder decoder = {0}; + STsdbRow tRow = {.version = version}; + SEncoder ec = {0}; + SDecoder dc = {0}; - tDecoderInit(&decoder, pSubmitBlk->pData, pSubmitBlk->nData); + tDecoderInit(&dc, pSubmitBlk->pData, pSubmitBlk->nData); + tsdbMemSkipListCursorInit(pMemTb->pSlc, &pMemData->sl); for (;;) { - if (tDecodeIsEnd(&decoder)) break; + if (tDecodeIsEnd(&dc)) break; - if (tDecodeBinary(&decoder, (const uint8_t **)&pRow, &szRow) < 0) { + if (tDecodeBinary(&dc, (const uint8_t **)&tRow.pRow, &tRow.szRow) < 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } - // check the row (todo) - - // // move the cursor to position to write (todo) - // int32_t c; - // tsdbMemSkipListCursorMoveTo(&slc, pTSRow, version, &c); - // ASSERT(c); + // move cursor // encode row - int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); - int32_t tsize = SL_NODE_SIZE(level) + sizeof(version) + (0 /*todo*/); - SMemSkipListNode *pNode = vnodeBufPoolMalloc(pPool, tsize); + int8_t level = tsdbMemSkipListRandLevel(&pMemData->sl); + int32_t tsize; + int32_t ret; + tEncodeSize(tsdbEncodeRow, &tRow, tsize, ret); + SMemSkipListNode *pNode = vnodeBufPoolMalloc(pPool, tsize + SL_NODE_SIZE(level)); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } pNode->level = level; + tEncoderInit(&ec, (uint8_t *)SL_NODE_DATA(pNode), tsize); + ret = tsdbEncodeRow(&ec, &tRow); + ASSERT(ret == 0); + tEncoderClear(&ec); - // uint8_t *pData = SL_NODE_DATA(pSlNode); - // *(int64_t *)pData = version; - // pData += sizeof(version); - // memcpy(pData, pt, p - pt); - - // // insert row - // tsdbMemSkipListCursorPut(&slc, pSlNode); + // put the node + tsdbMemSkipListCursorPut(pMemTb->pSlc, pNode); // update status - if (pRow->ts < pMemData->minKey) pMemData->minKey = pRow->ts; - if (pRow->ts > pMemData->maxKey) pMemData->maxKey = pRow->ts; + if (tRow.pRow->ts < pMemData->minKey) pMemData->minKey = tRow.pRow->ts; + if (tRow.pRow->ts > pMemData->maxKey) pMemData->maxKey = tRow.pRow->ts; } - tDecoderClear(&decoder); - // tsdbMemSkipListCursorClose(&slc); + tDecoderClear(&dc); // update status if (pMemData->minVer == -1) pMemData->minVer = version; @@ -236,7 +250,7 @@ int32_t tsdbInsertData2(SMemTable *pMemTb, int64_t version, const SVSubmitBlk *p return 0; } -static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { +static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { int8_t level = 1; int8_t tlevel; const uint32_t factor = 4; @@ -249,4 +263,51 @@ static int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { } return level; +} + +static FORCE_INLINE int32_t tsdbEncodeRow(SEncoder *pEncoder, const STsdbRow *pRow) { + if (tEncodeI64(pEncoder, pRow->version) < 0) return -1; + if (tEncodeBinary(pEncoder, (const uint8_t *)pRow->pRow, pRow->szRow) < 0) return -1; + return 0; +} + +static FORCE_INLINE int32_t tsdbDecodeRow(SDecoder *pDecoder, STsdbRow *pRow) { + if (tDecodeI64(pDecoder, &pRow->version) < 0) return -1; + if (tDecodeBinary(pDecoder, (const uint8_t **)&pRow->pRow, &pRow->szRow) < 0) return -1; + return 0; +} + +static int32_t tsdbMemSkipListCursorCreate(int8_t maxLevel, SMemSkipListCurosr **ppSlc) { + *ppSlc = (SMemSkipListCurosr *)taosMemoryCalloc(1, sizeof(**ppSlc) + sizeof(SMemSkipListNode *) * maxLevel); + if (*ppSlc == NULL) { + return -1; + } + return 0; +} + +static void tsdbMemSkipListCursorDestroy(SMemSkipListCurosr *pSlc) { taosMemoryFree(pSlc); } + +static void tsdbMemSkipListCursorInit(SMemSkipListCurosr *pSlc, SMemSkipList *pSl) { + SMemSkipListNode *pHead = SL_HEAD_NODE(pSl); + pSlc->pSl = pSl; + for (int8_t iLevel = 0; iLevel < pSl->maxLevel; iLevel++) { + pSlc->forwards[iLevel] = pHead; + } +} + +static void tsdbMemSkipListCursorPut(SMemSkipListCurosr *pSlc, SMemSkipListNode *pNode) { + SMemSkipList *pSl = pSlc->pSl; + SMemSkipListNode *pNodeNext; + + for (int8_t iLevel = 0; iLevel < pNode->level; iLevel++) { + // todo + + ASSERT(0); + } + + if (pSl->level < pNode->level) { + pSl->level = pNode->level; + } + + pSl->size += 1; } \ No newline at end of file